refactor: s/peer/server/ in immutable/upload, happinessutil.py, test_upload
No behavioral changes, just updating variable/method names and log messages. The effects outside these three files should be minimal: some exception messages changed (to say "server" instead of "peer"), and some internal class names were changed. A few things still use "peer" to minimize external changes, like UploadResults.timings["peer_selection"] and happinessutil.merge_peers, which can be changed later.
This commit is contained in:
parent
9b9ea3f739
commit
ebfcb649f9
|
@ -68,14 +68,14 @@ EXTENSION_SIZE = 1000
|
||||||
def pretty_print_shnum_to_servers(s):
|
def pretty_print_shnum_to_servers(s):
|
||||||
return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
|
return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
|
||||||
|
|
||||||
class PeerTracker:
|
class ServerTracker:
|
||||||
def __init__(self, peerid, storage_server,
|
def __init__(self, serverid, storage_server,
|
||||||
sharesize, blocksize, num_segments, num_share_hashes,
|
sharesize, blocksize, num_segments, num_share_hashes,
|
||||||
storage_index,
|
storage_index,
|
||||||
bucket_renewal_secret, bucket_cancel_secret):
|
bucket_renewal_secret, bucket_cancel_secret):
|
||||||
precondition(isinstance(peerid, str), peerid)
|
precondition(isinstance(serverid, str), serverid)
|
||||||
precondition(len(peerid) == 20, peerid)
|
precondition(len(serverid) == 20, serverid)
|
||||||
self.peerid = peerid
|
self.serverid = serverid
|
||||||
self._storageserver = storage_server # to an RIStorageServer
|
self._storageserver = storage_server # to an RIStorageServer
|
||||||
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
|
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
|
||||||
self.sharesize = sharesize
|
self.sharesize = sharesize
|
||||||
|
@ -83,7 +83,7 @@ class PeerTracker:
|
||||||
wbp = layout.make_write_bucket_proxy(None, sharesize,
|
wbp = layout.make_write_bucket_proxy(None, sharesize,
|
||||||
blocksize, num_segments,
|
blocksize, num_segments,
|
||||||
num_share_hashes,
|
num_share_hashes,
|
||||||
EXTENSION_SIZE, peerid)
|
EXTENSION_SIZE, serverid)
|
||||||
self.wbp_class = wbp.__class__ # to create more of them
|
self.wbp_class = wbp.__class__ # to create more of them
|
||||||
self.allocated_size = wbp.get_allocated_size()
|
self.allocated_size = wbp.get_allocated_size()
|
||||||
self.blocksize = blocksize
|
self.blocksize = blocksize
|
||||||
|
@ -95,8 +95,8 @@ class PeerTracker:
|
||||||
self.cancel_secret = bucket_cancel_secret
|
self.cancel_secret = bucket_cancel_secret
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return ("<PeerTracker for peer %s and SI %s>"
|
return ("<ServerTracker for server %s and SI %s>"
|
||||||
% (idlib.shortnodeid_b2a(self.peerid),
|
% (idlib.shortnodeid_b2a(self.serverid),
|
||||||
si_b2a(self.storage_index)[:5]))
|
si_b2a(self.storage_index)[:5]))
|
||||||
|
|
||||||
def query(self, sharenums):
|
def query(self, sharenums):
|
||||||
|
@ -123,7 +123,7 @@ class PeerTracker:
|
||||||
self.num_segments,
|
self.num_segments,
|
||||||
self.num_share_hashes,
|
self.num_share_hashes,
|
||||||
EXTENSION_SIZE,
|
EXTENSION_SIZE,
|
||||||
self.peerid)
|
self.serverid)
|
||||||
b[sharenum] = bp
|
b[sharenum] = bp
|
||||||
self.buckets.update(b)
|
self.buckets.update(b)
|
||||||
return (alreadygot, set(b.keys()))
|
return (alreadygot, set(b.keys()))
|
||||||
|
@ -149,58 +149,59 @@ class PeerTracker:
|
||||||
def str_shareloc(shnum, bucketwriter):
|
def str_shareloc(shnum, bucketwriter):
|
||||||
return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
|
return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
|
||||||
|
|
||||||
class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||||
|
|
||||||
def __init__(self, upload_id, logparent=None, upload_status=None):
|
def __init__(self, upload_id, logparent=None, upload_status=None):
|
||||||
self.upload_id = upload_id
|
self.upload_id = upload_id
|
||||||
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
|
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
|
||||||
# Peers that are working normally, but full.
|
# Servers that are working normally, but full.
|
||||||
self.full_count = 0
|
self.full_count = 0
|
||||||
self.error_count = 0
|
self.error_count = 0
|
||||||
self.num_peers_contacted = 0
|
self.num_servers_contacted = 0
|
||||||
self.last_failure_msg = None
|
self.last_failure_msg = None
|
||||||
self._status = IUploadStatus(upload_status)
|
self._status = IUploadStatus(upload_status)
|
||||||
log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
|
log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
|
||||||
self.log("starting", level=log.OPERATIONAL)
|
self.log("starting", level=log.OPERATIONAL)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
|
return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
|
||||||
|
|
||||||
def get_shareholders(self, storage_broker, secret_holder,
|
def get_shareholders(self, storage_broker, secret_holder,
|
||||||
storage_index, share_size, block_size,
|
storage_index, share_size, block_size,
|
||||||
num_segments, total_shares, needed_shares,
|
num_segments, total_shares, needed_shares,
|
||||||
servers_of_happiness):
|
servers_of_happiness):
|
||||||
"""
|
"""
|
||||||
@return: (upload_servers, already_peers), where upload_servers is a set of
|
@return: (upload_servers, already_servers), where upload_servers is
|
||||||
PeerTracker instances that have agreed to hold some shares
|
a set of ServerTracker instances that have agreed to hold
|
||||||
for us (the shareids are stashed inside the PeerTracker),
|
some shares for us (the shareids are stashed inside the
|
||||||
and already_peers is a dict mapping shnum to a set of peers
|
ServerTracker), and already_servers is a dict mapping shnum
|
||||||
which claim to already have the share.
|
to a set of servers which claim to already have the share.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self._status:
|
if self._status:
|
||||||
self._status.set_status("Contacting Peers..")
|
self._status.set_status("Contacting Servers..")
|
||||||
|
|
||||||
self.total_shares = total_shares
|
self.total_shares = total_shares
|
||||||
self.servers_of_happiness = servers_of_happiness
|
self.servers_of_happiness = servers_of_happiness
|
||||||
self.needed_shares = needed_shares
|
self.needed_shares = needed_shares
|
||||||
|
|
||||||
self.homeless_shares = set(range(total_shares))
|
self.homeless_shares = set(range(total_shares))
|
||||||
self.contacted_peers = [] # peers worth asking again
|
self.contacted_servers = [] # servers worth asking again
|
||||||
self.contacted_peers2 = [] # peers that we have asked again
|
self.contacted_servers2 = [] # servers that we have asked again
|
||||||
self._started_second_pass = False
|
self._started_second_pass = False
|
||||||
self.use_peers = set() # PeerTrackers that have shares assigned to them
|
self.use_servers = set() # ServerTrackers that have shares assigned
|
||||||
self.preexisting_shares = {} # shareid => set(peerids) holding shareid
|
# to them
|
||||||
|
self.preexisting_shares = {} # shareid => set(serverids) holding shareid
|
||||||
# We don't try to allocate shares to these servers, since they've said
|
# We don't try to allocate shares to these servers, since they've said
|
||||||
# that they're incapable of storing shares of the size that we'd want
|
# that they're incapable of storing shares of the size that we'd want
|
||||||
# to store. We keep them around because they may have existing shares
|
# to store. We keep them around because they may have existing shares
|
||||||
# for this storage index, which we want to know about for accurate
|
# for this storage index, which we want to know about for accurate
|
||||||
# servers_of_happiness accounting
|
# servers_of_happiness accounting
|
||||||
# (this is eventually a list, but it is initialized later)
|
# (this is eventually a list, but it is initialized later)
|
||||||
self.readonly_peers = None
|
self.readonly_servers = None
|
||||||
# These peers have shares -- any shares -- for our SI. We keep
|
# These servers have shares -- any shares -- for our SI. We keep
|
||||||
# track of these to write an error message with them later.
|
# track of these to write an error message with them later.
|
||||||
self.peers_with_shares = set()
|
self.servers_with_shares = set()
|
||||||
|
|
||||||
# this needed_hashes computation should mirror
|
# this needed_hashes computation should mirror
|
||||||
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
|
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
|
||||||
|
@ -214,22 +215,22 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
num_share_hashes, EXTENSION_SIZE,
|
num_share_hashes, EXTENSION_SIZE,
|
||||||
None)
|
None)
|
||||||
allocated_size = wbp.get_allocated_size()
|
allocated_size = wbp.get_allocated_size()
|
||||||
all_peers = [(s.get_serverid(), s.get_rref())
|
all_servers = [(s.get_serverid(), s.get_rref())
|
||||||
for s in storage_broker.get_servers_for_psi(storage_index)]
|
for s in storage_broker.get_servers_for_psi(storage_index)]
|
||||||
if not all_peers:
|
if not all_servers:
|
||||||
raise NoServersError("client gave us zero peers")
|
raise NoServersError("client gave us zero servers")
|
||||||
|
|
||||||
# filter the list of peers according to which ones can accomodate
|
# filter the list of servers according to which ones can accomodate
|
||||||
# this request. This excludes older peers (which used a 4-byte size
|
# this request. This excludes older servers (which used a 4-byte size
|
||||||
# field) from getting large shares (for files larger than about
|
# field) from getting large shares (for files larger than about
|
||||||
# 12GiB). See #439 for details.
|
# 12GiB). See #439 for details.
|
||||||
def _get_maxsize(peer):
|
def _get_maxsize(server):
|
||||||
(peerid, conn) = peer
|
(serverid, conn) = server
|
||||||
v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
|
v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
|
||||||
return v1["maximum-immutable-share-size"]
|
return v1["maximum-immutable-share-size"]
|
||||||
writable_peers = [peer for peer in all_peers
|
writable_servers = [server for server in all_servers
|
||||||
if _get_maxsize(peer) >= allocated_size]
|
if _get_maxsize(server) >= allocated_size]
|
||||||
readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
|
readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
|
||||||
|
|
||||||
# decide upon the renewal/cancel secrets, to include them in the
|
# decide upon the renewal/cancel secrets, to include them in the
|
||||||
# allocate_buckets query.
|
# allocate_buckets query.
|
||||||
|
@ -240,61 +241,61 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
storage_index)
|
storage_index)
|
||||||
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
|
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
|
||||||
storage_index)
|
storage_index)
|
||||||
def _make_trackers(peers):
|
def _make_trackers(servers):
|
||||||
return [PeerTracker(peerid, conn,
|
return [ServerTracker(serverid, conn,
|
||||||
share_size, block_size,
|
share_size, block_size,
|
||||||
num_segments, num_share_hashes,
|
num_segments, num_share_hashes,
|
||||||
storage_index,
|
storage_index,
|
||||||
bucket_renewal_secret_hash(file_renewal_secret,
|
bucket_renewal_secret_hash(file_renewal_secret,
|
||||||
peerid),
|
serverid),
|
||||||
bucket_cancel_secret_hash(file_cancel_secret,
|
bucket_cancel_secret_hash(file_cancel_secret,
|
||||||
peerid))
|
serverid))
|
||||||
for (peerid, conn) in peers]
|
for (serverid, conn) in servers]
|
||||||
self.uncontacted_peers = _make_trackers(writable_peers)
|
self.uncontacted_servers = _make_trackers(writable_servers)
|
||||||
self.readonly_peers = _make_trackers(readonly_peers)
|
self.readonly_servers = _make_trackers(readonly_servers)
|
||||||
# We now ask peers that can't hold any new shares about existing
|
# We now ask servers that can't hold any new shares about existing
|
||||||
# shares that they might have for our SI. Once this is done, we
|
# shares that they might have for our SI. Once this is done, we
|
||||||
# start placing the shares that we haven't already accounted
|
# start placing the shares that we haven't already accounted
|
||||||
# for.
|
# for.
|
||||||
ds = []
|
ds = []
|
||||||
if self._status and self.readonly_peers:
|
if self._status and self.readonly_servers:
|
||||||
self._status.set_status("Contacting readonly peers to find "
|
self._status.set_status("Contacting readonly servers to find "
|
||||||
"any existing shares")
|
"any existing shares")
|
||||||
for peer in self.readonly_peers:
|
for server in self.readonly_servers:
|
||||||
assert isinstance(peer, PeerTracker)
|
assert isinstance(server, ServerTracker)
|
||||||
d = peer.ask_about_existing_shares()
|
d = server.ask_about_existing_shares()
|
||||||
d.addBoth(self._handle_existing_response, peer.peerid)
|
d.addBoth(self._handle_existing_response, server.serverid)
|
||||||
ds.append(d)
|
ds.append(d)
|
||||||
self.num_peers_contacted += 1
|
self.num_servers_contacted += 1
|
||||||
self.query_count += 1
|
self.query_count += 1
|
||||||
self.log("asking peer %s for any existing shares" %
|
self.log("asking server %s for any existing shares" %
|
||||||
(idlib.shortnodeid_b2a(peer.peerid),),
|
(idlib.shortnodeid_b2a(server.serverid),),
|
||||||
level=log.NOISY)
|
level=log.NOISY)
|
||||||
dl = defer.DeferredList(ds)
|
dl = defer.DeferredList(ds)
|
||||||
dl.addCallback(lambda ign: self._loop())
|
dl.addCallback(lambda ign: self._loop())
|
||||||
return dl
|
return dl
|
||||||
|
|
||||||
|
|
||||||
def _handle_existing_response(self, res, peer):
|
def _handle_existing_response(self, res, server):
|
||||||
"""
|
"""
|
||||||
I handle responses to the queries sent by
|
I handle responses to the queries sent by
|
||||||
Tahoe2PeerSelector._existing_shares.
|
Tahoe2ServerSelector._existing_shares.
|
||||||
"""
|
"""
|
||||||
if isinstance(res, failure.Failure):
|
if isinstance(res, failure.Failure):
|
||||||
self.log("%s got error during existing shares check: %s"
|
self.log("%s got error during existing shares check: %s"
|
||||||
% (idlib.shortnodeid_b2a(peer), res),
|
% (idlib.shortnodeid_b2a(server), res),
|
||||||
level=log.UNUSUAL)
|
level=log.UNUSUAL)
|
||||||
self.error_count += 1
|
self.error_count += 1
|
||||||
self.bad_query_count += 1
|
self.bad_query_count += 1
|
||||||
else:
|
else:
|
||||||
buckets = res
|
buckets = res
|
||||||
if buckets:
|
if buckets:
|
||||||
self.peers_with_shares.add(peer)
|
self.servers_with_shares.add(server)
|
||||||
self.log("response to get_buckets() from peer %s: alreadygot=%s"
|
self.log("response to get_buckets() from server %s: alreadygot=%s"
|
||||||
% (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
|
% (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))),
|
||||||
level=log.NOISY)
|
level=log.NOISY)
|
||||||
for bucket in buckets:
|
for bucket in buckets:
|
||||||
self.preexisting_shares.setdefault(bucket, set()).add(peer)
|
self.preexisting_shares.setdefault(bucket, set()).add(server)
|
||||||
self.homeless_shares.discard(bucket)
|
self.homeless_shares.discard(bucket)
|
||||||
self.full_count += 1
|
self.full_count += 1
|
||||||
self.bad_query_count += 1
|
self.bad_query_count += 1
|
||||||
|
@ -310,36 +311,37 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
len(self.homeless_shares)))
|
len(self.homeless_shares)))
|
||||||
return (msg + "want to place shares on at least %d servers such that "
|
return (msg + "want to place shares on at least %d servers such that "
|
||||||
"any %d of them have enough shares to recover the file, "
|
"any %d of them have enough shares to recover the file, "
|
||||||
"sent %d queries to %d peers, "
|
"sent %d queries to %d servers, "
|
||||||
"%d queries placed some shares, %d placed none "
|
"%d queries placed some shares, %d placed none "
|
||||||
"(of which %d placed none due to the server being"
|
"(of which %d placed none due to the server being"
|
||||||
" full and %d placed none due to an error)" %
|
" full and %d placed none due to an error)" %
|
||||||
(self.servers_of_happiness, self.needed_shares,
|
(self.servers_of_happiness, self.needed_shares,
|
||||||
self.query_count, self.num_peers_contacted,
|
self.query_count, self.num_servers_contacted,
|
||||||
self.good_query_count, self.bad_query_count,
|
self.good_query_count, self.bad_query_count,
|
||||||
self.full_count, self.error_count))
|
self.full_count, self.error_count))
|
||||||
|
|
||||||
|
|
||||||
def _loop(self):
|
def _loop(self):
|
||||||
if not self.homeless_shares:
|
if not self.homeless_shares:
|
||||||
merged = merge_peers(self.preexisting_shares, self.use_peers)
|
merged = merge_peers(self.preexisting_shares, self.use_servers)
|
||||||
effective_happiness = servers_of_happiness(merged)
|
effective_happiness = servers_of_happiness(merged)
|
||||||
if self.servers_of_happiness <= effective_happiness:
|
if self.servers_of_happiness <= effective_happiness:
|
||||||
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
|
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
|
||||||
"self.use_peers: %s, self.preexisting_shares: %s") \
|
"self.use_servers: %s, self.preexisting_shares: %s") \
|
||||||
% (self, self._get_progress_message(),
|
% (self, self._get_progress_message(),
|
||||||
pretty_print_shnum_to_servers(merged),
|
pretty_print_shnum_to_servers(merged),
|
||||||
[', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
|
[', '.join([str_shareloc(k,v)
|
||||||
for p in self.use_peers],
|
for k,v in s.buckets.iteritems()])
|
||||||
|
for s in self.use_servers],
|
||||||
pretty_print_shnum_to_servers(self.preexisting_shares))
|
pretty_print_shnum_to_servers(self.preexisting_shares))
|
||||||
self.log(msg, level=log.OPERATIONAL)
|
self.log(msg, level=log.OPERATIONAL)
|
||||||
return (self.use_peers, self.preexisting_shares)
|
return (self.use_servers, self.preexisting_shares)
|
||||||
else:
|
else:
|
||||||
# We're not okay right now, but maybe we can fix it by
|
# We're not okay right now, but maybe we can fix it by
|
||||||
# redistributing some shares. In cases where one or two
|
# redistributing some shares. In cases where one or two
|
||||||
# servers has, before the upload, all or most of the
|
# servers has, before the upload, all or most of the
|
||||||
# shares for a given SI, this can work by allowing _loop
|
# shares for a given SI, this can work by allowing _loop
|
||||||
# a chance to spread those out over the other peers,
|
# a chance to spread those out over the other servers,
|
||||||
delta = self.servers_of_happiness - effective_happiness
|
delta = self.servers_of_happiness - effective_happiness
|
||||||
shares = shares_by_server(self.preexisting_shares)
|
shares = shares_by_server(self.preexisting_shares)
|
||||||
# Each server in shares maps to a set of shares stored on it.
|
# Each server in shares maps to a set of shares stored on it.
|
||||||
|
@ -350,7 +352,7 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
shares_to_spread = sum([len(list(sharelist)) - 1
|
shares_to_spread = sum([len(list(sharelist)) - 1
|
||||||
for (server, sharelist)
|
for (server, sharelist)
|
||||||
in shares.items()])
|
in shares.items()])
|
||||||
if delta <= len(self.uncontacted_peers) and \
|
if delta <= len(self.uncontacted_servers) and \
|
||||||
shares_to_spread >= delta:
|
shares_to_spread >= delta:
|
||||||
items = shares.items()
|
items = shares.items()
|
||||||
while len(self.homeless_shares) < delta:
|
while len(self.homeless_shares) < delta:
|
||||||
|
@ -366,13 +368,13 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
if not self.preexisting_shares[share]:
|
if not self.preexisting_shares[share]:
|
||||||
del self.preexisting_shares[share]
|
del self.preexisting_shares[share]
|
||||||
items.append((server, sharelist))
|
items.append((server, sharelist))
|
||||||
for writer in self.use_peers:
|
for writer in self.use_servers:
|
||||||
writer.abort_some_buckets(self.homeless_shares)
|
writer.abort_some_buckets(self.homeless_shares)
|
||||||
return self._loop()
|
return self._loop()
|
||||||
else:
|
else:
|
||||||
# Redistribution won't help us; fail.
|
# Redistribution won't help us; fail.
|
||||||
peer_count = len(self.peers_with_shares)
|
server_count = len(self.servers_with_shares)
|
||||||
failmsg = failure_message(peer_count,
|
failmsg = failure_message(server_count,
|
||||||
self.needed_shares,
|
self.needed_shares,
|
||||||
self.servers_of_happiness,
|
self.servers_of_happiness,
|
||||||
effective_happiness)
|
effective_happiness)
|
||||||
|
@ -386,63 +388,62 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
self.log(servmsg, level=log.INFREQUENT)
|
self.log(servmsg, level=log.INFREQUENT)
|
||||||
return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
|
return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
|
||||||
|
|
||||||
if self.uncontacted_peers:
|
if self.uncontacted_servers:
|
||||||
peer = self.uncontacted_peers.pop(0)
|
server = self.uncontacted_servers.pop(0)
|
||||||
# TODO: don't pre-convert all peerids to PeerTrackers
|
# TODO: don't pre-convert all serverids to ServerTrackers
|
||||||
assert isinstance(peer, PeerTracker)
|
assert isinstance(server, ServerTracker)
|
||||||
|
|
||||||
shares_to_ask = set(sorted(self.homeless_shares)[:1])
|
shares_to_ask = set(sorted(self.homeless_shares)[:1])
|
||||||
self.homeless_shares -= shares_to_ask
|
self.homeless_shares -= shares_to_ask
|
||||||
self.query_count += 1
|
self.query_count += 1
|
||||||
self.num_peers_contacted += 1
|
self.num_servers_contacted += 1
|
||||||
if self._status:
|
if self._status:
|
||||||
self._status.set_status("Contacting Peers [%s] (first query),"
|
self._status.set_status("Contacting Servers [%s] (first query),"
|
||||||
" %d shares left.."
|
" %d shares left.."
|
||||||
% (idlib.shortnodeid_b2a(peer.peerid),
|
% (idlib.shortnodeid_b2a(server.serverid),
|
||||||
len(self.homeless_shares)))
|
len(self.homeless_shares)))
|
||||||
d = peer.query(shares_to_ask)
|
d = server.query(shares_to_ask)
|
||||||
d.addBoth(self._got_response, peer, shares_to_ask,
|
d.addBoth(self._got_response, server, shares_to_ask,
|
||||||
self.contacted_peers)
|
self.contacted_servers)
|
||||||
return d
|
return d
|
||||||
elif self.contacted_peers:
|
elif self.contacted_servers:
|
||||||
# ask a peer that we've already asked.
|
# ask a server that we've already asked.
|
||||||
if not self._started_second_pass:
|
if not self._started_second_pass:
|
||||||
self.log("starting second pass",
|
self.log("starting second pass",
|
||||||
level=log.NOISY)
|
level=log.NOISY)
|
||||||
self._started_second_pass = True
|
self._started_second_pass = True
|
||||||
num_shares = mathutil.div_ceil(len(self.homeless_shares),
|
num_shares = mathutil.div_ceil(len(self.homeless_shares),
|
||||||
len(self.contacted_peers))
|
len(self.contacted_servers))
|
||||||
peer = self.contacted_peers.pop(0)
|
server = self.contacted_servers.pop(0)
|
||||||
shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
|
shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
|
||||||
self.homeless_shares -= shares_to_ask
|
self.homeless_shares -= shares_to_ask
|
||||||
self.query_count += 1
|
self.query_count += 1
|
||||||
if self._status:
|
if self._status:
|
||||||
self._status.set_status("Contacting Peers [%s] (second query),"
|
self._status.set_status("Contacting Servers [%s] (second query),"
|
||||||
" %d shares left.."
|
" %d shares left.."
|
||||||
% (idlib.shortnodeid_b2a(peer.peerid),
|
% (idlib.shortnodeid_b2a(server.serverid),
|
||||||
len(self.homeless_shares)))
|
len(self.homeless_shares)))
|
||||||
d = peer.query(shares_to_ask)
|
d = server.query(shares_to_ask)
|
||||||
d.addBoth(self._got_response, peer, shares_to_ask,
|
d.addBoth(self._got_response, server, shares_to_ask,
|
||||||
self.contacted_peers2)
|
self.contacted_servers2)
|
||||||
return d
|
return d
|
||||||
elif self.contacted_peers2:
|
elif self.contacted_servers2:
|
||||||
# we've finished the second-or-later pass. Move all the remaining
|
# we've finished the second-or-later pass. Move all the remaining
|
||||||
# peers back into self.contacted_peers for the next pass.
|
# servers back into self.contacted_servers for the next pass.
|
||||||
self.contacted_peers.extend(self.contacted_peers2)
|
self.contacted_servers.extend(self.contacted_servers2)
|
||||||
self.contacted_peers2[:] = []
|
self.contacted_servers2[:] = []
|
||||||
return self._loop()
|
return self._loop()
|
||||||
else:
|
else:
|
||||||
# no more peers. If we haven't placed enough shares, we fail.
|
# no more servers. If we haven't placed enough shares, we fail.
|
||||||
merged = merge_peers(self.preexisting_shares, self.use_peers)
|
merged = merge_peers(self.preexisting_shares, self.use_servers)
|
||||||
effective_happiness = servers_of_happiness(merged)
|
effective_happiness = servers_of_happiness(merged)
|
||||||
if effective_happiness < self.servers_of_happiness:
|
if effective_happiness < self.servers_of_happiness:
|
||||||
msg = failure_message(len(self.peers_with_shares),
|
msg = failure_message(len(self.servers_with_shares),
|
||||||
self.needed_shares,
|
self.needed_shares,
|
||||||
self.servers_of_happiness,
|
self.servers_of_happiness,
|
||||||
effective_happiness)
|
effective_happiness)
|
||||||
msg = ("peer selection failed for %s: %s (%s)" % (self,
|
msg = ("server selection failed for %s: %s (%s)" %
|
||||||
msg,
|
(self, msg, self._get_progress_message()))
|
||||||
self._get_progress_message()))
|
|
||||||
if self.last_failure_msg:
|
if self.last_failure_msg:
|
||||||
msg += " (%s)" % (self.last_failure_msg,)
|
msg += " (%s)" % (self.last_failure_msg,)
|
||||||
self.log(msg, level=log.UNUSUAL)
|
self.log(msg, level=log.UNUSUAL)
|
||||||
|
@ -454,53 +455,53 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
|
msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
|
||||||
self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
|
self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
|
||||||
self.log(msg, level=log.OPERATIONAL)
|
self.log(msg, level=log.OPERATIONAL)
|
||||||
return (self.use_peers, self.preexisting_shares)
|
return (self.use_servers, self.preexisting_shares)
|
||||||
|
|
||||||
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
|
def _got_response(self, res, server, shares_to_ask, put_server_here):
|
||||||
if isinstance(res, failure.Failure):
|
if isinstance(res, failure.Failure):
|
||||||
# This is unusual, and probably indicates a bug or a network
|
# This is unusual, and probably indicates a bug or a network
|
||||||
# problem.
|
# problem.
|
||||||
self.log("%s got error during peer selection: %s" % (peer, res),
|
self.log("%s got error during server selection: %s" % (server, res),
|
||||||
level=log.UNUSUAL)
|
level=log.UNUSUAL)
|
||||||
self.error_count += 1
|
self.error_count += 1
|
||||||
self.bad_query_count += 1
|
self.bad_query_count += 1
|
||||||
self.homeless_shares |= shares_to_ask
|
self.homeless_shares |= shares_to_ask
|
||||||
if (self.uncontacted_peers
|
if (self.uncontacted_servers
|
||||||
or self.contacted_peers
|
or self.contacted_servers
|
||||||
or self.contacted_peers2):
|
or self.contacted_servers2):
|
||||||
# there is still hope, so just loop
|
# there is still hope, so just loop
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
# No more peers, so this upload might fail (it depends upon
|
# No more servers, so this upload might fail (it depends upon
|
||||||
# whether we've hit servers_of_happiness or not). Log the last
|
# whether we've hit servers_of_happiness or not). Log the last
|
||||||
# failure we got: if a coding error causes all peers to fail
|
# failure we got: if a coding error causes all servers to fail
|
||||||
# in the same way, this allows the common failure to be seen
|
# in the same way, this allows the common failure to be seen
|
||||||
# by the uploader and should help with debugging
|
# by the uploader and should help with debugging
|
||||||
msg = ("last failure (from %s) was: %s" % (peer, res))
|
msg = ("last failure (from %s) was: %s" % (server, res))
|
||||||
self.last_failure_msg = msg
|
self.last_failure_msg = msg
|
||||||
else:
|
else:
|
||||||
(alreadygot, allocated) = res
|
(alreadygot, allocated) = res
|
||||||
self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
|
self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
|
||||||
% (idlib.shortnodeid_b2a(peer.peerid),
|
% (idlib.shortnodeid_b2a(server.serverid),
|
||||||
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
|
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
|
||||||
level=log.NOISY)
|
level=log.NOISY)
|
||||||
progress = False
|
progress = False
|
||||||
for s in alreadygot:
|
for s in alreadygot:
|
||||||
self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
|
self.preexisting_shares.setdefault(s, set()).add(server.serverid)
|
||||||
if s in self.homeless_shares:
|
if s in self.homeless_shares:
|
||||||
self.homeless_shares.remove(s)
|
self.homeless_shares.remove(s)
|
||||||
progress = True
|
progress = True
|
||||||
elif s in shares_to_ask:
|
elif s in shares_to_ask:
|
||||||
progress = True
|
progress = True
|
||||||
|
|
||||||
# the PeerTracker will remember which shares were allocated on
|
# the ServerTracker will remember which shares were allocated on
|
||||||
# that peer. We just have to remember to use them.
|
# that peer. We just have to remember to use them.
|
||||||
if allocated:
|
if allocated:
|
||||||
self.use_peers.add(peer)
|
self.use_servers.add(server)
|
||||||
progress = True
|
progress = True
|
||||||
|
|
||||||
if allocated or alreadygot:
|
if allocated or alreadygot:
|
||||||
self.peers_with_shares.add(peer.peerid)
|
self.servers_with_shares.add(server.serverid)
|
||||||
|
|
||||||
not_yet_present = set(shares_to_ask) - set(alreadygot)
|
not_yet_present = set(shares_to_ask) - set(alreadygot)
|
||||||
still_homeless = not_yet_present - set(allocated)
|
still_homeless = not_yet_present - set(allocated)
|
||||||
|
@ -517,11 +518,11 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
|
|
||||||
if still_homeless:
|
if still_homeless:
|
||||||
# In networks with lots of space, this is very unusual and
|
# In networks with lots of space, this is very unusual and
|
||||||
# probably indicates an error. In networks with peers that
|
# probably indicates an error. In networks with servers that
|
||||||
# are full, it is merely unusual. In networks that are very
|
# are full, it is merely unusual. In networks that are very
|
||||||
# full, it is common, and many uploads will fail. In most
|
# full, it is common, and many uploads will fail. In most
|
||||||
# cases, this is obviously not fatal, and we'll just use some
|
# cases, this is obviously not fatal, and we'll just use some
|
||||||
# other peers.
|
# other servers.
|
||||||
|
|
||||||
# some shares are still homeless, keep trying to find them a
|
# some shares are still homeless, keep trying to find them a
|
||||||
# home. The ones that were rejected get first priority.
|
# home. The ones that were rejected get first priority.
|
||||||
|
@ -531,7 +532,7 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
else:
|
else:
|
||||||
# if they *were* able to accept everything, they might be
|
# if they *were* able to accept everything, they might be
|
||||||
# willing to accept even more.
|
# willing to accept even more.
|
||||||
put_peer_here.append(peer)
|
put_server_here.append(server)
|
||||||
|
|
||||||
# now loop
|
# now loop
|
||||||
return self._loop()
|
return self._loop()
|
||||||
|
@ -539,15 +540,15 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
|
||||||
|
|
||||||
def _failed(self, msg):
|
def _failed(self, msg):
|
||||||
"""
|
"""
|
||||||
I am called when peer selection fails. I first abort all of the
|
I am called when server selection fails. I first abort all of the
|
||||||
remote buckets that I allocated during my unsuccessful attempt to
|
remote buckets that I allocated during my unsuccessful attempt to
|
||||||
place shares for this file. I then raise an
|
place shares for this file. I then raise an
|
||||||
UploadUnhappinessError with my msg argument.
|
UploadUnhappinessError with my msg argument.
|
||||||
"""
|
"""
|
||||||
for peer in self.use_peers:
|
for server in self.use_servers:
|
||||||
assert isinstance(peer, PeerTracker)
|
assert isinstance(server, ServerTracker)
|
||||||
|
|
||||||
peer.abort()
|
server.abort()
|
||||||
|
|
||||||
raise UploadUnhappinessError(msg)
|
raise UploadUnhappinessError(msg)
|
||||||
|
|
||||||
|
@ -825,10 +826,10 @@ class UploadStatus:
|
||||||
self.results = value
|
self.results = value
|
||||||
|
|
||||||
class CHKUploader:
|
class CHKUploader:
|
||||||
peer_selector_class = Tahoe2PeerSelector
|
server_selector_class = Tahoe2ServerSelector
|
||||||
|
|
||||||
def __init__(self, storage_broker, secret_holder):
|
def __init__(self, storage_broker, secret_holder):
|
||||||
# peer_selector needs storage_broker and secret_holder
|
# server_selector needs storage_broker and secret_holder
|
||||||
self._storage_broker = storage_broker
|
self._storage_broker = storage_broker
|
||||||
self._secret_holder = secret_holder
|
self._secret_holder = secret_holder
|
||||||
self._log_number = self.log("CHKUploader starting", parent=None)
|
self._log_number = self.log("CHKUploader starting", parent=None)
|
||||||
|
@ -841,7 +842,7 @@ class CHKUploader:
|
||||||
self._upload_status.set_results(self._results)
|
self._upload_status.set_results(self._results)
|
||||||
|
|
||||||
# locate_all_shareholders() will create the following attribute:
|
# locate_all_shareholders() will create the following attribute:
|
||||||
# self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
|
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
|
||||||
|
|
||||||
def log(self, *args, **kwargs):
|
def log(self, *args, **kwargs):
|
||||||
if "parent" not in kwargs:
|
if "parent" not in kwargs:
|
||||||
|
@ -892,7 +893,7 @@ class CHKUploader:
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def locate_all_shareholders(self, encoder, started):
|
def locate_all_shareholders(self, encoder, started):
|
||||||
peer_selection_started = now = time.time()
|
server_selection_started = now = time.time()
|
||||||
self._storage_index_elapsed = now - started
|
self._storage_index_elapsed = now - started
|
||||||
storage_broker = self._storage_broker
|
storage_broker = self._storage_broker
|
||||||
secret_holder = self._secret_holder
|
secret_holder = self._secret_holder
|
||||||
|
@ -900,7 +901,8 @@ class CHKUploader:
|
||||||
self._storage_index = storage_index
|
self._storage_index = storage_index
|
||||||
upload_id = si_b2a(storage_index)[:5]
|
upload_id = si_b2a(storage_index)[:5]
|
||||||
self.log("using storage index %s" % upload_id)
|
self.log("using storage index %s" % upload_id)
|
||||||
peer_selector = self.peer_selector_class(upload_id, self._log_number,
|
server_selector = self.server_selector_class(upload_id,
|
||||||
|
self._log_number,
|
||||||
self._upload_status)
|
self._upload_status)
|
||||||
|
|
||||||
share_size = encoder.get_param("share_size")
|
share_size = encoder.get_param("share_size")
|
||||||
|
@ -908,47 +910,49 @@ class CHKUploader:
|
||||||
num_segments = encoder.get_param("num_segments")
|
num_segments = encoder.get_param("num_segments")
|
||||||
k,desired,n = encoder.get_param("share_counts")
|
k,desired,n = encoder.get_param("share_counts")
|
||||||
|
|
||||||
self._peer_selection_started = time.time()
|
self._server_selection_started = time.time()
|
||||||
d = peer_selector.get_shareholders(storage_broker, secret_holder,
|
d = server_selector.get_shareholders(storage_broker, secret_holder,
|
||||||
storage_index,
|
storage_index,
|
||||||
share_size, block_size,
|
share_size, block_size,
|
||||||
num_segments, n, k, desired)
|
num_segments, n, k, desired)
|
||||||
def _done(res):
|
def _done(res):
|
||||||
self._peer_selection_elapsed = time.time() - peer_selection_started
|
self._server_selection_elapsed = time.time() - server_selection_started
|
||||||
return res
|
return res
|
||||||
d.addCallback(_done)
|
d.addCallback(_done)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def set_shareholders(self, (upload_servers, already_peers), encoder):
|
def set_shareholders(self, (upload_servers, already_servers), encoder):
|
||||||
"""
|
"""
|
||||||
@param upload_servers: a sequence of PeerTracker objects that have agreed to hold some
|
@param upload_servers: a sequence of ServerTracker objects that
|
||||||
shares for us (the shareids are stashed inside the PeerTracker)
|
have agreed to hold some shares for us (the
|
||||||
@paran already_peers: a dict mapping sharenum to a set of peerids
|
shareids are stashed inside the ServerTracker)
|
||||||
|
@paran already_servers: a dict mapping sharenum to a set of serverids
|
||||||
that claim to already have this share
|
that claim to already have this share
|
||||||
"""
|
"""
|
||||||
msgtempl = "set_shareholders; upload_servers is %s, already_peers is %s"
|
msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s"
|
||||||
values = ([', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
|
values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()])
|
||||||
for p in upload_servers], already_peers)
|
for s in upload_servers], already_servers)
|
||||||
self.log(msgtempl % values, level=log.OPERATIONAL)
|
self.log(msgtempl % values, level=log.OPERATIONAL)
|
||||||
# record already-present shares in self._results
|
# record already-present shares in self._results
|
||||||
self._results.preexisting_shares = len(already_peers)
|
self._results.preexisting_shares = len(already_servers)
|
||||||
|
|
||||||
self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
|
self._server_trackers = {} # k: shnum, v: instance of ServerTracker
|
||||||
for peer in upload_servers:
|
for server in upload_servers:
|
||||||
assert isinstance(peer, PeerTracker)
|
assert isinstance(server, ServerTracker)
|
||||||
buckets = {}
|
buckets = {}
|
||||||
servermap = already_peers.copy()
|
servermap = already_servers.copy()
|
||||||
for peer in upload_servers:
|
for server in upload_servers:
|
||||||
buckets.update(peer.buckets)
|
buckets.update(server.buckets)
|
||||||
for shnum in peer.buckets:
|
for shnum in server.buckets:
|
||||||
self._peer_trackers[shnum] = peer
|
self._server_trackers[shnum] = server
|
||||||
servermap.setdefault(shnum, set()).add(peer.peerid)
|
servermap.setdefault(shnum, set()).add(server.serverid)
|
||||||
assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), \
|
assert len(buckets) == sum([len(server.buckets)
|
||||||
|
for server in upload_servers]), \
|
||||||
"%s (%s) != %s (%s)" % (
|
"%s (%s) != %s (%s)" % (
|
||||||
len(buckets),
|
len(buckets),
|
||||||
buckets,
|
buckets,
|
||||||
sum([len(peer.buckets) for peer in upload_servers]),
|
sum([len(server.buckets) for server in upload_servers]),
|
||||||
[(p.buckets, p.peerid) for p in upload_servers]
|
[(s.buckets, s.serverid) for s in upload_servers]
|
||||||
)
|
)
|
||||||
encoder.set_shareholders(buckets, servermap)
|
encoder.set_shareholders(buckets, servermap)
|
||||||
|
|
||||||
|
@ -956,16 +960,16 @@ class CHKUploader:
|
||||||
""" Returns a Deferred that will fire with the UploadResults instance. """
|
""" Returns a Deferred that will fire with the UploadResults instance. """
|
||||||
r = self._results
|
r = self._results
|
||||||
for shnum in self._encoder.get_shares_placed():
|
for shnum in self._encoder.get_shares_placed():
|
||||||
peer_tracker = self._peer_trackers[shnum]
|
server_tracker = self._server_trackers[shnum]
|
||||||
peerid = peer_tracker.peerid
|
serverid = server_tracker.serverid
|
||||||
r.sharemap.add(shnum, peerid)
|
r.sharemap.add(shnum, serverid)
|
||||||
r.servermap.add(peerid, shnum)
|
r.servermap.add(serverid, shnum)
|
||||||
r.pushed_shares = len(self._encoder.get_shares_placed())
|
r.pushed_shares = len(self._encoder.get_shares_placed())
|
||||||
now = time.time()
|
now = time.time()
|
||||||
r.file_size = self._encoder.file_size
|
r.file_size = self._encoder.file_size
|
||||||
r.timings["total"] = now - self._started
|
r.timings["total"] = now - self._started
|
||||||
r.timings["storage_index"] = self._storage_index_elapsed
|
r.timings["storage_index"] = self._storage_index_elapsed
|
||||||
r.timings["peer_selection"] = self._peer_selection_elapsed
|
r.timings["peer_selection"] = self._server_selection_elapsed
|
||||||
r.timings.update(self._encoder.get_times())
|
r.timings.update(self._encoder.get_times())
|
||||||
r.uri_extension_data = self._encoder.get_uri_extension_data()
|
r.uri_extension_data = self._encoder.get_uri_extension_data()
|
||||||
r.verifycapstr = verifycap.to_string()
|
r.verifycapstr = verifycap.to_string()
|
||||||
|
|
|
@ -193,12 +193,12 @@ class FakeClient:
|
||||||
self.num_servers = num_servers
|
self.num_servers = num_servers
|
||||||
if type(mode) is str:
|
if type(mode) is str:
|
||||||
mode = dict([i,mode] for i in range(num_servers))
|
mode = dict([i,mode] for i in range(num_servers))
|
||||||
peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
|
servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
|
||||||
for fakeid in range(self.num_servers) ]
|
for fakeid in range(self.num_servers) ]
|
||||||
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
|
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
|
||||||
for (serverid, rref) in peers:
|
for (serverid, rref) in servers:
|
||||||
self.storage_broker.test_add_rref(serverid, rref)
|
self.storage_broker.test_add_rref(serverid, rref)
|
||||||
self.last_peers = [p[1] for p in peers]
|
self.last_servers = [s[1] for s in servers]
|
||||||
|
|
||||||
def log(self, *args, **kwargs):
|
def log(self, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
@ -411,7 +411,7 @@ class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
|
||||||
def test_first_error_all(self):
|
def test_first_error_all(self):
|
||||||
self.make_node("first-fail")
|
self.make_node("first-fail")
|
||||||
d = self.shouldFail(UploadUnhappinessError, "first_error_all",
|
d = self.shouldFail(UploadUnhappinessError, "first_error_all",
|
||||||
"peer selection failed",
|
"server selection failed",
|
||||||
upload_data, self.u, DATA)
|
upload_data, self.u, DATA)
|
||||||
def _check((f,)):
|
def _check((f,)):
|
||||||
self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
|
self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
|
||||||
|
@ -443,7 +443,7 @@ class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
|
||||||
def test_second_error_all(self):
|
def test_second_error_all(self):
|
||||||
self.make_node("second-fail")
|
self.make_node("second-fail")
|
||||||
d = self.shouldFail(UploadUnhappinessError, "second_error_all",
|
d = self.shouldFail(UploadUnhappinessError, "second_error_all",
|
||||||
"peer selection failed",
|
"server selection failed",
|
||||||
upload_data, self.u, DATA)
|
upload_data, self.u, DATA)
|
||||||
def _check((f,)):
|
def _check((f,)):
|
||||||
self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
|
self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
|
||||||
|
@ -468,7 +468,7 @@ class FullServer(unittest.TestCase):
|
||||||
d.addBoth(self._should_fail)
|
d.addBoth(self._should_fail)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
class PeerSelection(unittest.TestCase):
|
class ServerSelection(unittest.TestCase):
|
||||||
|
|
||||||
def make_client(self, num_servers=50):
|
def make_client(self, num_servers=50):
|
||||||
self.node = FakeClient(mode="good", num_servers=num_servers)
|
self.node = FakeClient(mode="good", num_servers=num_servers)
|
||||||
|
@ -497,8 +497,8 @@ class PeerSelection(unittest.TestCase):
|
||||||
self.node.DEFAULT_ENCODING_PARAMETERS = p
|
self.node.DEFAULT_ENCODING_PARAMETERS = p
|
||||||
|
|
||||||
def test_one_each(self):
|
def test_one_each(self):
|
||||||
# if we have 50 shares, and there are 50 peers, and they all accept a
|
# if we have 50 shares, and there are 50 servers, and they all accept
|
||||||
# share, we should get exactly one share per peer
|
# a share, we should get exactly one share per server
|
||||||
|
|
||||||
self.make_client()
|
self.make_client()
|
||||||
data = self.get_data(SIZE_LARGE)
|
data = self.get_data(SIZE_LARGE)
|
||||||
|
@ -507,35 +507,35 @@ class PeerSelection(unittest.TestCase):
|
||||||
d.addCallback(extract_uri)
|
d.addCallback(extract_uri)
|
||||||
d.addCallback(self._check_large, SIZE_LARGE)
|
d.addCallback(self._check_large, SIZE_LARGE)
|
||||||
def _check(res):
|
def _check(res):
|
||||||
for p in self.node.last_peers:
|
for s in self.node.last_servers:
|
||||||
allocated = p.allocated
|
allocated = s.allocated
|
||||||
self.failUnlessEqual(len(allocated), 1)
|
self.failUnlessEqual(len(allocated), 1)
|
||||||
self.failUnlessEqual(p.queries, 1)
|
self.failUnlessEqual(s.queries, 1)
|
||||||
d.addCallback(_check)
|
d.addCallback(_check)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_two_each(self):
|
def test_two_each(self):
|
||||||
# if we have 100 shares, and there are 50 peers, and they all accept
|
# if we have 100 shares, and there are 50 servers, and they all
|
||||||
# all shares, we should get exactly two shares per peer
|
# accept all shares, we should get exactly two shares per server
|
||||||
|
|
||||||
self.make_client()
|
self.make_client()
|
||||||
data = self.get_data(SIZE_LARGE)
|
data = self.get_data(SIZE_LARGE)
|
||||||
# if there are 50 peers, then happy needs to be <= 50
|
# if there are 50 servers, then happy needs to be <= 50
|
||||||
self.set_encoding_parameters(50, 50, 100)
|
self.set_encoding_parameters(50, 50, 100)
|
||||||
d = upload_data(self.u, data)
|
d = upload_data(self.u, data)
|
||||||
d.addCallback(extract_uri)
|
d.addCallback(extract_uri)
|
||||||
d.addCallback(self._check_large, SIZE_LARGE)
|
d.addCallback(self._check_large, SIZE_LARGE)
|
||||||
def _check(res):
|
def _check(res):
|
||||||
for p in self.node.last_peers:
|
for s in self.node.last_servers:
|
||||||
allocated = p.allocated
|
allocated = s.allocated
|
||||||
self.failUnlessEqual(len(allocated), 2)
|
self.failUnlessEqual(len(allocated), 2)
|
||||||
self.failUnlessEqual(p.queries, 2)
|
self.failUnlessEqual(s.queries, 2)
|
||||||
d.addCallback(_check)
|
d.addCallback(_check)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_one_each_plus_one_extra(self):
|
def test_one_each_plus_one_extra(self):
|
||||||
# if we have 51 shares, and there are 50 peers, then one peer gets
|
# if we have 51 shares, and there are 50 servers, then one server
|
||||||
# two shares and the rest get just one
|
# gets two shares and the rest get just one
|
||||||
|
|
||||||
self.make_client()
|
self.make_client()
|
||||||
data = self.get_data(SIZE_LARGE)
|
data = self.get_data(SIZE_LARGE)
|
||||||
|
@ -546,38 +546,38 @@ class PeerSelection(unittest.TestCase):
|
||||||
def _check(res):
|
def _check(res):
|
||||||
got_one = []
|
got_one = []
|
||||||
got_two = []
|
got_two = []
|
||||||
for p in self.node.last_peers:
|
for s in self.node.last_servers:
|
||||||
allocated = p.allocated
|
allocated = s.allocated
|
||||||
self.failUnless(len(allocated) in (1,2), len(allocated))
|
self.failUnless(len(allocated) in (1,2), len(allocated))
|
||||||
if len(allocated) == 1:
|
if len(allocated) == 1:
|
||||||
self.failUnlessEqual(p.queries, 1)
|
self.failUnlessEqual(s.queries, 1)
|
||||||
got_one.append(p)
|
got_one.append(s)
|
||||||
else:
|
else:
|
||||||
self.failUnlessEqual(p.queries, 2)
|
self.failUnlessEqual(s.queries, 2)
|
||||||
got_two.append(p)
|
got_two.append(s)
|
||||||
self.failUnlessEqual(len(got_one), 49)
|
self.failUnlessEqual(len(got_one), 49)
|
||||||
self.failUnlessEqual(len(got_two), 1)
|
self.failUnlessEqual(len(got_two), 1)
|
||||||
d.addCallback(_check)
|
d.addCallback(_check)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_four_each(self):
|
def test_four_each(self):
|
||||||
# if we have 200 shares, and there are 50 peers, then each peer gets
|
# if we have 200 shares, and there are 50 servers, then each server
|
||||||
# 4 shares. The design goal is to accomplish this with only two
|
# gets 4 shares. The design goal is to accomplish this with only two
|
||||||
# queries per peer.
|
# queries per server.
|
||||||
|
|
||||||
self.make_client()
|
self.make_client()
|
||||||
data = self.get_data(SIZE_LARGE)
|
data = self.get_data(SIZE_LARGE)
|
||||||
# if there are 50 peers, then happy should be no more than 50 if
|
# if there are 50 servers, then happy should be no more than 50 if we
|
||||||
# we want this to work.
|
# want this to work.
|
||||||
self.set_encoding_parameters(100, 50, 200)
|
self.set_encoding_parameters(100, 50, 200)
|
||||||
d = upload_data(self.u, data)
|
d = upload_data(self.u, data)
|
||||||
d.addCallback(extract_uri)
|
d.addCallback(extract_uri)
|
||||||
d.addCallback(self._check_large, SIZE_LARGE)
|
d.addCallback(self._check_large, SIZE_LARGE)
|
||||||
def _check(res):
|
def _check(res):
|
||||||
for p in self.node.last_peers:
|
for s in self.node.last_servers:
|
||||||
allocated = p.allocated
|
allocated = s.allocated
|
||||||
self.failUnlessEqual(len(allocated), 4)
|
self.failUnlessEqual(len(allocated), 4)
|
||||||
self.failUnlessEqual(p.queries, 2)
|
self.failUnlessEqual(s.queries, 2)
|
||||||
d.addCallback(_check)
|
d.addCallback(_check)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -593,8 +593,8 @@ class PeerSelection(unittest.TestCase):
|
||||||
d.addCallback(self._check_large, SIZE_LARGE)
|
d.addCallback(self._check_large, SIZE_LARGE)
|
||||||
def _check(res):
|
def _check(res):
|
||||||
counts = {}
|
counts = {}
|
||||||
for p in self.node.last_peers:
|
for s in self.node.last_servers:
|
||||||
allocated = p.allocated
|
allocated = s.allocated
|
||||||
counts[len(allocated)] = counts.get(len(allocated), 0) + 1
|
counts[len(allocated)] = counts.get(len(allocated), 0) + 1
|
||||||
histogram = [counts.get(i, 0) for i in range(5)]
|
histogram = [counts.get(i, 0) for i in range(5)]
|
||||||
self.failUnlessEqual(histogram, [0,0,0,2,1])
|
self.failUnlessEqual(histogram, [0,0,0,2,1])
|
||||||
|
@ -616,10 +616,10 @@ class PeerSelection(unittest.TestCase):
|
||||||
d.addCallback(extract_uri)
|
d.addCallback(extract_uri)
|
||||||
d.addCallback(self._check_large, SIZE_LARGE)
|
d.addCallback(self._check_large, SIZE_LARGE)
|
||||||
def _check(res):
|
def _check(res):
|
||||||
# we should have put one share each on the big peers, and zero
|
# we should have put one share each on the big servers, and zero
|
||||||
# shares on the small peers
|
# shares on the small servers
|
||||||
total_allocated = 0
|
total_allocated = 0
|
||||||
for p in self.node.last_peers:
|
for p in self.node.last_servers:
|
||||||
if p.mode == "good":
|
if p.mode == "good":
|
||||||
self.failUnlessEqual(len(p.allocated), 1)
|
self.failUnlessEqual(len(p.allocated), 1)
|
||||||
elif p.mode == "small":
|
elif p.mode == "small":
|
||||||
|
@ -750,8 +750,9 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
def _do_upload_with_broken_servers(self, servers_to_break):
|
def _do_upload_with_broken_servers(self, servers_to_break):
|
||||||
"""
|
"""
|
||||||
I act like a normal upload, but before I send the results of
|
I act like a normal upload, but before I send the results of
|
||||||
Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
|
Tahoe2ServerSelector to the Encoder, I break the first
|
||||||
PeerTrackers in the upload_servers part of the return result.
|
servers_to_break ServerTrackers in the upload_servers part of the
|
||||||
|
return result.
|
||||||
"""
|
"""
|
||||||
assert self.g, "I tried to find a grid at self.g, but failed"
|
assert self.g, "I tried to find a grid at self.g, but failed"
|
||||||
broker = self.g.clients[0].storage_broker
|
broker = self.g.clients[0].storage_broker
|
||||||
|
@ -764,7 +765,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
encoder = encode.Encoder()
|
encoder = encode.Encoder()
|
||||||
encoder.set_encrypted_uploadable(uploadable)
|
encoder.set_encrypted_uploadable(uploadable)
|
||||||
status = upload.UploadStatus()
|
status = upload.UploadStatus()
|
||||||
selector = upload.Tahoe2PeerSelector("dglev", "test", status)
|
selector = upload.Tahoe2ServerSelector("dglev", "test", status)
|
||||||
storage_index = encoder.get_param("storage_index")
|
storage_index = encoder.get_param("storage_index")
|
||||||
share_size = encoder.get_param("share_size")
|
share_size = encoder.get_param("share_size")
|
||||||
block_size = encoder.get_param("block_size")
|
block_size = encoder.get_param("block_size")
|
||||||
|
@ -772,18 +773,18 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
d = selector.get_shareholders(broker, sh, storage_index,
|
d = selector.get_shareholders(broker, sh, storage_index,
|
||||||
share_size, block_size, num_segments,
|
share_size, block_size, num_segments,
|
||||||
10, 3, 4)
|
10, 3, 4)
|
||||||
def _have_shareholders((upload_servers, already_peers)):
|
def _have_shareholders((upload_servers, already_servers)):
|
||||||
assert servers_to_break <= len(upload_servers)
|
assert servers_to_break <= len(upload_servers)
|
||||||
for index in xrange(servers_to_break):
|
for index in xrange(servers_to_break):
|
||||||
server = list(upload_servers)[index]
|
server = list(upload_servers)[index]
|
||||||
for share in server.buckets.keys():
|
for share in server.buckets.keys():
|
||||||
server.buckets[share].abort()
|
server.buckets[share].abort()
|
||||||
buckets = {}
|
buckets = {}
|
||||||
servermap = already_peers.copy()
|
servermap = already_servers.copy()
|
||||||
for peer in upload_servers:
|
for server in upload_servers:
|
||||||
buckets.update(peer.buckets)
|
buckets.update(server.buckets)
|
||||||
for bucket in peer.buckets:
|
for bucket in server.buckets:
|
||||||
servermap.setdefault(bucket, set()).add(peer.peerid)
|
servermap.setdefault(bucket, set()).add(server.serverid)
|
||||||
encoder.set_shareholders(buckets, servermap)
|
encoder.set_shareholders(buckets, servermap)
|
||||||
d = encoder.start()
|
d = encoder.start()
|
||||||
return d
|
return d
|
||||||
|
@ -1054,7 +1055,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
# one share from our initial upload to each of these.
|
# one share from our initial upload to each of these.
|
||||||
# The counterintuitive ordering of the share numbers is to deal with
|
# The counterintuitive ordering of the share numbers is to deal with
|
||||||
# the permuting of these servers -- distributing the shares this
|
# the permuting of these servers -- distributing the shares this
|
||||||
# way ensures that the Tahoe2PeerSelector sees them in the order
|
# way ensures that the Tahoe2ServerSelector sees them in the order
|
||||||
# described below.
|
# described below.
|
||||||
d = self._setup_and_upload()
|
d = self._setup_and_upload()
|
||||||
d.addCallback(lambda ign:
|
d.addCallback(lambda ign:
|
||||||
|
@ -1069,7 +1070,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
# server 2: share 0
|
# server 2: share 0
|
||||||
# server 3: share 1
|
# server 3: share 1
|
||||||
# We change the 'happy' parameter in the client to 4.
|
# We change the 'happy' parameter in the client to 4.
|
||||||
# The Tahoe2PeerSelector will see the peers permuted as:
|
# The Tahoe2ServerSelector will see the servers permuted as:
|
||||||
# 2, 3, 1, 0
|
# 2, 3, 1, 0
|
||||||
# Ideally, a reupload of our original data should work.
|
# Ideally, a reupload of our original data should work.
|
||||||
def _reset_encoding_parameters(ign, happy=4):
|
def _reset_encoding_parameters(ign, happy=4):
|
||||||
|
@ -1084,17 +1085,17 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
|
|
||||||
|
|
||||||
# This scenario is basically comment:53, but changed so that the
|
# This scenario is basically comment:53, but changed so that the
|
||||||
# Tahoe2PeerSelector sees the server with all of the shares before
|
# Tahoe2ServerSelector sees the server with all of the shares before
|
||||||
# any of the other servers.
|
# any of the other servers.
|
||||||
# The layout is:
|
# The layout is:
|
||||||
# server 2: shares 0 - 9
|
# server 2: shares 0 - 9
|
||||||
# server 3: share 0
|
# server 3: share 0
|
||||||
# server 1: share 1
|
# server 1: share 1
|
||||||
# server 4: share 2
|
# server 4: share 2
|
||||||
# The Tahoe2PeerSelector sees the peers permuted as:
|
# The Tahoe2ServerSelector sees the servers permuted as:
|
||||||
# 2, 3, 1, 4
|
# 2, 3, 1, 4
|
||||||
# Note that server 0 has been replaced by server 4; this makes it
|
# Note that server 0 has been replaced by server 4; this makes it
|
||||||
# easier to ensure that the last server seen by Tahoe2PeerSelector
|
# easier to ensure that the last server seen by Tahoe2ServerSelector
|
||||||
# has only one share.
|
# has only one share.
|
||||||
d.addCallback(_change_basedir)
|
d.addCallback(_change_basedir)
|
||||||
d.addCallback(lambda ign:
|
d.addCallback(lambda ign:
|
||||||
|
@ -1124,7 +1125,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
|
|
||||||
|
|
||||||
# Try the same thing, but with empty servers after the first one
|
# Try the same thing, but with empty servers after the first one
|
||||||
# We want to make sure that Tahoe2PeerSelector will redistribute
|
# We want to make sure that Tahoe2ServerSelector will redistribute
|
||||||
# shares as necessary, not simply discover an existing layout.
|
# shares as necessary, not simply discover an existing layout.
|
||||||
# The layout is:
|
# The layout is:
|
||||||
# server 2: shares 0 - 9
|
# server 2: shares 0 - 9
|
||||||
|
@ -1184,7 +1185,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
return d
|
return d
|
||||||
test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
|
test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
|
||||||
|
|
||||||
def test_happiness_with_some_readonly_peers(self):
|
def test_happiness_with_some_readonly_servers(self):
|
||||||
# Try the following layout
|
# Try the following layout
|
||||||
# server 2: shares 0-9
|
# server 2: shares 0-9
|
||||||
# server 4: share 0, read-only
|
# server 4: share 0, read-only
|
||||||
|
@ -1223,13 +1224,13 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
def test_happiness_with_all_readonly_peers(self):
|
def test_happiness_with_all_readonly_servers(self):
|
||||||
# server 3: share 1, read-only
|
# server 3: share 1, read-only
|
||||||
# server 1: share 2, read-only
|
# server 1: share 2, read-only
|
||||||
# server 2: shares 0-9, read-only
|
# server 2: shares 0-9, read-only
|
||||||
# server 4: share 0, read-only
|
# server 4: share 0, read-only
|
||||||
# The idea with this test is to make sure that the survey of
|
# The idea with this test is to make sure that the survey of
|
||||||
# read-only peers doesn't undercount servers of happiness
|
# read-only servers doesn't undercount servers of happiness
|
||||||
self.basedir = self.mktemp()
|
self.basedir = self.mktemp()
|
||||||
d = self._setup_and_upload()
|
d = self._setup_and_upload()
|
||||||
d.addCallback(lambda ign:
|
d.addCallback(lambda ign:
|
||||||
|
@ -1268,7 +1269,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
# the layout presented to it satisfies "servers_of_happiness"
|
# the layout presented to it satisfies "servers_of_happiness"
|
||||||
# until a failure occurs)
|
# until a failure occurs)
|
||||||
#
|
#
|
||||||
# This test simulates an upload where servers break after peer
|
# This test simulates an upload where servers break after server
|
||||||
# selection, but before they are written to.
|
# selection, but before they are written to.
|
||||||
def _set_basedir(ign=None):
|
def _set_basedir(ign=None):
|
||||||
self.basedir = self.mktemp()
|
self.basedir = self.mktemp()
|
||||||
|
@ -1283,7 +1284,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
self._add_server(server_number=5)
|
self._add_server(server_number=5)
|
||||||
d.addCallback(_do_server_setup)
|
d.addCallback(_do_server_setup)
|
||||||
# remove the original server
|
# remove the original server
|
||||||
# (necessary to ensure that the Tahoe2PeerSelector will distribute
|
# (necessary to ensure that the Tahoe2ServerSelector will distribute
|
||||||
# all the shares)
|
# all the shares)
|
||||||
def _remove_server(ign):
|
def _remove_server(ign):
|
||||||
server = self.g.servers_by_number[0]
|
server = self.g.servers_by_number[0]
|
||||||
|
@ -1343,7 +1344,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
|
|
||||||
def test_merge_peers(self):
|
def test_merge_peers(self):
|
||||||
# merge_peers merges a list of upload_servers and a dict of
|
# merge_peers merges a list of upload_servers and a dict of
|
||||||
# shareid -> peerid mappings.
|
# shareid -> serverid mappings.
|
||||||
shares = {
|
shares = {
|
||||||
1 : set(["server1"]),
|
1 : set(["server1"]),
|
||||||
2 : set(["server2"]),
|
2 : set(["server2"]),
|
||||||
|
@ -1354,12 +1355,12 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
# if not provided with a upload_servers argument, it should just
|
# if not provided with a upload_servers argument, it should just
|
||||||
# return the first argument unchanged.
|
# return the first argument unchanged.
|
||||||
self.failUnlessEqual(shares, merge_peers(shares, set([])))
|
self.failUnlessEqual(shares, merge_peers(shares, set([])))
|
||||||
class FakePeerTracker:
|
class FakeServerTracker:
|
||||||
pass
|
pass
|
||||||
trackers = []
|
trackers = []
|
||||||
for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
|
for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
|
||||||
t = FakePeerTracker()
|
t = FakeServerTracker()
|
||||||
t.peerid = server
|
t.serverid = server
|
||||||
t.buckets = [i]
|
t.buckets = [i]
|
||||||
trackers.append(t)
|
trackers.append(t)
|
||||||
expected = {
|
expected = {
|
||||||
|
@ -1386,8 +1387,8 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
expected = {}
|
expected = {}
|
||||||
for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
|
for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
|
||||||
shares3[i] = set([server])
|
shares3[i] = set([server])
|
||||||
t = FakePeerTracker()
|
t = FakeServerTracker()
|
||||||
t.peerid = server
|
t.serverid = server
|
||||||
t.buckets = [i]
|
t.buckets = [i]
|
||||||
trackers.append(t)
|
trackers.append(t)
|
||||||
expected[i] = set([server])
|
expected[i] = set([server])
|
||||||
|
@ -1403,7 +1404,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
# value for given inputs.
|
# value for given inputs.
|
||||||
|
|
||||||
# servers_of_happiness expects a dict of
|
# servers_of_happiness expects a dict of
|
||||||
# shnum => set(peerids) as a preexisting shares argument.
|
# shnum => set(serverids) as a preexisting shares argument.
|
||||||
test1 = {
|
test1 = {
|
||||||
1 : set(["server1"]),
|
1 : set(["server1"]),
|
||||||
2 : set(["server2"]),
|
2 : set(["server2"]),
|
||||||
|
@ -1417,22 +1418,22 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
# should be 3 instead of 4.
|
# should be 3 instead of 4.
|
||||||
happy = servers_of_happiness(test1)
|
happy = servers_of_happiness(test1)
|
||||||
self.failUnlessEqual(3, happy)
|
self.failUnlessEqual(3, happy)
|
||||||
# The second argument of merge_peers should be a set of
|
# The second argument of merge_peers should be a set of objects with
|
||||||
# objects with peerid and buckets as attributes. In actual use,
|
# serverid and buckets as attributes. In actual use, these will be
|
||||||
# these will be PeerTracker instances, but for testing it is fine
|
# ServerTracker instances, but for testing it is fine to make a
|
||||||
# to make a FakePeerTracker whose job is to hold those instance
|
# FakeServerTracker whose job is to hold those instance variables to
|
||||||
# variables to test that part.
|
# test that part.
|
||||||
class FakePeerTracker:
|
class FakeServerTracker:
|
||||||
pass
|
pass
|
||||||
trackers = []
|
trackers = []
|
||||||
for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
|
for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
|
||||||
t = FakePeerTracker()
|
t = FakeServerTracker()
|
||||||
t.peerid = server
|
t.serverid = server
|
||||||
t.buckets = [i]
|
t.buckets = [i]
|
||||||
trackers.append(t)
|
trackers.append(t)
|
||||||
# Recall that test1 is a server layout with servers_of_happiness
|
# Recall that test1 is a server layout with servers_of_happiness
|
||||||
# = 3. Since there isn't any overlap between the shnum ->
|
# = 3. Since there isn't any overlap between the shnum ->
|
||||||
# set([peerid]) correspondences in test1 and those in trackers,
|
# set([serverid]) correspondences in test1 and those in trackers,
|
||||||
# the result here should be 7.
|
# the result here should be 7.
|
||||||
test2 = merge_peers(test1, set(trackers))
|
test2 = merge_peers(test1, set(trackers))
|
||||||
happy = servers_of_happiness(test2)
|
happy = servers_of_happiness(test2)
|
||||||
|
@ -1440,8 +1441,8 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
# Now add an overlapping server to trackers. This is redundant,
|
# Now add an overlapping server to trackers. This is redundant,
|
||||||
# so it should not cause the previously reported happiness value
|
# so it should not cause the previously reported happiness value
|
||||||
# to change.
|
# to change.
|
||||||
t = FakePeerTracker()
|
t = FakeServerTracker()
|
||||||
t.peerid = "server1"
|
t.serverid = "server1"
|
||||||
t.buckets = [1]
|
t.buckets = [1]
|
||||||
trackers.append(t)
|
trackers.append(t)
|
||||||
test2 = merge_peers(test1, set(trackers))
|
test2 = merge_peers(test1, set(trackers))
|
||||||
|
@ -1459,17 +1460,17 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
4 : set(['server4']),
|
4 : set(['server4']),
|
||||||
}
|
}
|
||||||
trackers = []
|
trackers = []
|
||||||
t = FakePeerTracker()
|
t = FakeServerTracker()
|
||||||
t.peerid = 'server5'
|
t.serverid = 'server5'
|
||||||
t.buckets = [4]
|
t.buckets = [4]
|
||||||
trackers.append(t)
|
trackers.append(t)
|
||||||
t = FakePeerTracker()
|
t = FakeServerTracker()
|
||||||
t.peerid = 'server6'
|
t.serverid = 'server6'
|
||||||
t.buckets = [3, 5]
|
t.buckets = [3, 5]
|
||||||
trackers.append(t)
|
trackers.append(t)
|
||||||
# The value returned by servers_of_happiness is the size
|
# The value returned by servers_of_happiness is the size
|
||||||
# of a maximum matching in the bipartite graph that
|
# of a maximum matching in the bipartite graph that
|
||||||
# servers_of_happiness() makes between peerids and share
|
# servers_of_happiness() makes between serverids and share
|
||||||
# numbers. It should find something like this:
|
# numbers. It should find something like this:
|
||||||
# (server 1, share 1)
|
# (server 1, share 1)
|
||||||
# (server 2, share 2)
|
# (server 2, share 2)
|
||||||
|
@ -1527,7 +1528,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
sbs = shares_by_server(test1)
|
sbs = shares_by_server(test1)
|
||||||
self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
|
self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
|
||||||
self.failUnlessEqual(set([4, 5]), sbs["server2"])
|
self.failUnlessEqual(set([4, 5]), sbs["server2"])
|
||||||
# This should fail unless the peerid part of the mapping is a set
|
# This should fail unless the serverid part of the mapping is a set
|
||||||
test2 = {1: "server1"}
|
test2 = {1: "server1"}
|
||||||
self.shouldFail(AssertionError,
|
self.shouldFail(AssertionError,
|
||||||
"test_shares_by_server",
|
"test_shares_by_server",
|
||||||
|
@ -1543,7 +1544,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
# server 2: empty
|
# server 2: empty
|
||||||
# server 3: empty
|
# server 3: empty
|
||||||
# server 4: empty
|
# server 4: empty
|
||||||
# The purpose of this test is to make sure that the peer selector
|
# The purpose of this test is to make sure that the server selector
|
||||||
# knows about the shares on server 1, even though it is read-only.
|
# knows about the shares on server 1, even though it is read-only.
|
||||||
# It used to simply filter these out, which would cause the test
|
# It used to simply filter these out, which would cause the test
|
||||||
# to fail when servers_of_happiness = 4.
|
# to fail when servers_of_happiness = 4.
|
||||||
|
@ -1574,7 +1575,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
|
|
||||||
|
|
||||||
def test_query_counting(self):
|
def test_query_counting(self):
|
||||||
# If peer selection fails, Tahoe2PeerSelector prints out a lot
|
# If server selection fails, Tahoe2ServerSelector prints out a lot
|
||||||
# of helpful diagnostic information, including query stats.
|
# of helpful diagnostic information, including query stats.
|
||||||
# This test helps make sure that that information is accurate.
|
# This test helps make sure that that information is accurate.
|
||||||
self.basedir = self.mktemp()
|
self.basedir = self.mktemp()
|
||||||
|
@ -1597,7 +1598,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
c.upload, upload.Data("data" * 10000,
|
c.upload, upload.Data("data" * 10000,
|
||||||
convergence="")))
|
convergence="")))
|
||||||
# Now try with some readonly servers. We want to make sure that
|
# Now try with some readonly servers. We want to make sure that
|
||||||
# the readonly peer share discovery phase is counted correctly.
|
# the readonly server share discovery phase is counted correctly.
|
||||||
def _reset(ign):
|
def _reset(ign):
|
||||||
self.basedir = self.mktemp()
|
self.basedir = self.mktemp()
|
||||||
self.g = None
|
self.g = None
|
||||||
|
@ -1668,13 +1669,13 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
d.addCallback(lambda client:
|
d.addCallback(lambda client:
|
||||||
self.shouldFail(UploadUnhappinessError,
|
self.shouldFail(UploadUnhappinessError,
|
||||||
"test_upper_limit_on_readonly_queries",
|
"test_upper_limit_on_readonly_queries",
|
||||||
"sent 8 queries to 8 peers",
|
"sent 8 queries to 8 servers",
|
||||||
client.upload,
|
client.upload,
|
||||||
upload.Data('data' * 10000, convergence="")))
|
upload.Data('data' * 10000, convergence="")))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
def test_exception_messages_during_peer_selection(self):
|
def test_exception_messages_during_server_selection(self):
|
||||||
# server 1: read-only, no shares
|
# server 1: read-only, no shares
|
||||||
# server 2: read-only, no shares
|
# server 2: read-only, no shares
|
||||||
# server 3: read-only, no shares
|
# server 3: read-only, no shares
|
||||||
|
@ -1707,7 +1708,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
"total (10 homeless), want to place shares on at "
|
"total (10 homeless), want to place shares on at "
|
||||||
"least 4 servers such that any 3 of them have "
|
"least 4 servers such that any 3 of them have "
|
||||||
"enough shares to recover the file, "
|
"enough shares to recover the file, "
|
||||||
"sent 5 queries to 5 peers, 0 queries placed "
|
"sent 5 queries to 5 servers, 0 queries placed "
|
||||||
"some shares, 5 placed none "
|
"some shares, 5 placed none "
|
||||||
"(of which 5 placed none due to the server being "
|
"(of which 5 placed none due to the server being "
|
||||||
"full and 0 placed none due to an error)",
|
"full and 0 placed none due to an error)",
|
||||||
|
@ -1748,7 +1749,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
"total (10 homeless), want to place shares on at "
|
"total (10 homeless), want to place shares on at "
|
||||||
"least 4 servers such that any 3 of them have "
|
"least 4 servers such that any 3 of them have "
|
||||||
"enough shares to recover the file, "
|
"enough shares to recover the file, "
|
||||||
"sent 5 queries to 5 peers, 0 queries placed "
|
"sent 5 queries to 5 servers, 0 queries placed "
|
||||||
"some shares, 5 placed none "
|
"some shares, 5 placed none "
|
||||||
"(of which 4 placed none due to the server being "
|
"(of which 4 placed none due to the server being "
|
||||||
"full and 1 placed none due to an error)",
|
"full and 1 placed none due to an error)",
|
||||||
|
@ -2009,9 +2010,9 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
def test_peer_selector_bucket_abort(self):
|
def test_server_selector_bucket_abort(self):
|
||||||
# If peer selection for an upload fails due to an unhappy
|
# If server selection for an upload fails due to an unhappy
|
||||||
# layout, the peer selection process should abort the buckets it
|
# layout, the server selection process should abort the buckets it
|
||||||
# allocates before failing, so that the space can be re-used.
|
# allocates before failing, so that the space can be re-used.
|
||||||
self.basedir = self.mktemp()
|
self.basedir = self.mktemp()
|
||||||
self.set_up_grid(num_servers=5)
|
self.set_up_grid(num_servers=5)
|
||||||
|
@ -2024,7 +2025,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
d = defer.succeed(None)
|
d = defer.succeed(None)
|
||||||
d.addCallback(lambda ignored:
|
d.addCallback(lambda ignored:
|
||||||
self.shouldFail(UploadUnhappinessError,
|
self.shouldFail(UploadUnhappinessError,
|
||||||
"test_peer_selection_bucket_abort",
|
"test_server_selection_bucket_abort",
|
||||||
"",
|
"",
|
||||||
client.upload, upload.Data("data" * 10000,
|
client.upload, upload.Data("data" * 10000,
|
||||||
convergence="")))
|
convergence="")))
|
||||||
|
@ -2079,7 +2080,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
# upload with exactly 75 peers (shares_of_happiness)
|
# upload with exactly 75 servers (shares_of_happiness)
|
||||||
# have a download fail
|
# have a download fail
|
||||||
# cancel a download (need to implement more cancel stuff)
|
# cancel a download (need to implement more cancel stuff)
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@ def merge_peers(servermap, upload_servers=None):
|
||||||
|
|
||||||
for peer in upload_servers:
|
for peer in upload_servers:
|
||||||
for shnum in peer.buckets:
|
for shnum in peer.buckets:
|
||||||
servermap.setdefault(shnum, set()).add(peer.peerid)
|
servermap.setdefault(shnum, set()).add(peer.serverid)
|
||||||
return servermap
|
return servermap
|
||||||
|
|
||||||
def servers_of_happiness(sharemap):
|
def servers_of_happiness(sharemap):
|
||||||
|
|
Loading…
Reference in New Issue