Refactor StorageFarmBroker handling of servers

Pass around IServer instances instead of (peerid, rref) tuples. Replace
"descriptor" with "server". Other replacements:

 get_all_servers -> get_connected_servers/get_known_servers
 get_servers_for_index -> get_servers_for_psi (now returns IServers)

This change still needs to be pushed further down: lots of code is now
getting the IServer and then distributing (peerid, rref) internally.
Instead, it ought to distribute the IServer internally and delay
extracting a serverid or rref until the last moment.

no_network.py was updated to retain parallelism.
This commit is contained in:
Brian Warner 2011-02-20 17:58:04 -08:00
parent 36d1cce5f0
commit ffd296fc5a
24 changed files with 173 additions and 137 deletions

View File

@ -468,7 +468,7 @@ class Client(node.Node, pollmixin.PollMixin):
temporary test network and need to know when it is safe to proceed temporary test network and need to know when it is safe to proceed
with an upload or download.""" with an upload or download."""
def _check(): def _check():
return len(self.storage_broker.get_all_servers()) >= num_clients return len(self.storage_broker.get_connected_servers()) >= num_clients
d = self.poll(_check, 0.5) d = self.poll(_check, 0.5)
d.addCallback(lambda res: None) d.addCallback(lambda res: None)
return d return d

View File

@ -91,7 +91,7 @@ class ControlServer(Referenceable, service.Service):
# 300ms. # 300ms.
results = {} results = {}
sb = self.parent.get_storage_broker() sb = self.parent.get_storage_broker()
everyone = sb.get_all_servers() everyone = sb.get_connected_servers()
num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3))) num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
everyone = list(everyone) * num_pings everyone = list(everyone) * num_pings
d = self._do_one_ping(None, everyone, results) d = self._do_one_ping(None, everyone, results)
@ -99,7 +99,9 @@ class ControlServer(Referenceable, service.Service):
def _do_one_ping(self, res, everyone_left, results): def _do_one_ping(self, res, everyone_left, results):
if not everyone_left: if not everyone_left:
return results return results
peerid, connection = everyone_left.pop(0) server = everyone_left.pop(0)
peerid = server.get_serverid()
connection = server.get_rref()
start = time.time() start = time.time()
d = connection.callRemote("get_buckets", "\x00"*16) d = connection.callRemote("get_buckets", "\x00"*16)
def _done(ignored): def _done(ignored):

View File

