peer-selection: if we must loop, send a minimal number of queries (by asking for more than one share per peer on the second pass)

This commit is contained in:
Brian Warner 2007-09-16 01:53:00 -07:00
parent 7123ff82c1
commit 24e6ccddce
2 changed files with 36 additions and 7 deletions

View File

@ -78,6 +78,7 @@ class FakeStorageServer:
def __init__(self, mode):
self.mode = mode
self.allocated = []
self.queries = 0
def callRemote(self, methname, *args, **kwargs):
def _call():
meth = getattr(self, methname)
@ -89,6 +90,7 @@ class FakeStorageServer:
def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
sharenums, share_size, canary):
#print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
self.queries += 1
if self.mode == "full":
return (set(), {},)
elif self.mode == "already got them":
@ -304,6 +306,7 @@ class PeerSelection(unittest.TestCase):
for p in self.node.last_peers:
allocated = p.ss.allocated
self.failUnlessEqual(len(allocated), 1)
self.failUnlessEqual(p.ss.queries, 1)
d.addCallback(_check)
return d
@ -319,6 +322,7 @@ class PeerSelection(unittest.TestCase):
for p in self.node.last_peers:
allocated = p.ss.allocated
self.failUnlessEqual(len(allocated), 2)
self.failUnlessEqual(p.ss.queries, 2)
d.addCallback(_check)
return d
@ -337,14 +341,33 @@ class PeerSelection(unittest.TestCase):
allocated = p.ss.allocated
self.failUnless(len(allocated) in (1,2), len(allocated))
if len(allocated) == 1:
self.failUnlessEqual(p.ss.queries, 1)
got_one.append(p)
else:
self.failUnlessEqual(p.ss.queries, 2)
got_two.append(p)
self.failUnlessEqual(len(got_one), 49)
self.failUnlessEqual(len(got_two), 1)
d.addCallback(_check)
return d
def test_four_each(self):
# if we have 200 shares, and there are 50 peers, then each peer gets
# 4 shares. The design goal is to accomplish this with only two
# queries per peer.
data = self.get_data(SIZE_LARGE)
self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200)
d = self.u.upload_data(data)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
for p in self.node.last_peers:
allocated = p.ss.allocated
self.failUnlessEqual(len(allocated), 4)
self.failUnlessEqual(p.ss.queries, 2)
d.addCallback(_check)
return d
# TODO:
# upload with exactly 75 peers (shares_of_happiness)

View File

@ -124,7 +124,7 @@ class Tahoe2PeerSelector:
self.homeless_shares = range(total_shares)
# self.uncontacted_peers = list() # peers we haven't asked yet
self.contacted_peers = list() # peers worth asking again
self.contacted_peers = ["start"] # peers worth asking again
self.use_peers = set() # PeerTrackers that have shares assigned to them
self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
@ -191,13 +191,19 @@ class Tahoe2PeerSelector:
d = peer.query(shares_to_ask)
d.addBoth(self._got_response, peer, shares_to_ask)
return d
elif self.contacted_peers:
elif len(self.contacted_peers) > 1:
# ask a peer that we've already asked.
num_shares = mathutil.div_ceil(len(self.homeless_shares),
len(self.contacted_peers))
shares_to_ask = set(self.homeless_shares[:num_shares])
self.homeless_shares[:num_shares] = []
peer = self.contacted_peers.pop(0)
if peer == "start":
# we're at the beginning of the list, so re-calculate
# shares_per_peer
num_shares = mathutil.div_ceil(len(self.homeless_shares),
len(self.contacted_peers))
self.shares_per_peer = num_shares
self.contacted_peers.append("start")
peer = self.contacted_peers.pop(0)
shares_to_ask = set(self.homeless_shares[:self.shares_per_peer])
self.homeless_shares[:self.shares_per_peer] = []
self.query_count += 1
d = peer.query(shares_to_ask)
d.addBoth(self._got_response, peer, shares_to_ask)
@ -231,7 +237,7 @@ class Tahoe2PeerSelector:
log.msg("%s got error during peer selection: %s" % (peer, res))
self.error_count += 1
self.homeless_shares = list(shares_to_ask) + self.homeless_shares
if self.uncontacted_peers or self.contacted_peers:
if self.uncontacted_peers or len(self.contacted_peers) > 1:
# there is still hope, so just loop
pass
else: