simplify when_connected_enough()

This seems happier as a method on StorageBroker, rather than a
completely separate helper class.
This commit is contained in:
Brian Warner 2016-07-21 17:23:22 -07:00
parent c15d706faf
commit 1b64ab5e85
3 changed files with 35 additions and 50 deletions

View File

@ -491,14 +491,12 @@ class Client(node.Node, pollmixin.PollMixin):
s.setServiceParent(self)
s.startService()
# start processing the upload queue when we've connected to enough servers
connection_threshold = min(self.encoding_params["k"],
self.encoding_params["happy"] + 1)
connected = storage_client.ConnectedEnough(
self.storage_broker,
connection_threshold,
)
connected.when_connected_enough().addCallback(lambda ign: s.ready())
# start processing the upload queue when we've connected to
# enough servers
threshold = min(self.encoding_params["k"],
self.encoding_params["happy"] + 1)
d = self.storage_broker.when_connected_enough(threshold)
d.addCallback(lambda ign: s.ready())
def _check_exit_trigger(self, exit_trigger_file):
if os.path.exists(exit_trigger_file):

View File

@ -38,7 +38,7 @@ from foolscap.api import Tub, eventually
from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.observer import OneShotObserverList, ObserverList
from allmydata.util.observer import ObserverList
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1
@ -59,41 +59,6 @@ from allmydata.util.hashutil import sha1
# don't pass signatures: only pass validated blessed-objects
class ConnectedEnough(object):
def __init__(self, storage_farm_broker, threshold):
self._broker = storage_farm_broker
self._threshold = int(threshold)
if self._threshold <= 0:
raise ValueError("threshold must be positive")
self._threshold_passed = False
self._observers = OneShotObserverList()
self._broker.on_servers_changed(self._check_enough_connected)
def when_connected_enough(self):
"""
:returns: a Deferred that fires if/when our high water mark for
number of connected servers becomes (or ever was) above
"threshold".
"""
if self._threshold_passed:
return defer.succeed(None)
return self._observers.when_fired()
def _check_enough_connected(self):
"""
internal helper
"""
if self._threshold_passed:
return
num_servers = len(self._broker.get_connected_servers())
if num_servers >= self._threshold:
self._threshold_passed = True
self._observers.fire(None)
class StorageFarmBroker(service.MultiService):
implements(IStorageBroker)
"""I live on the client, and know about storage servers. For each server
@ -115,10 +80,19 @@ class StorageFarmBroker(service.MultiService):
# them for it.
self.servers = {}
self.introducer_client = None
self._server_listeners = ObserverList()
self._threshold_listeners = [] # tuples of (threshold, Deferred)
self._connected_high_water_mark = 0
def on_servers_changed(self, callback):
self._server_listeners.subscribe(callback)
def when_connected_enough(self, threshold):
"""
:returns: a Deferred that fires if/when our high water mark for
number of connected servers becomes (or ever was) above
"threshold".
"""
d = defer.Deferred()
self._threshold_listeners.append( (threshold, d) )
self._check_connected_high_water_mark()
return d
# these two are used in unit tests
def test_add_rref(self, serverid, rref, ann):
@ -137,7 +111,20 @@ class StorageFarmBroker(service.MultiService):
def _got_connection(self):
# this is called by NativeStorageClient when it is connected
self._server_listeners.notify()
self._check_connected_high_water_mark()
def _check_connected_high_water_mark(self):
current = len(self.get_connected_servers())
if current > self._connected_high_water_mark:
self._connected_high_water_mark = current
remaining = []
for threshold, d in self._threshold_listeners:
if self._connected_high_water_mark >= threshold:
eventually(d.callback, None)
else:
remaining.append( (threshold, d) )
self._threshold_listeners = remaining
def _got_announcement(self, key_s, ann):
if key_s is not None:

View File

@ -5,7 +5,7 @@ from twisted.trial import unittest
from twisted.internet.defer import succeed, inlineCallbacks
from allmydata.storage_client import NativeStorageServer
from allmydata.storage_client import StorageFarmBroker, ConnectedEnough
from allmydata.storage_client import StorageFarmBroker
class NativeStorageServerWithVersion(NativeStorageServer):
@ -42,7 +42,7 @@ class TestStorageFarmBroker(unittest.TestCase):
def test_threshold_reached(self):
introducer = Mock()
broker = StorageFarmBroker(True)
done = ConnectedEnough(broker, 5).when_connected_enough()
done = broker.when_connected_enough(5)
broker.use_introducer(introducer)
# subscribes to "storage" to learn of new storage nodes
subscribe = introducer.mock_calls[0]