@ -463,9 +463,6 @@ class Checker(log.PrefixingLogMixin):
def __init__(self, verifycap, servers, verify, add_lease, secret_holder, def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
monitor): monitor):
assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap)) assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
assert precondition(isinstance(servers, (set, frozenset)), servers)
for (serverid, serverrref) in servers:
assert precondition(isinstance(serverid, str))
prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60) prefix = "%s" % base32.b2a_l(verifycap.get_storage_index()[:8], 60)
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix) log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
@ -489,7 +486,7 @@ class Checker(log.PrefixingLogMixin):
def _get_cancel_secret(self, peerid): def _get_cancel_secret(self, peerid):
return bucket_cancel_secret_hash(self.file_cancel_secret, peerid) return bucket_cancel_secret_hash(self.file_cancel_secret, peerid)
def _get_buckets(self, server, storageindex, serverid): def _get_buckets(self, s, storageindex):
"""Return a deferred that eventually fires with ({sharenum: bucket}, """Return a deferred that eventually fires with ({sharenum: bucket},
serverid, success). In case the server is disconnected or returns a serverid, success). In case the server is disconnected or returns a
Failure then it fires with ({}, serverid, False) (A server Failure then it fires with ({}, serverid, False) (A server
@ -498,14 +495,16 @@ class Checker(log.PrefixingLogMixin):
that we want to track and report whether or not each server that we want to track and report whether or not each server
responded.)""" responded.)"""
rref = s.get_rref()
serverid = s.get_serverid()
if self._add_lease: if self._add_lease:
renew_secret = self._get_renewal_secret(serverid) renew_secret = self._get_renewal_secret(serverid)
cancel_secret = self._get_cancel_secret(serverid) cancel_secret = self._get_cancel_secret(serverid)
d2 = server.callRemote("add_lease", storageindex, d2 = rref.callRemote("add_lease", storageindex,
renew_secret, cancel_secret) renew_secret, cancel_secret)
d2.addErrback(self._add_lease_failed, serverid, storageindex) d2.addErrback(self._add_lease_failed, serverid, storageindex)
d = server.callRemote("get_buckets", storageindex) d = rref.callRemote("get_buckets", storageindex)
def _wrap_results(res): def _wrap_results(res):
return (res, serverid, True) return (res, serverid, True)
@ -656,7 +655,7 @@ class Checker(log.PrefixingLogMixin):
return d return d
def _verify_server_shares(self, serverid, ss): def _verify_server_shares(self, s):
""" Return a deferred which eventually fires with a tuple of """ Return a deferred which eventually fires with a tuple of
(set(sharenum), serverid, set(corruptsharenum), (set(sharenum), serverid, set(corruptsharenum),
set(incompatiblesharenum), success) showing all the shares verified set(incompatiblesharenum), success) showing all the shares verified
@ -679,7 +678,7 @@ class Checker(log.PrefixingLogMixin):
then disconnected and ceased responding, or returned a failure, it is then disconnected and ceased responding, or returned a failure, it is
still marked with the True flag for 'success'. still marked with the True flag for 'success'.
""" """
d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid) d = self._get_buckets(s, self._verifycap.get_storage_index())
def _got_buckets(result): def _got_buckets(result):
bucketdict, serverid, success = result bucketdict, serverid, success = result
@ -710,12 +709,12 @@ class Checker(log.PrefixingLogMixin):
def _err(f): def _err(f):
f.trap(RemoteException, DeadReferenceError) f.trap(RemoteException, DeadReferenceError)
return (set(), serverid, set(), set(), False) return (set(), s.get_serverid(), set(), set(), False)
d.addCallbacks(_got_buckets, _err) d.addCallbacks(_got_buckets, _err)
return d return d
def _check_server_shares(self, serverid, ss): def _check_server_shares(self, s):
"""Return a deferred which eventually fires with a tuple of """Return a deferred which eventually fires with a tuple of
(set(sharenum), serverid, set(), set(), responded) showing all the (set(sharenum), serverid, set(), set(), responded) showing all the
shares claimed to be served by this server. In case the server is shares claimed to be served by this server. In case the server is
@ -726,7 +725,7 @@ class Checker(log.PrefixingLogMixin):
def _curry_empty_corrupted(res): def _curry_empty_corrupted(res):
buckets, serverid, responded = res buckets, serverid, responded = res
return (set(buckets), serverid, set(), set(), responded) return (set(buckets), serverid, set(), set(), responded)
d = self._get_buckets(ss, self._verifycap.get_storage_index(), serverid) d = self._get_buckets(s, self._verifycap.get_storage_index())
d.addCallback(_curry_empty_corrupted) d.addCallback(_curry_empty_corrupted)
return d return d
@ -794,10 +793,10 @@ class Checker(log.PrefixingLogMixin):
def start(self): def start(self):
ds = [] ds = []
if self._verify: if self._verify:
for (serverid, ss) in self._servers: for s in self._servers:
ds.append(self._verify_server_shares(serverid, ss)) ds.append(self._verify_server_shares(s))
else: else:
for (serverid, ss) in self._servers: for s in self._servers:
ds.append(self._check_server_shares(serverid, ss)) ds.append(self._check_server_shares(s))
return deferredutil.gatherResults(ds).addCallback(self._format_results) return deferredutil.gatherResults(ds).addCallback(self._format_results)

View File

@ -62,8 +62,9 @@ class ShareFinder:
# test_dirnode, which creates us with storage_broker=None # test_dirnode, which creates us with storage_broker=None
if not self._started: if not self._started:
si = self.verifycap.storage_index si = self.verifycap.storage_index
s = self._storage_broker.get_servers_for_index(si) servers = [(s.get_serverid(), s.get_rref())
self._servers = iter(s) for s in self._storage_broker.get_servers_for_psi(si)]
self._servers = iter(servers)
self._started = True self._started = True
def log(self, *args, **kwargs): def log(self, *args, **kwargs):

View File

@ -102,7 +102,7 @@ class CiphertextFileNode:
verifycap = self._verifycap verifycap = self._verifycap
storage_index = verifycap.storage_index storage_index = verifycap.storage_index
sb = self._storage_broker sb = self._storage_broker
servers = sb.get_all_servers() servers = sb.get_connected_servers()
sh = self._secret_holder sh = self._secret_holder
c = Checker(verifycap=verifycap, servers=servers, c = Checker(verifycap=verifycap, servers=servers,
@ -160,7 +160,7 @@ class CiphertextFileNode:
def check(self, monitor, verify=False, add_lease=False): def check(self, monitor, verify=False, add_lease=False):
verifycap = self._verifycap verifycap = self._verifycap
sb = self._storage_broker sb = self._storage_broker
servers = sb.get_all_servers() servers = sb.get_connected_servers()
sh = self._secret_holder sh = self._secret_holder
v = Checker(verifycap=verifycap, servers=servers, v = Checker(verifycap=verifycap, servers=servers,

View File

@ -53,10 +53,10 @@ class CHKCheckerAndUEBFetcher:
def _get_all_shareholders(self, storage_index): def _get_all_shareholders(self, storage_index):
dl = [] dl = []
for (peerid, ss) in self._peer_getter(storage_index): for s in self._peer_getter(storage_index):
d = ss.callRemote("get_buckets", storage_index) d = s.get_rref().callRemote("get_buckets", storage_index)
d.addCallbacks(self._got_response, self._got_error, d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,)) callbackArgs=(s.get_serverid(),))
dl.append(d) dl.append(d)
return defer.DeferredList(dl) return defer.DeferredList(dl)
@ -620,7 +620,7 @@ class Helper(Referenceable):
lp2 = self.log("doing a quick check+UEBfetch", lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY) parent=lp, level=log.NOISY)
sb = self._storage_broker sb = self._storage_broker
c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2) c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
d = c.check() d = c.check()
def _checked(res): def _checked(res):
if res: if res:

