Alter Tahoe2PeerSelector to make sure that it recognizes existing shares on readonly servers, fixing an issue in #778
This commit is contained in:
parent
8c71df53f9
commit
a816de3f23
|
@ -114,6 +114,15 @@ class PeerTracker:
|
||||||
d.addCallback(self._got_reply)
|
d.addCallback(self._got_reply)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def query_allocated(self):
|
||||||
|
d = self._storageserver.callRemote("get_buckets",
|
||||||
|
self.storage_index)
|
||||||
|
d.addCallback(self._got_allocate_reply)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def _got_allocate_reply(self, buckets):
|
||||||
|
return (self.peerid, buckets)
|
||||||
|
|
||||||
def _got_reply(self, (alreadygot, buckets)):
|
def _got_reply(self, (alreadygot, buckets)):
|
||||||
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
|
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
|
||||||
b = {}
|
b = {}
|
||||||
|
@ -183,6 +192,12 @@ class Tahoe2PeerSelector:
|
||||||
self._started_second_pass = False
|
self._started_second_pass = False
|
||||||
self.use_peers = set() # PeerTrackers that have shares assigned to them
|
self.use_peers = set() # PeerTrackers that have shares assigned to them
|
||||||
self.preexisting_shares = {} # sharenum -> peerid holding the share
|
self.preexisting_shares = {} # sharenum -> peerid holding the share
|
||||||
|
# We don't try to allocate shares to these servers, since they've
|
||||||
|
# said that they're incapable of storing shares of the size that
|
||||||
|
# we'd want to store. We keep them around because they may have
|
||||||
|
# existing shares for this storage index, which we want to know
|
||||||
|
# about for accurate servers_of_happiness accounting
|
||||||
|
self.readonly_peers = []
|
||||||
|
|
||||||
peers = storage_broker.get_servers_for_index(storage_index)
|
peers = storage_broker.get_servers_for_index(storage_index)
|
||||||
if not peers:
|
if not peers:
|
||||||
|
@ -209,10 +224,10 @@ class Tahoe2PeerSelector:
|
||||||
(peerid, conn) = peer
|
(peerid, conn) = peer
|
||||||
v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
|
v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
|
||||||
return v1["maximum-immutable-share-size"]
|
return v1["maximum-immutable-share-size"]
|
||||||
peers = [peer for peer in peers
|
new_peers = [peer for peer in peers
|
||||||
if _get_maxsize(peer) >= allocated_size]
|
if _get_maxsize(peer) >= allocated_size]
|
||||||
if not peers:
|
old_peers = list(set(peers).difference(set(new_peers)))
|
||||||
raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)
|
peers = new_peers
|
||||||
|
|
||||||
# decide upon the renewal/cancel secrets, to include them in the
|
# decide upon the renewal/cancel secrets, to include them in the
|
||||||
# allocate_buckets query.
|
# allocate_buckets query.
|
||||||
|
@ -223,22 +238,38 @@ class Tahoe2PeerSelector:
|
||||||
storage_index)
|
storage_index)
|
||||||
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
|
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
|
||||||
storage_index)
|
storage_index)
|
||||||
|
def _make_trackers(peers):
|
||||||
trackers = [ PeerTracker(peerid, conn,
|
return [ PeerTracker(peerid, conn,
|
||||||
share_size, block_size,
|
share_size, block_size,
|
||||||
num_segments, num_share_hashes,
|
num_segments, num_share_hashes,
|
||||||
storage_index,
|
storage_index,
|
||||||
bucket_renewal_secret_hash(file_renewal_secret,
|
bucket_renewal_secret_hash(file_renewal_secret,
|
||||||
peerid),
|
peerid),
|
||||||
bucket_cancel_secret_hash(file_cancel_secret,
|
bucket_cancel_secret_hash(file_cancel_secret,
|
||||||
peerid),
|
peerid))
|
||||||
)
|
|
||||||
for (peerid, conn) in peers]
|
for (peerid, conn) in peers]
|
||||||
self.uncontacted_peers = trackers
|
self.uncontacted_peers = _make_trackers(peers)
|
||||||
|
self.readonly_peers = _make_trackers(old_peers)
|
||||||
d = defer.maybeDeferred(self._loop)
|
# Talk to the readonly servers to get an idea of what servers
|
||||||
|
# have what shares (if any) for this storage index
|
||||||
|
d = defer.maybeDeferred(self._existing_shares)
|
||||||
|
d.addCallback(lambda ign: self._loop())
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def _existing_shares(self):
|
||||||
|
if self.readonly_peers:
|
||||||
|
peer = self.readonly_peers.pop()
|
||||||
|
assert isinstance(peer, PeerTracker)
|
||||||
|
d = peer.query_allocated()
|
||||||
|
d.addCallback(self._handle_allocate_response)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def _handle_allocate_response(self, (peer, buckets)):
|
||||||
|
for bucket in buckets:
|
||||||
|
self.preexisting_shares[bucket] = peer
|
||||||
|
if self.homeless_shares:
|
||||||
|
self.homeless_shares.remove(bucket)
|
||||||
|
return self._existing_shares()
|
||||||
|
|
||||||
def _loop(self):
|
def _loop(self):
|
||||||
if not self.homeless_shares:
|
if not self.homeless_shares:
|
||||||
|
|
Loading…
Reference in New Issue