Reduce implementation-dependency of IStorageServer #1117

Merged
itamarst merged 10 commits from 3779-istorageserver-with-fewer-assumptions into master 2021-09-13 13:28:47 +00:00
8 changed files with 41 additions and 13 deletions

View File

@ -0,0 +1 @@
exarkun commented 2021-09-10 13:26:49 +00:00 (Migrated from github.com)
Review

Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?

Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?
exarkun commented 2021-09-10 13:26:49 +00:00 (Migrated from github.com)
Review

Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?

Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?
Fixed bug where share corruption events were not logged on storage servers running on Windows.
exarkun commented 2021-09-10 13:26:49 +00:00 (Migrated from github.com)
Review

Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?

Can you make it clear here that this is about behavior for a storage server running on Windows (ie, it doesn't matter where the client is running)?

View File

@ -475,7 +475,9 @@ class Share(object):
# there was corruption somewhere in the given range # there was corruption somewhere in the given range
reason = "corruption in share[%d-%d): %s" % (start, start+offset, reason = "corruption in share[%d-%d): %s" % (start, start+offset,
str(f.value)) str(f.value))
self._rref.callRemoteOnly("advise_corrupt_share", reason.encode("utf-8")) self._rref.callRemote(
"advise_corrupt_share", reason.encode("utf-8")
).addErrback(log.err, "Error from remote call to advise_corrupt_share")
def _satisfy_block_hash_tree(self, needed_hashes): def _satisfy_block_hash_tree(self, needed_hashes):
o_bh = self.actual_offsets["block_hashes"] o_bh = self.actual_offsets["block_hashes"]

View File

@ -15,7 +15,7 @@ from zope.interface import implementer
from twisted.internet import defer from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \ from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, observer, pipeline from allmydata.util import mathutil, observer, pipeline, log
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a from allmydata.storage.server import si_b2a
@ -254,8 +254,7 @@ class WriteBucketProxy(object):
return d return d
def abort(self): def abort(self):
return self._rref.callRemoteOnly("abort") self._rref.callRemote("abort").addErrback(log.err, "Error from remote call to abort an immutable write bucket")
def get_servername(self): def get_servername(self):
return self._server.get_name() return self._server.get_name()

View File

@ -607,13 +607,14 @@ class ServermapUpdater(object):
return d return d
def _do_read(self, server, storage_index, shnums, readv): def _do_read(self, server, storage_index, shnums, readv):
"""
If self._add_lease is true, a lease is added, and the result only fires
once the least has also been added.
"""
ss = server.get_storage_server() ss = server.get_storage_server()
if self._add_lease: if self._add_lease:
# send an add-lease message in parallel. The results are handled # send an add-lease message in parallel. The results are handled
# separately. This is sent before the slot_readv() so that we can # separately.
# be sure the add_lease is retired by the time slot_readv comes
# back (this relies upon our knowledge that the server code for
# add_lease is synchronous).
renew_secret = self._node.get_renewal_secret(server) renew_secret = self._node.get_renewal_secret(server)
cancel_secret = self._node.get_cancel_secret(server) cancel_secret = self._node.get_cancel_secret(server)
d2 = ss.add_lease( d2 = ss.add_lease(
@ -623,7 +624,16 @@ class ServermapUpdater(object):
) )
# we ignore success # we ignore success
d2.addErrback(self._add_lease_failed, server, storage_index) d2.addErrback(self._add_lease_failed, server, storage_index)
else:
d2 = defer.succeed(None)
d = ss.slot_readv(storage_index, shnums, readv) d = ss.slot_readv(storage_index, shnums, readv)
def passthrough(result):
# Wait for d2, but fire with result of slot_readv() regardless of
# result of d2.
return d2.addBoth(lambda _: result)
d.addCallback(passthrough)
return d return d

View File

@ -708,8 +708,10 @@ class StorageServer(service.MultiService, Referenceable):
now = time_format.iso_utc(sep="T") now = time_format.iso_utc(sep="T")
si_s = si_b2a(storage_index) si_s = si_b2a(storage_index)
# windows can't handle colons in the filename # windows can't handle colons in the filename
fn = os.path.join(self.corruption_advisory_dir, fn = os.path.join(
"%s--%s-%d" % (now, str(si_s, "utf-8"), shnum)).replace(":","") self.corruption_advisory_dir,
("%s--%s-%d" % (now, str(si_s, "utf-8"), shnum)).replace(":","")
)
with open(fn, "w") as f: with open(fn, "w") as f:
itamarst commented 2021-09-03 17:14:54 +00:00 (Migrated from github.com)
Review

This is a (easy to miss) bugfix. Previously the replace was on the whole path, thus removing all colons from paths.

This is a (easy to miss) bugfix. Previously the replace was on the _whole_ path, thus removing all colons from paths.
f.write("report: Share Corruption\n") f.write("report: Share Corruption\n")
f.write("type: %s\n" % bytes_to_native_str(share_type)) f.write("type: %s\n" % bytes_to_native_str(share_type))

View File

@ -1009,10 +1009,10 @@ class _StorageServer(object):
shnum, shnum,
reason, reason,
): ):
return self._rref.callRemoteOnly( self._rref.callRemote(
"advise_corrupt_share", "advise_corrupt_share",
share_type, share_type,
storage_index, storage_index,
shnum, shnum,
reason, reason,
) ).addErrback(log.err, "Error from remote call to advise_corrupt_share")

View File

@ -96,8 +96,14 @@ class FakeStorage(object):
shares[shnum] = f.getvalue() shares[shnum] = f.getvalue()
# This doesn't actually implement the whole interface, but adding a commented
# interface implementation annotation for grepping purposes.
#@implementer(RIStorageServer)
class FakeStorageServer(object): class FakeStorageServer(object):
"""
A fake Foolscap remote object, implemented by overriding callRemote() to
call local methods.
"""
def __init__(self, peerid, storage): def __init__(self, peerid, storage):
self.peerid = peerid self.peerid = peerid
self.storage = storage self.storage = storage

View File

@ -122,7 +122,15 @@ class SetDEPMixin(object):
} }
self.node.encoding_params = p self.node.encoding_params = p
# This doesn't actually implement the whole interface, but adding a commented
# interface implementation annotation for grepping purposes.
#@implementer(RIStorageServer)
class FakeStorageServer(object): class FakeStorageServer(object):
"""
A fake Foolscap remote object, implemented by overriding callRemote() to
call local methods.
"""
def __init__(self, mode, reactor=None): def __init__(self, mode, reactor=None):
self.mode = mode self.mode = mode
self.allocated = [] self.allocated = []