View File

@ -224,7 +224,8 @@ class Tahoe2PeerSelector(log.PrefixingLogMixin):
num_share_hashes, EXTENSION_SIZE, num_share_hashes, EXTENSION_SIZE,
None) None)
allocated_size = wbp.get_allocated_size() allocated_size = wbp.get_allocated_size()
all_peers = storage_broker.get_servers_for_index(storage_index) all_peers = [(s.get_serverid(), s.get_rref())
for s in storage_broker.get_servers_for_psi(storage_index)]
if not all_peers: if not all_peers:
raise NoServersError("client gave us zero peers") raise NoServersError("client gave us zero peers")

View File

@ -352,13 +352,17 @@ class IStorageBucketReader(Interface):
""" """
class IStorageBroker(Interface): class IStorageBroker(Interface):
def get_servers_for_index(peer_selection_index): def get_servers_for_psi(peer_selection_index):
""" """
@return: list of (peerid, versioned-rref) tuples @return: list of IServer instances
""" """
def get_all_servers(): def get_connected_servers():
""" """
@return: frozenset of (peerid, versioned-rref) tuples @return: frozenset of connected IServer instances
"""
def get_known_servers():
"""
@return: frozenset of IServer instances
""" """
def get_all_serverids(): def get_all_serverids():
""" """

View File

@ -179,7 +179,8 @@ class Publish:
self._encprivkey = self._node.get_encprivkey() self._encprivkey = self._node.get_encprivkey()
sb = self._storage_broker sb = self._storage_broker
full_peerlist = sb.get_servers_for_index(self._storage_index) full_peerlist = [(s.get_serverid(), s.get_rref())
for s in sb.get_servers_for_psi(self._storage_index)]
self.full_peerlist = full_peerlist # for use later, immutable self.full_peerlist = full_peerlist # for use later, immutable
self.bad_peers = set() # peerids who have errbacked/refused requests self.bad_peers = set() # peerids who have errbacked/refused requests

View File

@ -424,7 +424,8 @@ class ServermapUpdater:
self._queries_completed = 0 self._queries_completed = 0
sb = self._storage_broker sb = self._storage_broker
full_peerlist = sb.get_servers_for_index(self._storage_index) full_peerlist = [(s.get_serverid(), s.get_rref())
for s in sb.get_servers_for_psi(self._storage_index)]
self.full_peerlist = full_peerlist # for use later, immutable self.full_peerlist = full_peerlist # for use later, immutable
self.extra_peers = full_peerlist[:] # peers are removed as we use them self.extra_peers = full_peerlist[:] # peers are removed as we use them
self._good_peers = set() # peers who had some shares self._good_peers = set() # peers who had some shares

View File

