start to factor server-connection-management into a distinct 'StorageServerFarmBroker' object, separate from the client and the introducer. This is the starting point for #467: static server selection
This commit is contained in:
parent
d29281c9c5
commit
c516361fd2
@ -11,6 +11,7 @@ from pycryptopp.publickey import rsa
|
|||||||
|
|
||||||
import allmydata
|
import allmydata
|
||||||
from allmydata.storage.server import StorageServer
|
from allmydata.storage.server import StorageServer
|
||||||
|
from allmydata import storage_client
|
||||||
from allmydata.immutable.upload import Uploader
|
from allmydata.immutable.upload import Uploader
|
||||||
from allmydata.immutable.download import Downloader
|
from allmydata.immutable.download import Downloader
|
||||||
from allmydata.immutable.filenode import FileNode, LiteralFileNode
|
from allmydata.immutable.filenode import FileNode, LiteralFileNode
|
||||||
@ -220,6 +221,8 @@ class Client(node.Node, pollmixin.PollMixin):
|
|||||||
convergence_s = self.get_or_create_private_config('convergence', _make_secret)
|
convergence_s = self.get_or_create_private_config('convergence', _make_secret)
|
||||||
self.convergence = base32.a2b(convergence_s)
|
self.convergence = base32.a2b(convergence_s)
|
||||||
self._node_cache = weakref.WeakValueDictionary() # uri -> node
|
self._node_cache = weakref.WeakValueDictionary() # uri -> node
|
||||||
|
|
||||||
|
self.init_client_storage_broker()
|
||||||
self.add_service(History(self.stats_provider))
|
self.add_service(History(self.stats_provider))
|
||||||
self.add_service(Uploader(helper_furl, self.stats_provider))
|
self.add_service(Uploader(helper_furl, self.stats_provider))
|
||||||
download_cachedir = os.path.join(self.basedir,
|
download_cachedir = os.path.join(self.basedir,
|
||||||
@ -229,6 +232,37 @@ class Client(node.Node, pollmixin.PollMixin):
|
|||||||
self.add_service(Downloader(self.stats_provider))
|
self.add_service(Downloader(self.stats_provider))
|
||||||
self.init_stub_client()
|
self.init_stub_client()
|
||||||
|
|
||||||
|
def init_client_storage_broker(self):
|
||||||
|
# create a StorageFarmBroker object, for use by Uploader/Downloader
|
||||||
|
# (and everybody else who wants to use storage servers)
|
||||||
|
self.storage_broker = sb = storage_client.StorageFarmBroker()
|
||||||
|
|
||||||
|
# load static server specifications from tahoe.cfg, if any
|
||||||
|
#if self.config.has_section("client-server-selection"):
|
||||||
|
# server_params = {} # maps serverid to dict of parameters
|
||||||
|
# for (name, value) in self.config.items("client-server-selection"):
|
||||||
|
# pieces = name.split(".")
|
||||||
|
# if pieces[0] == "server":
|
||||||
|
# serverid = pieces[1]
|
||||||
|
# if serverid not in server_params:
|
||||||
|
# server_params[serverid] = {}
|
||||||
|
# server_params[serverid][pieces[2]] = value
|
||||||
|
# for serverid, params in server_params.items():
|
||||||
|
# server_type = params.pop("type")
|
||||||
|
# if server_type == "tahoe-foolscap":
|
||||||
|
# s = storage_client.NativeStorageClient(*params)
|
||||||
|
# else:
|
||||||
|
# msg = ("unrecognized server type '%s' in "
|
||||||
|
# "tahoe.cfg [client-server-selection]server.%s.type"
|
||||||
|
# % (server_type, serverid))
|
||||||
|
# raise storage_client.UnknownServerTypeError(msg)
|
||||||
|
# sb.add_server(s.serverid, s)
|
||||||
|
|
||||||
|
# check to see if we're supposed to use the introducer too
|
||||||
|
if self.get_config("client-server-selection", "use_introducer",
|
||||||
|
default=True, boolean=True):
|
||||||
|
sb.use_introducer(self.introducer_client)
|
||||||
|
|
||||||
def init_stub_client(self):
|
def init_stub_client(self):
|
||||||
def _publish(res):
|
def _publish(res):
|
||||||
# we publish an empty object so that the introducer can count how
|
# we publish an empty object so that the introducer can count how
|
||||||
@ -338,18 +372,10 @@ class Client(node.Node, pollmixin.PollMixin):
|
|||||||
self.log("hotline file missing, shutting down")
|
self.log("hotline file missing, shutting down")
|
||||||
reactor.stop()
|
reactor.stop()
|
||||||
|
|
||||||
def get_all_peerids(self):
|
def get_all_serverids(self):
|
||||||
return self.introducer_client.get_all_peerids()
|
return self.storage_broker.get_all_serverids()
|
||||||
def get_nickname_for_peerid(self, peerid):
|
def get_nickname_for_serverid(self, serverid):
|
||||||
return self.introducer_client.get_nickname_for_peerid(peerid)
|
return self.storage_broker.get_nickname_for_serverid(serverid)
|
||||||
|
|
||||||
def get_permuted_peers(self, service_name, key):
|
|
||||||
"""
|
|
||||||
@return: list of (peerid, connection,)
|
|
||||||
"""
|
|
||||||
assert isinstance(service_name, str)
|
|
||||||
assert isinstance(key, str)
|
|
||||||
return self.introducer_client.get_permuted_peers(service_name, key)
|
|
||||||
|
|
||||||
def get_encoding_parameters(self):
|
def get_encoding_parameters(self):
|
||||||
return self.DEFAULT_ENCODING_PARAMETERS
|
return self.DEFAULT_ENCODING_PARAMETERS
|
||||||
@ -371,7 +397,7 @@ class Client(node.Node, pollmixin.PollMixin):
|
|||||||
temporary test network and need to know when it is safe to proceed
|
temporary test network and need to know when it is safe to proceed
|
||||||
with an upload or download."""
|
with an upload or download."""
|
||||||
def _check():
|
def _check():
|
||||||
current_clients = list(self.get_all_peerids())
|
current_clients = list(self.get_all_serverids())
|
||||||
return len(current_clients) >= num_clients
|
return len(current_clients) >= num_clients
|
||||||
d = self.poll(_check, 0.5)
|
d = self.poll(_check, 0.5)
|
||||||
d.addCallback(lambda res: None)
|
d.addCallback(lambda res: None)
|
||||||
|
@ -743,8 +743,8 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
|||||||
|
|
||||||
def _get_all_shareholders(self):
|
def _get_all_shareholders(self):
|
||||||
dl = []
|
dl = []
|
||||||
for (peerid,ss) in self._client.get_permuted_peers("storage",
|
sb = self._client.storage_broker
|
||||||
self._storage_index):
|
for (peerid,ss) in sb.get_servers(self._storage_index):
|
||||||
d = ss.callRemote("get_buckets", self._storage_index)
|
d = ss.callRemote("get_buckets", self._storage_index)
|
||||||
d.addCallbacks(self._got_response, self._got_error,
|
d.addCallbacks(self._got_response, self._got_error,
|
||||||
callbackArgs=(peerid,))
|
callbackArgs=(peerid,))
|
||||||
|
@ -54,7 +54,7 @@ class CHKCheckerAndUEBFetcher:
|
|||||||
|
|
||||||
def _get_all_shareholders(self, storage_index):
|
def _get_all_shareholders(self, storage_index):
|
||||||
dl = []
|
dl = []
|
||||||
for (peerid, ss) in self._peer_getter("storage", storage_index):
|
for (peerid, ss) in self._peer_getter(storage_index):
|
||||||
d = ss.callRemote("get_buckets", storage_index)
|
d = ss.callRemote("get_buckets", storage_index)
|
||||||
d.addCallbacks(self._got_response, self._got_error,
|
d.addCallbacks(self._got_response, self._got_error,
|
||||||
callbackArgs=(peerid,))
|
callbackArgs=(peerid,))
|
||||||
@ -622,8 +622,8 @@ class Helper(Referenceable, service.MultiService):
|
|||||||
# see if this file is already in the grid
|
# see if this file is already in the grid
|
||||||
lp2 = self.log("doing a quick check+UEBfetch",
|
lp2 = self.log("doing a quick check+UEBfetch",
|
||||||
parent=lp, level=log.NOISY)
|
parent=lp, level=log.NOISY)
|
||||||
c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
|
sb = self.parent.storage_broker
|
||||||
storage_index, lp2)
|
c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2)
|
||||||
d = c.check()
|
d = c.check()
|
||||||
def _checked(res):
|
def _checked(res):
|
||||||
if res:
|
if res:
|
||||||
|
@ -166,7 +166,8 @@ class Tahoe2PeerSelector:
|
|||||||
self.use_peers = set() # PeerTrackers that have shares assigned to them
|
self.use_peers = set() # PeerTrackers that have shares assigned to them
|
||||||
self.preexisting_shares = {} # sharenum -> peerid holding the share
|
self.preexisting_shares = {} # sharenum -> peerid holding the share
|
||||||
|
|
||||||
peers = client.get_permuted_peers("storage", storage_index)
|
sb = client.storage_broker
|
||||||
|
peers = list(sb.get_servers(storage_index))
|
||||||
if not peers:
|
if not peers:
|
||||||
raise NoServersError("client gave us zero peers")
|
raise NoServersError("client gave us zero peers")
|
||||||
|
|
||||||
|
@ -190,9 +190,8 @@ class Publish:
|
|||||||
assert self._privkey
|
assert self._privkey
|
||||||
self._encprivkey = self._node.get_encprivkey()
|
self._encprivkey = self._node.get_encprivkey()
|
||||||
|
|
||||||
client = self._node._client
|
sb = self._node._client.storage_broker
|
||||||
full_peerlist = client.get_permuted_peers("storage",
|
full_peerlist = sb.get_servers(self._storage_index)
|
||||||
self._storage_index)
|
|
||||||
self.full_peerlist = full_peerlist # for use later, immutable
|
self.full_peerlist = full_peerlist # for use later, immutable
|
||||||
self.bad_peers = set() # peerids who have errbacked/refused requests
|
self.bad_peers = set() # peerids who have errbacked/refused requests
|
||||||
|
|
||||||
|
@ -421,9 +421,8 @@ class ServermapUpdater:
|
|||||||
|
|
||||||
self._queries_completed = 0
|
self._queries_completed = 0
|
||||||
|
|
||||||
client = self._node._client
|
sb = self._node._client.storage_broker
|
||||||
full_peerlist = client.get_permuted_peers("storage",
|
full_peerlist = list(sb.get_servers(self._node._storage_index))
|
||||||
self._node._storage_index)
|
|
||||||
self.full_peerlist = full_peerlist # for use later, immutable
|
self.full_peerlist = full_peerlist # for use later, immutable
|
||||||
self.extra_peers = full_peerlist[:] # peers are removed as we use them
|
self.extra_peers = full_peerlist[:] # peers are removed as we use them
|
||||||
self._good_peers = set() # peers who had some shares
|
self._good_peers = set() # peers who had some shares
|
||||||
|
78
src/allmydata/storage_client.py
Normal file
78
src/allmydata/storage_client.py
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
|
||||||
|
"""
|
||||||
|
I contain the client-side code which speaks to storage servers, in particular
|
||||||
|
the foolscap-based server implemented in src/allmydata/storage/*.py .
|
||||||
|
"""
|
||||||
|
|
||||||
|
# roadmap:
|
||||||
|
#
|
||||||
|
# implement ServerFarm, change Client to create it, change
|
||||||
|
# uploader/servermap to get rrefs from it. ServerFarm calls
|
||||||
|
# IntroducerClient.subscribe_to .
|
||||||
|
#
|
||||||
|
# implement NativeStorageClient, change Tahoe2PeerSelector to use it. All
|
||||||
|
# NativeStorageClients come from the introducer
|
||||||
|
#
|
||||||
|
# change web/check_results.py to get NativeStorageClients from check results,
|
||||||
|
# ask it for a nickname (instead of using client.get_nickname_for_serverid)
|
||||||
|
#
|
||||||
|
# implement tahoe.cfg scanner, create static NativeStorageClients
|
||||||
|
|
||||||
|
import sha
|
||||||
|
|
||||||
|
class StorageFarmBroker:
|
||||||
|
"""I live on the client, and know about storage servers. For each server
|
||||||
|
that is participating in a grid, I either maintain a connection to it or
|
||||||
|
remember enough information to establish a connection to it on demand.
|
||||||
|
I'm also responsible for subscribing to the IntroducerClient to find out
|
||||||
|
about new servers as they are announced by the Introducer.
|
||||||
|
"""
|
||||||
|
def __init__(self, permute_peers=True):
|
||||||
|
assert permute_peers # False not implemented yet
|
||||||
|
self.servers = {} # serverid -> StorageClient instance
|
||||||
|
self.permute_peers = permute_peers
|
||||||
|
self.introducer_client = None
|
||||||
|
def add_server(self, serverid, s):
|
||||||
|
self.servers[serverid] = s
|
||||||
|
def use_introducer(self, introducer_client):
|
||||||
|
self.introducer_client = ic = introducer_client
|
||||||
|
ic.subscribe_to("storage")
|
||||||
|
|
||||||
|
def get_servers(self, peer_selection_index):
|
||||||
|
# first cut: return an iterator of (peerid, versioned-rref) tuples
|
||||||
|
assert self.permute_peers == True
|
||||||
|
servers = {}
|
||||||
|
for serverid,server in self.servers.items():
|
||||||
|
servers[serverid] = server
|
||||||
|
if self.introducer_client:
|
||||||
|
ic = self.introducer_client
|
||||||
|
for serverid,server in ic.get_permuted_peers("storage",
|
||||||
|
peer_selection_index):
|
||||||
|
servers[serverid] = server
|
||||||
|
servers = servers.items()
|
||||||
|
key = peer_selection_index
|
||||||
|
return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
|
||||||
|
|
||||||
|
def get_all_serverids(self):
|
||||||
|
for serverid in self.servers:
|
||||||
|
yield serverid
|
||||||
|
if self.introducer_client:
|
||||||
|
for serverid,server in self.introducer_client.get_peers("storage"):
|
||||||
|
yield serverid
|
||||||
|
|
||||||
|
def get_nickname_for_serverid(self, serverid):
|
||||||
|
if serverid in self.servers:
|
||||||
|
return self.servers[serverid].nickname
|
||||||
|
if self.introducer_client:
|
||||||
|
return self.introducer_client.get_nickname_for_peerid(serverid)
|
||||||
|
return None
|
||||||
|
|
||||||
|
class NativeStorageClient:
|
||||||
|
def __init__(self, serverid, furl, nickname, min_shares=1):
|
||||||
|
self.serverid = serverid
|
||||||
|
self.furl = furl
|
||||||
|
self.nickname = nickname
|
||||||
|
self.min_shares = min_shares
|
||||||
|
|
||||||
|
class UnknownServerTypeError(Exception):
|
||||||
|
pass
|
@ -104,6 +104,11 @@ def wrap(original, service_name):
|
|||||||
wrapper.version = version
|
wrapper.version = version
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
class NoNetworkStorageBroker:
|
||||||
|
def get_servers(self, key):
|
||||||
|
return sorted(self.client._servers,
|
||||||
|
key=lambda x: sha.new(key+x[0]).digest())
|
||||||
|
|
||||||
class NoNetworkClient(Client):
|
class NoNetworkClient(Client):
|
||||||
|
|
||||||
def create_tub(self):
|
def create_tub(self):
|
||||||
@ -126,15 +131,16 @@ class NoNetworkClient(Client):
|
|||||||
pass
|
pass
|
||||||
def init_storage(self):
|
def init_storage(self):
|
||||||
pass
|
pass
|
||||||
|
def init_client_storage_broker(self):
|
||||||
|
self.storage_broker = NoNetworkStorageBroker()
|
||||||
|
self.storage_broker.client = self
|
||||||
def init_stub_client(self):
|
def init_stub_client(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_servers(self, service_name):
|
def get_servers(self, service_name):
|
||||||
return self._servers
|
return self._servers
|
||||||
|
|
||||||
def get_permuted_peers(self, service_name, key):
|
def get_nickname_for_serverid(self, serverid):
|
||||||
return sorted(self._servers, key=lambda x: sha.new(key+x[0]).digest())
|
|
||||||
def get_nickname_for_peerid(self, peerid):
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
class SimpleStats:
|
class SimpleStats:
|
||||||
|
@ -3,32 +3,32 @@ import simplejson
|
|||||||
from twisted.trial import unittest
|
from twisted.trial import unittest
|
||||||
from allmydata import check_results, uri
|
from allmydata import check_results, uri
|
||||||
from allmydata.web import check_results as web_check_results
|
from allmydata.web import check_results as web_check_results
|
||||||
|
from allmydata.storage_client import StorageFarmBroker, NativeStorageClient
|
||||||
from common_web import WebRenderingMixin
|
from common_web import WebRenderingMixin
|
||||||
|
|
||||||
class FakeClient:
|
class FakeClient:
|
||||||
def get_nickname_for_peerid(self, peerid):
|
def get_nickname_for_serverid(self, serverid):
|
||||||
if peerid == "\x00"*20:
|
return self.storage_broker.get_nickname_for_serverid(serverid)
|
||||||
return "peer-0"
|
|
||||||
if peerid == "\xff"*20:
|
|
||||||
return "peer-f"
|
|
||||||
if peerid == "\x11"*20:
|
|
||||||
return "peer-11"
|
|
||||||
return "peer-unknown"
|
|
||||||
|
|
||||||
def get_permuted_peers(self, service, key):
|
|
||||||
return [("\x00"*20, None),
|
|
||||||
("\x11"*20, None),
|
|
||||||
("\xff"*20, None),
|
|
||||||
]
|
|
||||||
|
|
||||||
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
|
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
|
||||||
|
|
||||||
|
def create_fake_client(self):
|
||||||
|
sb = StorageFarmBroker()
|
||||||
|
for (peerid, nickname) in [("\x00"*20, "peer-0"),
|
||||||
|
("\xff"*20, "peer-f"),
|
||||||
|
("\x11"*20, "peer-11")] :
|
||||||
|
n = NativeStorageClient(peerid, None, nickname)
|
||||||
|
sb.add_server(peerid, n)
|
||||||
|
c = FakeClient()
|
||||||
|
c.storage_broker = sb
|
||||||
|
return c
|
||||||
|
|
||||||
def render_json(self, page):
|
def render_json(self, page):
|
||||||
d = self.render1(page, args={"output": ["json"]})
|
d = self.render1(page, args={"output": ["json"]})
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_literal(self):
|
def test_literal(self):
|
||||||
c = FakeClient()
|
c = self.create_fake_client()
|
||||||
lcr = web_check_results.LiteralCheckResults(c)
|
lcr = web_check_results.LiteralCheckResults(c)
|
||||||
|
|
||||||
d = self.render1(lcr)
|
d = self.render1(lcr)
|
||||||
@ -53,7 +53,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
|
|||||||
return d
|
return d
|
||||||
|
|
||||||
def test_check(self):
|
def test_check(self):
|
||||||
c = FakeClient()
|
c = self.create_fake_client()
|
||||||
serverid_1 = "\x00"*20
|
serverid_1 = "\x00"*20
|
||||||
serverid_f = "\xff"*20
|
serverid_f = "\xff"*20
|
||||||
u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
|
u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
|
||||||
@ -151,7 +151,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
|
|||||||
|
|
||||||
|
|
||||||
def test_check_and_repair(self):
|
def test_check_and_repair(self):
|
||||||
c = FakeClient()
|
c = self.create_fake_client()
|
||||||
serverid_1 = "\x00"*20
|
serverid_1 = "\x00"*20
|
||||||
serverid_f = "\xff"*20
|
serverid_f = "\xff"*20
|
||||||
u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
|
u = uri.CHKFileURI("\x00"*16, "\x00"*32, 3, 10, 1234)
|
||||||
|
@ -6,6 +6,7 @@ from twisted.python import log
|
|||||||
|
|
||||||
import allmydata
|
import allmydata
|
||||||
from allmydata import client
|
from allmydata import client
|
||||||
|
from allmydata.storage_client import StorageFarmBroker
|
||||||
from allmydata.introducer.client import IntroducerClient
|
from allmydata.introducer.client import IntroducerClient
|
||||||
from allmydata.util import base32
|
from allmydata.util import base32
|
||||||
from foolscap.api import flushEventualQueue
|
from foolscap.api import flushEventualQueue
|
||||||
@ -140,30 +141,19 @@ class Basic(unittest.TestCase):
|
|||||||
c = client.Client(basedir)
|
c = client.Client(basedir)
|
||||||
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
|
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
|
||||||
|
|
||||||
def _permute(self, c, key):
|
def _permute(self, sb, key):
|
||||||
return [ peerid
|
return [ peerid
|
||||||
for (peerid,rref) in c.get_permuted_peers("storage", key) ]
|
for (peerid,rref) in sb.get_servers(key) ]
|
||||||
|
|
||||||
def test_permute(self):
|
def test_permute(self):
|
||||||
basedir = "test_client.Basic.test_permute"
|
sb = StorageFarmBroker()
|
||||||
os.mkdir(basedir)
|
|
||||||
open(os.path.join(basedir, "introducer.furl"), "w").write("")
|
|
||||||
open(os.path.join(basedir, "vdrive.furl"), "w").write("")
|
|
||||||
c = client.Client(basedir)
|
|
||||||
c.introducer_client = FakeIntroducerClient()
|
|
||||||
for k in ["%d" % i for i in range(5)]:
|
for k in ["%d" % i for i in range(5)]:
|
||||||
c.introducer_client.add_peer(k)
|
sb.add_server(k, None)
|
||||||
|
|
||||||
self.failUnlessEqual(self._permute(c, "one"), ['3','1','0','4','2'])
|
self.failUnlessEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
|
||||||
self.failUnlessEqual(self._permute(c, "two"), ['0','4','2','1','3'])
|
self.failUnlessEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
|
||||||
c.introducer_client.remove_all_peers()
|
sb.servers = {}
|
||||||
self.failUnlessEqual(self._permute(c, "one"), [])
|
self.failUnlessEqual(self._permute(sb, "one"), [])
|
||||||
|
|
||||||
c2 = client.Client(basedir)
|
|
||||||
c2.introducer_client = FakeIntroducerClient()
|
|
||||||
for k in ["%d" % i for i in range(5)]:
|
|
||||||
c2.introducer_client.add_peer(k)
|
|
||||||
self.failUnlessEqual(self._permute(c2, "one"), ['3','1','0','4','2'])
|
|
||||||
|
|
||||||
def test_versions(self):
|
def test_versions(self):
|
||||||
basedir = "test_client.Basic.test_versions"
|
basedir = "test_client.Basic.test_versions"
|
||||||
|
@ -6,6 +6,7 @@ from foolscap.api import Tub, fireEventually, flushEventualQueue
|
|||||||
from foolscap.logging import log
|
from foolscap.logging import log
|
||||||
|
|
||||||
from allmydata.storage.server import si_b2a
|
from allmydata.storage.server import si_b2a
|
||||||
|
from allmydata.storage_client import StorageFarmBroker
|
||||||
from allmydata.immutable import offloaded, upload
|
from allmydata.immutable import offloaded, upload
|
||||||
from allmydata import uri
|
from allmydata import uri
|
||||||
from allmydata.util import hashutil, fileutil, mathutil
|
from allmydata.util import hashutil, fileutil, mathutil
|
||||||
@ -62,12 +63,11 @@ class FakeClient(service.MultiService):
|
|||||||
"max_segment_size": 1*MiB,
|
"max_segment_size": 1*MiB,
|
||||||
}
|
}
|
||||||
stats_provider = None
|
stats_provider = None
|
||||||
|
storage_broker = StorageFarmBroker()
|
||||||
def log(self, *args, **kwargs):
|
def log(self, *args, **kwargs):
|
||||||
return log.msg(*args, **kwargs)
|
return log.msg(*args, **kwargs)
|
||||||
def get_encoding_parameters(self):
|
def get_encoding_parameters(self):
|
||||||
return self.DEFAULT_ENCODING_PARAMETERS
|
return self.DEFAULT_ENCODING_PARAMETERS
|
||||||
def get_permuted_peers(self, service_name, storage_index):
|
|
||||||
return []
|
|
||||||
|
|
||||||
def flush_but_dont_ignore(res):
|
def flush_but_dont_ignore(res):
|
||||||
d = flushEventualQueue()
|
d = flushEventualQueue()
|
||||||
|
@ -17,7 +17,7 @@ from allmydata.monitor import Monitor
|
|||||||
from allmydata.test.common import ShouldFailMixin
|
from allmydata.test.common import ShouldFailMixin
|
||||||
from foolscap.api import eventually, fireEventually
|
from foolscap.api import eventually, fireEventually
|
||||||
from foolscap.logging import log
|
from foolscap.logging import log
|
||||||
import sha
|
from allmydata.storage_client import StorageFarmBroker
|
||||||
|
|
||||||
from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
|
from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
|
||||||
from allmydata.mutable.common import ResponseCache, \
|
from allmydata.mutable.common import ResponseCache, \
|
||||||
@ -171,12 +171,22 @@ class FakeClient:
|
|||||||
def __init__(self, num_peers=10):
|
def __init__(self, num_peers=10):
|
||||||
self._storage = FakeStorage()
|
self._storage = FakeStorage()
|
||||||
self._num_peers = num_peers
|
self._num_peers = num_peers
|
||||||
self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
|
peerids = [tagged_hash("peerid", "%d" % i)[:20]
|
||||||
for i in range(self._num_peers)]
|
for i in range(self._num_peers)]
|
||||||
self._connections = dict([(peerid, FakeStorageServer(peerid,
|
|
||||||
self._storage))
|
|
||||||
for peerid in self._peerids])
|
|
||||||
self.nodeid = "fakenodeid"
|
self.nodeid = "fakenodeid"
|
||||||
|
self.storage_broker = StorageFarmBroker()
|
||||||
|
for peerid in peerids:
|
||||||
|
fss = FakeStorageServer(peerid, self._storage)
|
||||||
|
self.storage_broker.add_server(peerid, fss)
|
||||||
|
|
||||||
|
def get_all_serverids(self):
|
||||||
|
return self.storage_broker.get_all_serverids()
|
||||||
|
def debug_break_connection(self, peerid):
|
||||||
|
self.storage_broker.servers[peerid].broken = True
|
||||||
|
def debug_remove_connection(self, peerid):
|
||||||
|
self.storage_broker.servers.pop(peerid)
|
||||||
|
def debug_get_connection(self, peerid):
|
||||||
|
return self.storage_broker.servers[peerid]
|
||||||
|
|
||||||
def get_encoding_parameters(self):
|
def get_encoding_parameters(self):
|
||||||
return {"k": 3, "n": 10}
|
return {"k": 3, "n": 10}
|
||||||
@ -204,19 +214,6 @@ class FakeClient:
|
|||||||
res = self.mutable_file_node_class(self).init_from_uri(u)
|
res = self.mutable_file_node_class(self).init_from_uri(u)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def get_permuted_peers(self, service_name, key):
|
|
||||||
"""
|
|
||||||
@return: list of (peerid, connection,)
|
|
||||||
"""
|
|
||||||
results = []
|
|
||||||
for (peerid, connection) in self._connections.items():
|
|
||||||
assert isinstance(peerid, str)
|
|
||||||
permuted = sha.new(key + peerid).digest()
|
|
||||||
results.append((permuted, peerid, connection))
|
|
||||||
results.sort()
|
|
||||||
results = [ (r[1],r[2]) for r in results]
|
|
||||||
return results
|
|
||||||
|
|
||||||
def upload(self, uploadable):
|
def upload(self, uploadable):
|
||||||
assert IUploadable.providedBy(uploadable)
|
assert IUploadable.providedBy(uploadable)
|
||||||
d = uploadable.get_size()
|
d = uploadable.get_size()
|
||||||
@ -276,7 +273,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
def _created(n):
|
def _created(n):
|
||||||
self.failUnless(isinstance(n, FastMutableFileNode))
|
self.failUnless(isinstance(n, FastMutableFileNode))
|
||||||
self.failUnlessEqual(n.get_storage_index(), n._storage_index)
|
self.failUnlessEqual(n.get_storage_index(), n._storage_index)
|
||||||
peer0 = self.client._peerids[0]
|
peer0 = sorted(self.client.get_all_serverids())[0]
|
||||||
shnums = self.client._storage._peers[peer0].keys()
|
shnums = self.client._storage._peers[peer0].keys()
|
||||||
self.failUnlessEqual(len(shnums), 1)
|
self.failUnlessEqual(len(shnums), 1)
|
||||||
d.addCallback(_created)
|
d.addCallback(_created)
|
||||||
@ -1575,7 +1572,7 @@ class MultipleEncodings(unittest.TestCase):
|
|||||||
|
|
||||||
sharemap = {}
|
sharemap = {}
|
||||||
|
|
||||||
for i,peerid in enumerate(self._client._peerids):
|
for i,peerid in enumerate(self._client.get_all_serverids()):
|
||||||
peerid_s = shortnodeid_b2a(peerid)
|
peerid_s = shortnodeid_b2a(peerid)
|
||||||
for shnum in self._shares1.get(peerid, {}):
|
for shnum in self._shares1.get(peerid, {}):
|
||||||
if shnum < len(places):
|
if shnum < len(places):
|
||||||
@ -1798,15 +1795,15 @@ class LessFakeClient(FakeClient):
|
|||||||
|
|
||||||
def __init__(self, basedir, num_peers=10):
|
def __init__(self, basedir, num_peers=10):
|
||||||
self._num_peers = num_peers
|
self._num_peers = num_peers
|
||||||
self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
|
peerids = [tagged_hash("peerid", "%d" % i)[:20]
|
||||||
for i in range(self._num_peers)]
|
for i in range(self._num_peers)]
|
||||||
self._connections = {}
|
self.storage_broker = StorageFarmBroker()
|
||||||
for peerid in self._peerids:
|
for peerid in peerids:
|
||||||
peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
|
peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
|
||||||
make_dirs(peerdir)
|
make_dirs(peerdir)
|
||||||
ss = StorageServer(peerdir, peerid)
|
ss = StorageServer(peerdir, peerid)
|
||||||
lw = LocalWrapper(ss)
|
lw = LocalWrapper(ss)
|
||||||
self._connections[peerid] = lw
|
self.storage_broker.add_server(peerid, lw)
|
||||||
self.nodeid = "fakenodeid"
|
self.nodeid = "fakenodeid"
|
||||||
|
|
||||||
|
|
||||||
@ -1886,7 +1883,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
self.old_map = smap
|
self.old_map = smap
|
||||||
# now shut down one of the servers
|
# now shut down one of the servers
|
||||||
peer0 = list(smap.make_sharemap()[0])[0]
|
peer0 = list(smap.make_sharemap()[0])[0]
|
||||||
self.client._connections.pop(peer0)
|
self.client.debug_remove_connection(peer0)
|
||||||
# then modify the file, leaving the old map untouched
|
# then modify the file, leaving the old map untouched
|
||||||
log.msg("starting winning write")
|
log.msg("starting winning write")
|
||||||
return n.overwrite("contents 2")
|
return n.overwrite("contents 2")
|
||||||
@ -1920,7 +1917,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
d.addCallback(n._generated)
|
d.addCallback(n._generated)
|
||||||
def _break_peer0(res):
|
def _break_peer0(res):
|
||||||
si = n.get_storage_index()
|
si = n.get_storage_index()
|
||||||
peerlist = self.client.get_permuted_peers("storage", si)
|
peerlist = list(self.client.storage_broker.get_servers(si))
|
||||||
peerid0, connection0 = peerlist[0]
|
peerid0, connection0 = peerlist[0]
|
||||||
peerid1, connection1 = peerlist[1]
|
peerid1, connection1 = peerlist[1]
|
||||||
connection0.broken = True
|
connection0.broken = True
|
||||||
@ -1939,6 +1936,12 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
# that ought to work too
|
# that ought to work too
|
||||||
d.addCallback(lambda res: n.download_best_version())
|
d.addCallback(lambda res: n.download_best_version())
|
||||||
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
|
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
|
||||||
|
def _explain_error(f):
|
||||||
|
print f
|
||||||
|
if f.check(NotEnoughServersError):
|
||||||
|
print "first_error:", f.value.first_error
|
||||||
|
return f
|
||||||
|
d.addErrback(_explain_error)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_bad_server_overlap(self):
|
def test_bad_server_overlap(self):
|
||||||
@ -1954,8 +1957,8 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
|
basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
|
||||||
self.client = LessFakeClient(basedir, 10)
|
self.client = LessFakeClient(basedir, 10)
|
||||||
|
|
||||||
peerids = sorted(self.client._connections.keys())
|
peerids = list(self.client.get_all_serverids())
|
||||||
self.client._connections[peerids[0]].broken = True
|
self.client.debug_break_connection(peerids[0])
|
||||||
|
|
||||||
d = self.client.create_mutable_file("contents 1")
|
d = self.client.create_mutable_file("contents 1")
|
||||||
def _created(n):
|
def _created(n):
|
||||||
@ -1963,7 +1966,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
|
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
|
||||||
# now break one of the remaining servers
|
# now break one of the remaining servers
|
||||||
def _break_second_server(res):
|
def _break_second_server(res):
|
||||||
self.client._connections[peerids[1]].broken = True
|
self.client.debug_break_connection(peerids[1])
|
||||||
d.addCallback(_break_second_server)
|
d.addCallback(_break_second_server)
|
||||||
d.addCallback(lambda res: n.overwrite("contents 2"))
|
d.addCallback(lambda res: n.overwrite("contents 2"))
|
||||||
# that ought to work too
|
# that ought to work too
|
||||||
@ -1977,8 +1980,8 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
# Break all servers: the publish should fail
|
# Break all servers: the publish should fail
|
||||||
basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
|
basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
|
||||||
self.client = LessFakeClient(basedir, 20)
|
self.client = LessFakeClient(basedir, 20)
|
||||||
for connection in self.client._connections.values():
|
for peerid in self.client.get_all_serverids():
|
||||||
connection.broken = True
|
self.client.debug_break_connection(peerid)
|
||||||
d = self.shouldFail(NotEnoughServersError,
|
d = self.shouldFail(NotEnoughServersError,
|
||||||
"test_publish_all_servers_bad",
|
"test_publish_all_servers_bad",
|
||||||
"Ran out of non-bad servers",
|
"Ran out of non-bad servers",
|
||||||
|
@ -72,9 +72,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
|
|||||||
def _check(extra_node):
|
def _check(extra_node):
|
||||||
self.extra_node = extra_node
|
self.extra_node = extra_node
|
||||||
for c in self.clients:
|
for c in self.clients:
|
||||||
all_peerids = list(c.get_all_peerids())
|
all_peerids = list(c.get_all_serverids())
|
||||||
self.failUnlessEqual(len(all_peerids), self.numclients+1)
|
self.failUnlessEqual(len(all_peerids), self.numclients+1)
|
||||||
permuted_peers = list(c.get_permuted_peers("storage", "a"))
|
sb = c.storage_broker
|
||||||
|
permuted_peers = list(sb.get_servers("a"))
|
||||||
self.failUnlessEqual(len(permuted_peers), self.numclients+1)
|
self.failUnlessEqual(len(permuted_peers), self.numclients+1)
|
||||||
|
|
||||||
d.addCallback(_check)
|
d.addCallback(_check)
|
||||||
@ -109,9 +110,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
|
|||||||
d = self.set_up_nodes()
|
d = self.set_up_nodes()
|
||||||
def _check_connections(res):
|
def _check_connections(res):
|
||||||
for c in self.clients:
|
for c in self.clients:
|
||||||
all_peerids = list(c.get_all_peerids())
|
all_peerids = list(c.get_all_serverids())
|
||||||
self.failUnlessEqual(len(all_peerids), self.numclients)
|
self.failUnlessEqual(len(all_peerids), self.numclients)
|
||||||
permuted_peers = list(c.get_permuted_peers("storage", "a"))
|
sb = c.storage_broker
|
||||||
|
permuted_peers = list(sb.get_servers("a"))
|
||||||
self.failUnlessEqual(len(permuted_peers), self.numclients)
|
self.failUnlessEqual(len(permuted_peers), self.numclients)
|
||||||
d.addCallback(_check_connections)
|
d.addCallback(_check_connections)
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ from allmydata.util.assertutil import precondition
|
|||||||
from allmydata.util.deferredutil import DeferredListShouldSucceed
|
from allmydata.util.deferredutil import DeferredListShouldSucceed
|
||||||
from no_network import GridTestMixin
|
from no_network import GridTestMixin
|
||||||
from common_util import ShouldFailMixin
|
from common_util import ShouldFailMixin
|
||||||
|
from allmydata.storage_client import StorageFarmBroker
|
||||||
|
|
||||||
MiB = 1024*1024
|
MiB = 1024*1024
|
||||||
|
|
||||||
@ -158,22 +159,22 @@ class FakeClient:
|
|||||||
self.mode = mode
|
self.mode = mode
|
||||||
self.num_servers = num_servers
|
self.num_servers = num_servers
|
||||||
if mode == "some_big_some_small":
|
if mode == "some_big_some_small":
|
||||||
self.peers = []
|
peers = []
|
||||||
for fakeid in range(num_servers):
|
for fakeid in range(num_servers):
|
||||||
if fakeid % 2:
|
if fakeid % 2:
|
||||||
self.peers.append( ("%20d" % fakeid,
|
peers.append(("%20d" % fakeid, FakeStorageServer("good")))
|
||||||
FakeStorageServer("good")) )
|
|
||||||
else:
|
else:
|
||||||
self.peers.append( ("%20d" % fakeid,
|
peers.append(("%20d" % fakeid, FakeStorageServer("small")))
|
||||||
FakeStorageServer("small")) )
|
|
||||||
else:
|
else:
|
||||||
self.peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
|
peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
|
||||||
for fakeid in range(self.num_servers) ]
|
for fakeid in range(self.num_servers) ]
|
||||||
|
self.storage_broker = StorageFarmBroker()
|
||||||
|
for (serverid, server) in peers:
|
||||||
|
self.storage_broker.add_server(serverid, server)
|
||||||
|
self.last_peers = [p[1] for p in peers]
|
||||||
|
|
||||||
def log(self, *args, **kwargs):
|
def log(self, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
def get_permuted_peers(self, storage_index, include_myself):
|
|
||||||
self.last_peers = [p[1] for p in self.peers]
|
|
||||||
return self.peers
|
|
||||||
def get_encoding_parameters(self):
|
def get_encoding_parameters(self):
|
||||||
return self.DEFAULT_ENCODING_PARAMETERS
|
return self.DEFAULT_ENCODING_PARAMETERS
|
||||||
|
|
||||||
|
@ -9,6 +9,7 @@ from twisted.python import failure, log
|
|||||||
from nevow import rend
|
from nevow import rend
|
||||||
from allmydata import interfaces, uri, webish
|
from allmydata import interfaces, uri, webish
|
||||||
from allmydata.storage.shares import get_share_file
|
from allmydata.storage.shares import get_share_file
|
||||||
|
from allmydata.storage_client import StorageFarmBroker
|
||||||
from allmydata.immutable import upload, download
|
from allmydata.immutable import upload, download
|
||||||
from allmydata.web import status, common
|
from allmydata.web import status, common
|
||||||
from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
|
from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
|
||||||
@ -64,11 +65,10 @@ class FakeClient(service.MultiService):
|
|||||||
def connected_to_introducer(self):
|
def connected_to_introducer(self):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_nickname_for_peerid(self, peerid):
|
def get_nickname_for_serverid(self, serverid):
|
||||||
return u"John Doe"
|
return u"John Doe"
|
||||||
|
|
||||||
def get_permuted_peers(self, service_name, key):
|
storage_broker = StorageFarmBroker()
|
||||||
return []
|
|
||||||
|
|
||||||
def create_node_from_uri(self, auri):
|
def create_node_from_uri(self, auri):
|
||||||
precondition(isinstance(auri, str), auri)
|
precondition(isinstance(auri, str), auri)
|
||||||
|
@ -95,7 +95,7 @@ class ResultsBase:
|
|||||||
if data["list-corrupt-shares"]:
|
if data["list-corrupt-shares"]:
|
||||||
badsharemap = []
|
badsharemap = []
|
||||||
for (serverid, si, shnum) in data["list-corrupt-shares"]:
|
for (serverid, si, shnum) in data["list-corrupt-shares"]:
|
||||||
nickname = c.get_nickname_for_peerid(serverid)
|
nickname = c.get_nickname_for_serverid(serverid)
|
||||||
badsharemap.append(T.tr[T.td["sh#%d" % shnum],
|
badsharemap.append(T.tr[T.td["sh#%d" % shnum],
|
||||||
T.td[T.div(class_="nickname")[nickname],
|
T.td[T.div(class_="nickname")[nickname],
|
||||||
T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]],
|
T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]],
|
||||||
@ -123,7 +123,7 @@ class ResultsBase:
|
|||||||
shareid_s = ""
|
shareid_s = ""
|
||||||
if i == 0:
|
if i == 0:
|
||||||
shareid_s = shareid
|
shareid_s = shareid
|
||||||
nickname = c.get_nickname_for_peerid(serverid)
|
nickname = c.get_nickname_for_serverid(serverid)
|
||||||
sharemap.append(T.tr[T.td[shareid_s],
|
sharemap.append(T.tr[T.td[shareid_s],
|
||||||
T.td[T.div(class_="nickname")[nickname],
|
T.td[T.div(class_="nickname")[nickname],
|
||||||
T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]]
|
T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]]
|
||||||
@ -137,15 +137,15 @@ class ResultsBase:
|
|||||||
add("Unrecoverable Versions", data["count-unrecoverable-versions"])
|
add("Unrecoverable Versions", data["count-unrecoverable-versions"])
|
||||||
|
|
||||||
# this table is sorted by permuted order
|
# this table is sorted by permuted order
|
||||||
|
sb = c.storage_broker
|
||||||
permuted_peer_ids = [peerid
|
permuted_peer_ids = [peerid
|
||||||
for (peerid, rref)
|
for (peerid, rref)
|
||||||
in c.get_permuted_peers("storage",
|
in sb.get_servers(cr.get_storage_index())]
|
||||||
cr.get_storage_index())]
|
|
||||||
|
|
||||||
num_shares_left = sum([len(shares) for shares in servers.values()])
|
num_shares_left = sum([len(shares) for shares in servers.values()])
|
||||||
servermap = []
|
servermap = []
|
||||||
for serverid in permuted_peer_ids:
|
for serverid in permuted_peer_ids:
|
||||||
nickname = c.get_nickname_for_peerid(serverid)
|
nickname = c.get_nickname_for_serverid(serverid)
|
||||||
shareids = servers.get(serverid, [])
|
shareids = servers.get(serverid, [])
|
||||||
shareids.reverse()
|
shareids.reverse()
|
||||||
shareids_s = [ T.tt[shareid, " "] for shareid in sorted(shareids) ]
|
shareids_s = [ T.tt[shareid, " "] for shareid in sorted(shareids) ]
|
||||||
@ -419,7 +419,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin):
|
|||||||
def render_server_problem(self, ctx, data):
|
def render_server_problem(self, ctx, data):
|
||||||
serverid = data
|
serverid = data
|
||||||
data = [idlib.shortnodeid_b2a(serverid)]
|
data = [idlib.shortnodeid_b2a(serverid)]
|
||||||
nickname = self.client.get_nickname_for_peerid(serverid)
|
nickname = self.client.get_nickname_for_serverid(serverid)
|
||||||
if nickname:
|
if nickname:
|
||||||
data.append(" (%s)" % self._html(nickname))
|
data.append(" (%s)" % self._html(nickname))
|
||||||
return ctx.tag[data]
|
return ctx.tag[data]
|
||||||
@ -433,7 +433,7 @@ class DeepCheckResults(rend.Page, ResultsBase, ReloadMixin):
|
|||||||
return self.monitor.get_status().get_corrupt_shares()
|
return self.monitor.get_status().get_corrupt_shares()
|
||||||
def render_share_problem(self, ctx, data):
|
def render_share_problem(self, ctx, data):
|
||||||
serverid, storage_index, sharenum = data
|
serverid, storage_index, sharenum = data
|
||||||
nickname = self.client.get_nickname_for_peerid(serverid)
|
nickname = self.client.get_nickname_for_serverid(serverid)
|
||||||
ctx.fillSlots("serverid", idlib.shortnodeid_b2a(serverid))
|
ctx.fillSlots("serverid", idlib.shortnodeid_b2a(serverid))
|
||||||
if nickname:
|
if nickname:
|
||||||
ctx.fillSlots("nickname", self._html(nickname))
|
ctx.fillSlots("nickname", self._html(nickname))
|
||||||
|
Loading…
Reference in New Issue
Block a user