Reduce implementation-dependency of IStorageServer
#1117
|
@ -0,0 +1 @@
|
||||||
|
|||||||
|
Fixed bug where share corruption events were not logged on storage servers running on Windows.
|
||||||
Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)? Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?
|
|
@ -475,7 +475,9 @@ class Share(object):
|
||||||
# there was corruption somewhere in the given range
|
# there was corruption somewhere in the given range
|
||||||
reason = "corruption in share[%d-%d): %s" % (start, start+offset,
|
reason = "corruption in share[%d-%d): %s" % (start, start+offset,
|
||||||
str(f.value))
|
str(f.value))
|
||||||
self._rref.callRemoteOnly("advise_corrupt_share", reason.encode("utf-8"))
|
self._rref.callRemote(
|
||||||
|
"advise_corrupt_share", reason.encode("utf-8")
|
||||||
|
).addErrback(log.err, "Error from remote call to advise_corrupt_share")
|
||||||
|
|
||||||
def _satisfy_block_hash_tree(self, needed_hashes):
|
def _satisfy_block_hash_tree(self, needed_hashes):
|
||||||
o_bh = self.actual_offsets["block_hashes"]
|
o_bh = self.actual_offsets["block_hashes"]
|
||||||
|
|
|
@ -15,7 +15,7 @@ from zope.interface import implementer
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
|
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
|
||||||
FileTooLargeError, HASH_SIZE
|
FileTooLargeError, HASH_SIZE
|
||||||
from allmydata.util import mathutil, observer, pipeline
|
from allmydata.util import mathutil, observer, pipeline, log
|
||||||
from allmydata.util.assertutil import precondition
|
from allmydata.util.assertutil import precondition
|
||||||
from allmydata.storage.server import si_b2a
|
from allmydata.storage.server import si_b2a
|
||||||
|
|
||||||
|
@ -254,8 +254,7 @@ class WriteBucketProxy(object):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def abort(self):
|
def abort(self):
|
||||||
return self._rref.callRemoteOnly("abort")
|
self._rref.callRemote("abort").addErrback(log.err, "Error from remote call to abort an immutable write bucket")
|
||||||
|
|
||||||
|
|
||||||
def get_servername(self):
|
def get_servername(self):
|
||||||
return self._server.get_name()
|
return self._server.get_name()
|
||||||
|
|
|
@ -607,13 +607,14 @@ class ServermapUpdater(object):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _do_read(self, server, storage_index, shnums, readv):
|
def _do_read(self, server, storage_index, shnums, readv):
|
||||||
|
"""
|
||||||
|
If self._add_lease is true, a lease is added, and the result only fires
|
||||||
|
once the least has also been added.
|
||||||
|
"""
|
||||||
ss = server.get_storage_server()
|
ss = server.get_storage_server()
|
||||||
if self._add_lease:
|
if self._add_lease:
|
||||||
# send an add-lease message in parallel. The results are handled
|
# send an add-lease message in parallel. The results are handled
|
||||||
# separately. This is sent before the slot_readv() so that we can
|
# separately.
|
||||||
# be sure the add_lease is retired by the time slot_readv comes
|
|
||||||
# back (this relies upon our knowledge that the server code for
|
|
||||||
# add_lease is synchronous).
|
|
||||||
renew_secret = self._node.get_renewal_secret(server)
|
renew_secret = self._node.get_renewal_secret(server)
|
||||||
cancel_secret = self._node.get_cancel_secret(server)
|
cancel_secret = self._node.get_cancel_secret(server)
|
||||||
d2 = ss.add_lease(
|
d2 = ss.add_lease(
|
||||||
|
@ -623,7 +624,16 @@ class ServermapUpdater(object):
|
||||||
)
|
)
|
||||||
# we ignore success
|
# we ignore success
|
||||||
d2.addErrback(self._add_lease_failed, server, storage_index)
|
d2.addErrback(self._add_lease_failed, server, storage_index)
|
||||||
|
else:
|
||||||
|
d2 = defer.succeed(None)
|
||||||
d = ss.slot_readv(storage_index, shnums, readv)
|
d = ss.slot_readv(storage_index, shnums, readv)
|
||||||
|
|
||||||
|
def passthrough(result):
|
||||||
|
# Wait for d2, but fire with result of slot_readv() regardless of
|
||||||
|
# result of d2.
|
||||||
|
return d2.addBoth(lambda _: result)
|
||||||
|
|
||||||
|
d.addCallback(passthrough)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -708,8 +708,10 @@ class StorageServer(service.MultiService, Referenceable):
|
||||||
now = time_format.iso_utc(sep="T")
|
now = time_format.iso_utc(sep="T")
|
||||||
si_s = si_b2a(storage_index)
|
si_s = si_b2a(storage_index)
|
||||||
# windows can't handle colons in the filename
|
# windows can't handle colons in the filename
|
||||||
fn = os.path.join(self.corruption_advisory_dir,
|
fn = os.path.join(
|
||||||
"%s--%s-%d" % (now, str(si_s, "utf-8"), shnum)).replace(":","")
|
self.corruption_advisory_dir,
|
||||||
|
("%s--%s-%d" % (now, str(si_s, "utf-8"), shnum)).replace(":","")
|
||||||
|
)
|
||||||
with open(fn, "w") as f:
|
with open(fn, "w") as f:
|
||||||
This is a (easy to miss) bugfix. Previously the replace was on the whole path, thus removing all colons from paths. This is a (easy to miss) bugfix. Previously the replace was on the _whole_ path, thus removing all colons from paths.
|
|||||||
f.write("report: Share Corruption\n")
|
f.write("report: Share Corruption\n")
|
||||||
f.write("type: %s\n" % bytes_to_native_str(share_type))
|
f.write("type: %s\n" % bytes_to_native_str(share_type))
|
||||||
|
|
|
@ -1009,10 +1009,10 @@ class _StorageServer(object):
|
||||||
shnum,
|
shnum,
|
||||||
reason,
|
reason,
|
||||||
):
|
):
|
||||||
return self._rref.callRemoteOnly(
|
self._rref.callRemote(
|
||||||
"advise_corrupt_share",
|
"advise_corrupt_share",
|
||||||
share_type,
|
share_type,
|
||||||
storage_index,
|
storage_index,
|
||||||
shnum,
|
shnum,
|
||||||
reason,
|
reason,
|
||||||
)
|
).addErrback(log.err, "Error from remote call to advise_corrupt_share")
|
||||||
|
|
|
@ -96,8 +96,14 @@ class FakeStorage(object):
|
||||||
shares[shnum] = f.getvalue()
|
shares[shnum] = f.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
# This doesn't actually implement the whole interface, but adding a commented
|
||||||
|
# interface implementation annotation for grepping purposes.
|
||||||
|
#@implementer(RIStorageServer)
|
||||||
class FakeStorageServer(object):
|
class FakeStorageServer(object):
|
||||||
|
"""
|
||||||
|
A fake Foolscap remote object, implemented by overriding callRemote() to
|
||||||
|
call local methods.
|
||||||
|
"""
|
||||||
def __init__(self, peerid, storage):
|
def __init__(self, peerid, storage):
|
||||||
self.peerid = peerid
|
self.peerid = peerid
|
||||||
self.storage = storage
|
self.storage = storage
|
||||||
|
|
|
@ -122,7 +122,15 @@ class SetDEPMixin(object):
|
||||||
}
|
}
|
||||||
self.node.encoding_params = p
|
self.node.encoding_params = p
|
||||||
|
|
||||||
|
|
||||||
|
# This doesn't actually implement the whole interface, but adding a commented
|
||||||
|
# interface implementation annotation for grepping purposes.
|
||||||
|
#@implementer(RIStorageServer)
|
||||||
class FakeStorageServer(object):
|
class FakeStorageServer(object):
|
||||||
|
"""
|
||||||
|
A fake Foolscap remote object, implemented by overriding callRemote() to
|
||||||
|
call local methods.
|
||||||
|
"""
|
||||||
def __init__(self, mode, reactor=None):
|
def __init__(self, mode, reactor=None):
|
||||||
self.mode = mode
|
self.mode = mode
|
||||||
self.allocated = []
|
self.allocated = []
|
||||||
|
|
Loading…
Reference in New Issue
Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?
Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?