@ -34,7 +34,7 @@ from zope.interface import implements, Interface
from foolscap.api import eventually from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker from allmydata.interfaces import IStorageBroker
from allmydata.util import idlib, log from allmydata.util import idlib, log
from allmydata.util.assertutil import _assert, precondition from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1 from allmydata.util.hashutil import sha1
@ -66,11 +66,11 @@ class StorageFarmBroker:
self.tub = tub self.tub = tub
assert permute_peers # False not implemented yet assert permute_peers # False not implemented yet
self.permute_peers = permute_peers self.permute_peers = permute_peers
# self.descriptors maps serverid -> IServerDescriptor, and keeps # self.servers maps serverid -> IServer, and keeps track of all the
# track of all the storage servers that we've heard about. Each # storage servers that we've heard about. Each descriptor manages its
# descriptor manages its own Reconnector, and will give us a # own Reconnector, and will give us a RemoteReference when we ask
# RemoteReference when we ask them for it. # them for it.
self.descriptors = {} self.servers = {}
# self.test_servers are statically configured from unit tests # self.test_servers are statically configured from unit tests
self.test_servers = {} # serverid -> rref self.test_servers = {} # serverid -> rref
self.introducer_client = None self.introducer_client = None
@ -79,7 +79,7 @@ class StorageFarmBroker:
def test_add_server(self, serverid, rref): def test_add_server(self, serverid, rref):
self.test_servers[serverid] = rref self.test_servers[serverid] = rref
def test_add_descriptor(self, serverid, dsc): def test_add_descriptor(self, serverid, dsc):
self.descriptors[serverid] = dsc self.servers[serverid] = dsc
def use_introducer(self, introducer_client): def use_introducer(self, introducer_client):
self.introducer_client = ic = introducer_client self.introducer_client = ic = introducer_client
@ -89,16 +89,16 @@ class StorageFarmBroker:
precondition(isinstance(serverid, str), serverid) precondition(isinstance(serverid, str), serverid)
precondition(len(serverid) == 20, serverid) precondition(len(serverid) == 20, serverid)
assert ann_d["service-name"] == "storage" assert ann_d["service-name"] == "storage"
old = self.descriptors.get(serverid) old = self.servers.get(serverid)
if old: if old:
if old.get_announcement() == ann_d: if old.get_announcement() == ann_d:
return # duplicate return # duplicate
# replacement # replacement
del self.descriptors[serverid] del self.servers[serverid]
old.stop_connecting() old.stop_connecting()
# now we forget about them and start using the new one # now we forget about them and start using the new one
dsc = NativeStorageClientDescriptor(serverid, ann_d) dsc = NativeStorageServer(serverid, ann_d)
self.descriptors[serverid] = dsc self.servers[serverid] = dsc
dsc.start_connecting(self.tub, self._trigger_connections) dsc.start_connecting(self.tub, self._trigger_connections)
# the descriptor will manage their own Reconnector, and each time we # the descriptor will manage their own Reconnector, and each time we
# need servers, we'll ask them if they're connected or not. # need servers, we'll ask them if they're connected or not.
@ -111,48 +111,44 @@ class StorageFarmBroker:
# connections to only a subset of the servers, which would increase # connections to only a subset of the servers, which would increase
# the chances that we'll put shares in weird places (and not update # the chances that we'll put shares in weird places (and not update
# existing shares of mutable files). See #374 for more details. # existing shares of mutable files). See #374 for more details.
for dsc in self.descriptors.values(): for dsc in self.servers.values():
dsc.try_to_connect() dsc.try_to_connect()
def get_servers_for_psi(self, peer_selection_index):
# return a list of server objects (IServers)
def get_servers_for_index(self, peer_selection_index):
# first cut: return a list of (peerid, versioned-rref) tuples
assert self.permute_peers == True assert self.permute_peers == True
servers = self.get_all_servers() def _permuted(server):
key = peer_selection_index seed = server.get_permutation_seed()
return sorted(servers, key=lambda x: sha1(key+x[0]).digest()) return sha1(peer_selection_index + seed).digest()
return sorted(self.get_connected_servers(), key=_permuted)
def get_all_servers(self):
# return a frozenset of (peerid, versioned-rref) tuples
servers = {}
for serverid,rref in self.test_servers.items():
servers[serverid] = rref
for serverid,dsc in self.descriptors.items():
rref = dsc.get_rref()
if rref:
servers[serverid] = rref
result = frozenset(servers.items())
_assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids())
return result
def get_all_serverids(self): def get_all_serverids(self):
serverids = set() serverids = set()
serverids.update(self.test_servers.keys()) serverids.update(self.test_servers.keys())
serverids.update(self.descriptors.keys()) serverids.update(self.servers.keys())
return frozenset(serverids) return frozenset(serverids)
def get_all_descriptors(self): def get_connected_servers(self):
return sorted(self.descriptors.values(), return frozenset([s for s in self.get_known_servers()
key=lambda dsc: dsc.get_serverid()) if s.get_rref()])
def get_known_servers(self):
servers = []
for serverid,rref in self.test_servers.items():
s = NativeStorageServer(serverid, {})
s.rref = rref
servers.append(s)
servers.extend(self.servers.values())
return sorted(servers, key=lambda s: s.get_serverid())
def get_nickname_for_serverid(self, serverid): def get_nickname_for_serverid(self, serverid):
if serverid in self.descriptors: if serverid in self.servers:
return self.descriptors[serverid].get_nickname() return self.servers[serverid].get_nickname()
return None return None
class IServerDescriptor(Interface): class IServer(Interface):
"""I live in the client, and represent a single server."""
def start_connecting(tub, trigger_cb): def start_connecting(tub, trigger_cb):
pass pass
def get_nickname(): def get_nickname():
@ -160,7 +156,7 @@ class IServerDescriptor(Interface):
def get_rref(): def get_rref():
pass pass
class NativeStorageClientDescriptor: class NativeStorageServer:
"""I hold information about a storage server that we want to connect to. """I hold information about a storage server that we want to connect to.
If we are connected, I hold the RemoteReference, their host address, and If we are connected, I hold the RemoteReference, their host address, and
the their version information. I remember information about when we were the their version information. I remember information about when we were
@ -176,7 +172,7 @@ class NativeStorageClientDescriptor:
@ivar rref: the RemoteReference, if connected, otherwise None @ivar rref: the RemoteReference, if connected, otherwise None
@ivar remote_host: the IAddress, if connected, otherwise None @ivar remote_host: the IAddress, if connected, otherwise None
""" """
implements(IServerDescriptor) implements(IServer)
VERSION_DEFAULTS = { VERSION_DEFAULTS = {
"http://allmydata.org/tahoe/protocols/storage/v1" : "http://allmydata.org/tahoe/protocols/storage/v1" :
@ -203,6 +199,8 @@ class NativeStorageClientDescriptor:
def get_serverid(self): def get_serverid(self):
return self.serverid return self.serverid
def get_permutation_seed(self):
return self.serverid
def get_nickname(self): def get_nickname(self):
return self.announcement["nickname"].decode("utf-8") return self.announcement["nickname"].decode("utf-8")

View File

@ -572,7 +572,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
if not c.connected_to_introducer(): if not c.connected_to_introducer():
return False return False
sb = c.get_storage_broker() sb = c.get_storage_broker()
if len(sb.get_all_servers()) != self.numclients: if len(sb.get_connected_servers()) != self.numclients:
return False return False
return True return True

View File

@ -117,13 +117,26 @@ def wrap_storage_server(original):
wrapper.version = original.remote_get_version() wrapper.version = original.remote_get_version()
return wrapper return wrapper
class NoNetworkServer:
def __init__(self, serverid, rref):
self.serverid = serverid
self.rref = rref
def get_serverid(self):
return self.serverid
def get_permutation_seed(self):
return self.serverid
def get_rref(self):
return self.rref
class NoNetworkStorageBroker: class NoNetworkStorageBroker:
implements(IStorageBroker) implements(IStorageBroker)
def get_servers_for_index(self, key): def get_servers_for_psi(self, peer_selection_index):
return sorted(self.client._servers, def _permuted(server):
key=lambda x: sha1(key+x[0]).digest()) seed = server.get_permutation_seed()
def get_all_servers(self): return sha1(peer_selection_index + seed).digest()
return frozenset(self.client._servers) return sorted(self.get_connected_servers(), key=_permuted)
def get_connected_servers(self):
return self.client._servers
def get_nickname_for_serverid(self, serverid): def get_nickname_for_serverid(self, serverid):
return None return None
@ -181,8 +194,10 @@ class NoNetworkGrid(service.MultiService):
self.basedir = basedir self.basedir = basedir
fileutil.make_dirs(basedir) fileutil.make_dirs(basedir)
self.servers_by_number = {} self.servers_by_number = {} # maps to StorageServer instance
self.servers_by_id = {} self.wrappers_by_id = {} # maps to wrapped StorageServer instance
self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped
# StorageServer
self.clients = [] self.clients = []
for i in range(num_servers): for i in range(num_servers):
@ -234,11 +249,16 @@ class NoNetworkGrid(service.MultiService):
ss.setServiceParent(middleman) ss.setServiceParent(middleman)
serverid = ss.my_nodeid serverid = ss.my_nodeid
self.servers_by_number[i] = ss self.servers_by_number[i] = ss
self.servers_by_id[serverid] = wrap_storage_server(ss) wrapper = wrap_storage_server(ss)
self.wrappers_by_id[serverid] = wrapper
self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
self.rebuild_serverlist() self.rebuild_serverlist()
def get_all_serverids(self):
return self.proxies_by_id.keys()
def rebuild_serverlist(self): def rebuild_serverlist(self):
self.all_servers = frozenset(self.servers_by_id.items()) self.all_servers = frozenset(self.proxies_by_id.values())
for c in self.clients: for c in self.clients:
c._servers = self.all_servers c._servers = self.all_servers
@ -249,23 +269,24 @@ class NoNetworkGrid(service.MultiService):
if ss.my_nodeid == serverid: if ss.my_nodeid == serverid:
del self.servers_by_number[i] del self.servers_by_number[i]
break break
del self.servers_by_id[serverid] del self.wrappers_by_id[serverid]
del self.proxies_by_id[serverid]
self.rebuild_serverlist() self.rebuild_serverlist()
def break_server(self, serverid): def break_server(self, serverid):
# mark the given server as broken, so it will throw exceptions when # mark the given server as broken, so it will throw exceptions when
# asked to hold a share or serve a share # asked to hold a share or serve a share
self.servers_by_id[serverid].broken = True self.wrappers_by_id[serverid].broken = True
def hang_server(self, serverid): def hang_server(self, serverid):
# hang the given server # hang the given server
ss = self.servers_by_id[serverid] ss = self.wrappers_by_id[serverid]
assert ss.hung_until is None assert ss.hung_until is None
ss.hung_until = defer.Deferred() ss.hung_until = defer.Deferred()
def unhang_server(self, serverid): def unhang_server(self, serverid):
# unhang the given server # unhang the given server
ss = self.servers_by_id[serverid] ss = self.wrappers_by_id[serverid]
assert ss.hung_until is not None assert ss.hung_until is not None
ss.hung_until.callback(None) ss.hung_until.callback(None)
ss.hung_until = None ss.hung_until = None

View File

@ -3,7 +3,7 @@ import simplejson
from twisted.trial import unittest from twisted.trial import unittest
from allmydata import check_results, uri from allmydata import check_results, uri
from allmydata.web import check_results as web_check_results from allmydata.web import check_results as web_check_results
from allmydata.storage_client import StorageFarmBroker, NativeStorageClientDescriptor from allmydata.storage_client import StorageFarmBroker, NativeStorageServer
from allmydata.monitor import Monitor from allmydata.monitor import Monitor
from allmydata.test.no_network import GridTestMixin from allmydata.test.no_network import GridTestMixin
from allmydata.immutable.upload import Data from allmydata.immutable.upload import Data
@ -28,7 +28,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
"my-version": "ver", "my-version": "ver",
"oldest-supported": "oldest", "oldest-supported": "oldest",
} }
dsc = NativeStorageClientDescriptor(peerid, ann_d) dsc = NativeStorageServer(peerid, ann_d)
sb.test_add_descriptor(peerid, dsc) sb.test_add_descriptor(peerid, dsc)
c = FakeClient() c = FakeClient()
c.storage_broker = sb c.storage_broker = sb

View File

@ -134,13 +134,12 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0) self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
def _permute(self, sb, key): def _permute(self, sb, key):
return [ peerid return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ]
for (peerid,rref) in sb.get_servers_for_index(key) ]
def test_permute(self): def test_permute(self):
sb = StorageFarmBroker(None, True) sb = StorageFarmBroker(None, True)
for k in ["%d" % i for i in range(5)]: for k in ["%d" % i for i in range(5)]:
sb.test_add_server(k, None) sb.test_add_server(k, "rref")
self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2']) self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3']) self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])

View File

@ -287,14 +287,14 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
self.failUnlessEqual(d["list-corrupt-shares"], [], where) self.failUnlessEqual(d["list-corrupt-shares"], [], where)
if not incomplete: if not incomplete:
self.failUnlessEqual(sorted(d["servers-responding"]), self.failUnlessEqual(sorted(d["servers-responding"]),
sorted(self.g.servers_by_id.keys()), sorted(self.g.get_all_serverids()),
where) where)
self.failUnless("sharemap" in d, str((where, d))) self.failUnless("sharemap" in d, str((where, d)))
all_serverids = set() all_serverids = set()
for (shareid, serverids) in d["sharemap"].items(): for (shareid, serverids) in d["sharemap"].items():
all_serverids.update(serverids) all_serverids.update(serverids)
self.failUnlessEqual(sorted(all_serverids), self.failUnlessEqual(sorted(all_serverids),
sorted(self.g.servers_by_id.keys()), sorted(self.g.get_all_serverids()),
where) where)
self.failUnlessEqual(d["count-wrong-shares"], 0, where) self.failUnlessEqual(d["count-wrong-shares"], 0, where)
@ -545,7 +545,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
if not incomplete: if not incomplete:
self.failUnlessEqual(sorted(r["servers-responding"]), self.failUnlessEqual(sorted(r["servers-responding"]),
sorted([idlib.nodeid_b2a(sid) sorted([idlib.nodeid_b2a(sid)
for sid in self.g.servers_by_id]), for sid in self.g.get_all_serverids()]),
where) where)
self.failUnless("sharemap" in r, where) self.failUnless("sharemap" in r, where)
all_serverids = set() all_serverids = set()
@ -553,7 +553,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
all_serverids.update(serverids_s) all_serverids.update(serverids_s)
self.failUnlessEqual(sorted(all_serverids), self.failUnlessEqual(sorted(all_serverids),
sorted([idlib.nodeid_b2a(sid) sorted([idlib.nodeid_b2a(sid)
for sid in self.g.servers_by_id]), for sid in self.g.get_all_serverids()]),
where) where)
self.failUnlessEqual(r["count-wrong-shares"], 0, where) self.failUnlessEqual(r["count-wrong-shares"], 0, where)
self.failUnlessEqual(r["count-recoverable-versions"], 1, where) self.failUnlessEqual(r["count-recoverable-versions"], 1, where)

View File

@ -596,7 +596,8 @@ class DownloadTest(_Base, unittest.TestCase):
# tweak the client's copies of server-version data, so it believes # tweak the client's copies of server-version data, so it believes
# that they're old and can't handle reads that overrun the length of # that they're old and can't handle reads that overrun the length of
# the share. This exercises a different code path. # the share. This exercises a different code path.
for (peerid, rref) in self.c0.storage_broker.get_all_servers(): for s in self.c0.storage_broker.get_connected_servers():
rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False v1["tolerates-immutable-read-overrun"] = False
@ -1167,7 +1168,8 @@ class DownloadV2(_Base, unittest.TestCase):
# tweak the client's copies of server-version data, so it believes # tweak the client's copies of server-version data, so it believes
# that they're old and can't handle reads that overrun the length of # that they're old and can't handle reads that overrun the length of
# the share. This exercises a different code path. # the share. This exercises a different code path.
for (peerid, rref) in self.c0.storage_broker.get_all_servers(): for s in self.c0.storage_broker.get_connected_servers():
rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False v1["tolerates-immutable-read-overrun"] = False
@ -1186,7 +1188,8 @@ class DownloadV2(_Base, unittest.TestCase):
self.set_up_grid() self.set_up_grid()
self.c0 = self.g.clients[0] self.c0 = self.g.clients[0]
for (peerid, rref) in self.c0.storage_broker.get_all_servers(): for s in self.c0.storage_broker.get_connected_servers():
rref = s.get_rref()
v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
v1["tolerates-immutable-read-overrun"] = False v1["tolerates-immutable-read-overrun"] = False

View File

@ -101,7 +101,8 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
self.c0 = self.g.clients[0] self.c0 = self.g.clients[0]
nm = self.c0.nodemaker nm = self.c0.nodemaker
self.servers = sorted([(id, ss) for (id, ss) in nm.storage_broker.get_all_servers()]) self.servers = sorted([(s.get_serverid(), s.get_rref())
for s in nm.storage_broker.get_connected_servers()])
self.servers = self.servers[5:] + self.servers[:5] self.servers = self.servers[5:] + self.servers[:5]
if mutable: if mutable:

View File

@ -95,12 +95,23 @@ class TestShareFinder(unittest.TestCase):
self.s.hungry() self.s.hungry()
eventually(_give_buckets_and_hunger_again) eventually(_give_buckets_and_hunger_again)
return d return d
class MockIServer(object):
def __init__(self, serverid, rref):
self.serverid = serverid
self.rref = rref
def get_serverid(self):
return self.serverid
def get_rref(self):
return self.rref
mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()}) mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
mockserver2 = MockServer({}) mockserver2 = MockServer({})
mockserver3 = MockServer({3: mock.Mock()}) mockserver3 = MockServer({3: mock.Mock()})
mockstoragebroker = mock.Mock() mockstoragebroker = mock.Mock()
mockstoragebroker.get_servers_for_index.return_value = [ ('ms1', mockserver1), ('ms2', mockserver2), ('ms3', mockserver3), ] servers = [ MockIServer("ms1", mockserver1),
MockIServer("ms2", mockserver2),
MockIServer("ms3", mockserver3), ]
mockstoragebroker.get_servers_for_psi.return_value = servers
mockdownloadstatus = mock.Mock() mockdownloadstatus = mock.Mock()
mocknode = MockNode(check_reneging=True, check_fetch_failed=True) mocknode = MockNode(check_reneging=True, check_fetch_failed=True)

View File

@ -1910,11 +1910,9 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
d.addCallback(_got_key) d.addCallback(_got_key)
def _break_peer0(res): def _break_peer0(res):
si = self._storage_index si = self._storage_index
peerlist = nm.storage_broker.get_servers_for_index(si) servers = nm.storage_broker.get_servers_for_psi(si)
peerid0, connection0 = peerlist[0] self.g.break_server(servers[0].get_serverid())
peerid1, connection1 = peerlist[1] self.server1 = servers[1]
connection0.broken = True
self.connection1 = connection1
d.addCallback(_break_peer0) d.addCallback(_break_peer0)
# now "create" the file, using the pre-established key, and let the # now "create" the file, using the pre-established key, and let the
# initial publish finally happen # initial publish finally happen
@ -1925,7 +1923,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
# now break the second peer # now break the second peer
def _break_peer1(res): def _break_peer1(res):
self.connection1.broken = True self.g.break_server(self.server1.get_serverid())
d.addCallback(_break_peer1) d.addCallback(_break_peer1)
d.addCallback(lambda res: n.overwrite("contents 2")) d.addCallback(lambda res: n.overwrite("contents 2"))
# that ought to work too # that ought to work too
@ -1956,7 +1954,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
nm = self.g.clients[0].nodemaker nm = self.g.clients[0].nodemaker
sb = nm.storage_broker sb = nm.storage_broker
peerids = [serverid for (serverid,ss) in sb.get_all_servers()] peerids = [s.get_serverid() for s in sb.get_connected_servers()]
self.g.break_server(peerids[0]) self.g.break_server(peerids[0])
d = nm.create_mutable_file("contents 1") d = nm.create_mutable_file("contents 1")
@ -1980,8 +1978,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
self.basedir = "mutable/Problems/test_publish_all_servers_bad" self.basedir = "mutable/Problems/test_publish_all_servers_bad"
self.set_up_grid() self.set_up_grid()
nm = self.g.clients[0].nodemaker nm = self.g.clients[0].nodemaker
for (serverid,ss) in nm.storage_broker.get_all_servers(): for s in nm.storage_broker.get_connected_servers():
ss.broken = True s.get_rref().broken = True
d = self.shouldFail(NotEnoughServersError, d = self.shouldFail(NotEnoughServersError,
"test_publish_all_servers_bad", "test_publish_all_servers_bad",
@ -2033,8 +2031,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
# 1. notice which server gets a read() call first # 1. notice which server gets a read() call first
# 2. tell that server to start throwing errors # 2. tell that server to start throwing errors
killer = FirstServerGetsKilled() killer = FirstServerGetsKilled()
for (serverid,ss) in nm.storage_broker.get_all_servers(): for s in nm.storage_broker.get_connected_servers():
ss.post_call_notifier = killer.notify s.get_rref().post_call_notifier = killer.notify
d.addCallback(_created) d.addCallback(_created)
# now we update a servermap from a new node (which doesn't have the # now we update a servermap from a new node (which doesn't have the
@ -2059,8 +2057,8 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
self.uri = n.get_uri() self.uri = n.get_uri()
self.n2 = nm.create_from_cap(self.uri) self.n2 = nm.create_from_cap(self.uri)
deleter = FirstServerGetsDeleted() deleter = FirstServerGetsDeleted()
for (serverid,ss) in nm.storage_broker.get_all_servers(): for s in nm.storage_broker.get_connected_servers():
ss.post_call_notifier = deleter.notify s.get_rref().post_call_notifier = deleter.notify
d.addCallback(_created) d.addCallback(_created)
d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE)) d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
return d return d

View File

@ -65,7 +65,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
all_peerids = c.get_storage_broker().get_all_serverids() all_peerids = c.get_storage_broker().get_all_serverids()
self.failUnlessEqual(len(all_peerids), self.numclients+1) self.failUnlessEqual(len(all_peerids), self.numclients+1)
sb = c.storage_broker sb = c.storage_broker
permuted_peers = sb.get_servers_for_index("a") permuted_peers = sb.get_servers_for_psi("a")
self.failUnlessEqual(len(permuted_peers), self.numclients+1) self.failUnlessEqual(len(permuted_peers), self.numclients+1)
d.addCallback(_check) d.addCallback(_check)
@ -101,7 +101,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
all_peerids = c.get_storage_broker().get_all_serverids() all_peerids = c.get_storage_broker().get_all_serverids()
self.failUnlessEqual(len(all_peerids), self.numclients) self.failUnlessEqual(len(all_peerids), self.numclients)
sb = c.storage_broker sb = c.storage_broker
permuted_peers = sb.get_servers_for_index("a") permuted_peers = sb.get_servers_for_psi("a")
self.failUnlessEqual(len(permuted_peers), self.numclients) self.failUnlessEqual(len(permuted_peers), self.numclients)
d.addCallback(_check_connections) d.addCallback(_check_connections)

View File

@ -1730,12 +1730,8 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
d.addCallback(lambda ign: d.addCallback(lambda ign:
self._add_server(server_number=2)) self._add_server(server_number=2))
def _break_server_2(ign): def _break_server_2(ign):
server = self.g.servers_by_number[2].my_nodeid serverid = self.g.servers_by_number[2].my_nodeid
# We have to break the server in servers_by_id, self.g.break_server(serverid)
# because the one in servers_by_number isn't wrapped,
# and doesn't look at its broken attribute when answering
# queries.
self.g.servers_by_id[server].broken = True
d.addCallback(_break_server_2) d.addCallback(_break_server_2)
d.addCallback(lambda ign: d.addCallback(lambda ign:
self._add_server(server_number=3, readonly=True)) self._add_server(server_number=3, readonly=True))

View File

@ -139,9 +139,9 @@ class ResultsBase:
# this table is sorted by permuted order # this table is sorted by permuted order
sb = c.get_storage_broker() sb = c.get_storage_broker()
permuted_peer_ids = [peerid permuted_peer_ids = [s.get_serverid()
for (peerid, rref) for s
in sb.get_servers_for_index(cr.get_storage_index())] in sb.get_servers_for_psi(cr.get_storage_index())]
num_shares_left = sum([len(shares) for shares in servers.values()]) num_shares_left = sum([len(shares) for shares in servers.values()])
servermap = [] servermap = []

View File

@ -247,18 +247,18 @@ class Root(rend.Page):
def data_connected_storage_servers(self, ctx, data): def data_connected_storage_servers(self, ctx, data):
sb = self.client.get_storage_broker() sb = self.client.get_storage_broker()
return len(sb.get_all_servers()) return len(sb.get_connected_servers())
def data_services(self, ctx, data): def data_services(self, ctx, data):
sb = self.client.get_storage_broker() sb = self.client.get_storage_broker()
return sb.get_all_descriptors() return sb.get_known_servers()
def render_service_row(self, ctx, descriptor): def render_service_row(self, ctx, server):
nodeid = descriptor.get_serverid() nodeid = server.get_serverid()
ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid)) ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid))
ctx.fillSlots("nickname", descriptor.get_nickname()) ctx.fillSlots("nickname", server.get_nickname())
rhost = descriptor.get_remote_host() rhost = server.get_remote_host()
if rhost: if rhost:
if nodeid == self.client.nodeid: if nodeid == self.client.nodeid:
rhost_s = "(loopback)" rhost_s = "(loopback)"
@ -267,12 +267,12 @@ class Root(rend.Page):
else: else:
rhost_s = str(rhost) rhost_s = str(rhost)
connected = "Yes: to " + rhost_s connected = "Yes: to " + rhost_s
since = descriptor.get_last_connect_time() since = server.get_last_connect_time()
else: else:
connected = "No" connected = "No"
since = descriptor.get_last_loss_time() since = server.get_last_loss_time()
announced = descriptor.get_announcement_time() announced = server.get_announcement_time()
announcement = descriptor.get_announcement() announcement = server.get_announcement()
version = announcement["my-version"] version = announcement["my-version"]
service_name = announcement["service-name"] service_name = announcement["service-name"]