de-Service-ify Helper, pass in storage_broker and secret_holder directly.
This makes it more obvious that the Helper currently generates leases with the Helper's own secrets, rather than getting values from the client, which is arguably a bug that will likely be resolved with the Accounting project.
This commit is contained in:
parent
4a4a4f9520
commit
5283d4c19e
|
@ -111,6 +111,7 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||||
self.init_lease_secret()
|
self.init_lease_secret()
|
||||||
self.init_storage()
|
self.init_storage()
|
||||||
self.init_control()
|
self.init_control()
|
||||||
|
self.helper = None
|
||||||
if self.get_config("helper", "enabled", False, boolean=True):
|
if self.get_config("helper", "enabled", False, boolean=True):
|
||||||
self.init_helper()
|
self.init_helper()
|
||||||
self._key_generator = KeyGenerator()
|
self._key_generator = KeyGenerator()
|
||||||
|
@ -345,9 +346,9 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||||
def init_helper(self):
|
def init_helper(self):
|
||||||
d = self.when_tub_ready()
|
d = self.when_tub_ready()
|
||||||
def _publish(self):
|
def _publish(self):
|
||||||
h = Helper(os.path.join(self.basedir, "helper"),
|
self.helper = Helper(os.path.join(self.basedir, "helper"),
|
||||||
self.stats_provider, self.history)
|
self.storage_broker, self._secret_holder,
|
||||||
h.setServiceParent(self)
|
self.stats_provider, self.history)
|
||||||
# TODO: this is confusing. BASEDIR/private/helper.furl is created
|
# TODO: this is confusing. BASEDIR/private/helper.furl is created
|
||||||
# by the helper. BASEDIR/helper.furl is consumed by the client
|
# by the helper. BASEDIR/helper.furl is consumed by the client
|
||||||
# who wants to use the helper. I like having the filename be the
|
# who wants to use the helper. I like having the filename be the
|
||||||
|
@ -355,7 +356,7 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||||
# between config inputs and generated outputs is hard to see.
|
# between config inputs and generated outputs is hard to see.
|
||||||
helper_furlfile = os.path.join(self.basedir,
|
helper_furlfile = os.path.join(self.basedir,
|
||||||
"private", "helper.furl")
|
"private", "helper.furl")
|
||||||
self.tub.registerReference(h, furlFile=helper_furlfile)
|
self.tub.registerReference(self.helper, furlFile=helper_furlfile)
|
||||||
d.addCallback(_publish)
|
d.addCallback(_publish)
|
||||||
d.addErrback(log.err, facility="tahoe.init",
|
d.addErrback(log.err, facility="tahoe.init",
|
||||||
level=log.BAD, umid="K0mW5w")
|
level=log.BAD, umid="K0mW5w")
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
|
|
||||||
import os, stat, time, weakref
|
import os, stat, time, weakref
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
from twisted.application import service
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from foolscap.api import Referenceable, DeadReferenceError, eventually
|
from foolscap.api import Referenceable, DeadReferenceError, eventually
|
||||||
import allmydata # for __full_version__
|
import allmydata # for __full_version__
|
||||||
|
@ -135,7 +134,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||||
"application-version": str(allmydata.__full_version__),
|
"application-version": str(allmydata.__full_version__),
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, storage_index, helper,
|
def __init__(self, storage_index,
|
||||||
|
helper, storage_broker, secret_holder,
|
||||||
incoming_file, encoding_file,
|
incoming_file, encoding_file,
|
||||||
results, log_number):
|
results, log_number):
|
||||||
self._storage_index = storage_index
|
self._storage_index = storage_index
|
||||||
|
@ -153,9 +153,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||||
self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
|
self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
|
||||||
parent=log_number)
|
parent=log_number)
|
||||||
|
|
||||||
client = helper.parent
|
self._storage_broker = storage_broker
|
||||||
self._storage_broker = client.get_storage_broker()
|
self._secret_holder = secret_holder
|
||||||
self._secret_holder = client._secret_holder
|
|
||||||
self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
|
self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
|
||||||
self._log_number)
|
self._log_number)
|
||||||
self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
|
self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
|
||||||
|
@ -479,7 +478,7 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Helper(Referenceable, service.MultiService):
|
class Helper(Referenceable):
|
||||||
implements(interfaces.RIHelper, interfaces.IStatsProducer)
|
implements(interfaces.RIHelper, interfaces.IStatsProducer)
|
||||||
# this is the non-distributed version. When we need to have multiple
|
# this is the non-distributed version. When we need to have multiple
|
||||||
# helpers, this object will become the HelperCoordinator, and will query
|
# helpers, this object will become the HelperCoordinator, and will query
|
||||||
|
@ -495,8 +494,11 @@ class Helper(Referenceable, service.MultiService):
|
||||||
chk_upload_helper_class = CHKUploadHelper
|
chk_upload_helper_class = CHKUploadHelper
|
||||||
MAX_UPLOAD_STATUSES = 10
|
MAX_UPLOAD_STATUSES = 10
|
||||||
|
|
||||||
def __init__(self, basedir, stats_provider=None, history=None):
|
def __init__(self, basedir, storage_broker, secret_holder,
|
||||||
|
stats_provider, history):
|
||||||
self._basedir = basedir
|
self._basedir = basedir
|
||||||
|
self._storage_broker = storage_broker
|
||||||
|
self._secret_holder = secret_holder
|
||||||
self._chk_incoming = os.path.join(basedir, "CHK_incoming")
|
self._chk_incoming = os.path.join(basedir, "CHK_incoming")
|
||||||
self._chk_encoding = os.path.join(basedir, "CHK_encoding")
|
self._chk_encoding = os.path.join(basedir, "CHK_encoding")
|
||||||
fileutil.make_dirs(self._chk_incoming)
|
fileutil.make_dirs(self._chk_incoming)
|
||||||
|
@ -514,15 +516,11 @@ class Helper(Referenceable, service.MultiService):
|
||||||
"chk_upload_helper.encoded_bytes": 0,
|
"chk_upload_helper.encoded_bytes": 0,
|
||||||
}
|
}
|
||||||
self._history = history
|
self._history = history
|
||||||
service.MultiService.__init__(self)
|
|
||||||
|
|
||||||
def setServiceParent(self, parent):
|
|
||||||
service.MultiService.setServiceParent(self, parent)
|
|
||||||
|
|
||||||
def log(self, *args, **kwargs):
|
def log(self, *args, **kwargs):
|
||||||
if 'facility' not in kwargs:
|
if 'facility' not in kwargs:
|
||||||
kwargs['facility'] = "tahoe.helper"
|
kwargs['facility'] = "tahoe.helper"
|
||||||
return self.parent.log(*args, **kwargs)
|
return log.msg(*args, **kwargs)
|
||||||
|
|
||||||
def count(self, key, value=1):
|
def count(self, key, value=1):
|
||||||
if self.stats_provider:
|
if self.stats_provider:
|
||||||
|
@ -602,6 +600,8 @@ class Helper(Referenceable, service.MultiService):
|
||||||
else:
|
else:
|
||||||
self.log("creating new upload helper", parent=lp)
|
self.log("creating new upload helper", parent=lp)
|
||||||
uh = self.chk_upload_helper_class(storage_index, self,
|
uh = self.chk_upload_helper_class(storage_index, self,
|
||||||
|
self._storage_broker,
|
||||||
|
self._secret_holder,
|
||||||
incoming_file, encoding_file,
|
incoming_file, encoding_file,
|
||||||
r, lp)
|
r, lp)
|
||||||
self._active_uploads[storage_index] = uh
|
self._active_uploads[storage_index] = uh
|
||||||
|
@ -619,7 +619,7 @@ class Helper(Referenceable, service.MultiService):
|
||||||
# see if this file is already in the grid
|
# see if this file is already in the grid
|
||||||
lp2 = self.log("doing a quick check+UEBfetch",
|
lp2 = self.log("doing a quick check+UEBfetch",
|
||||||
parent=lp, level=log.NOISY)
|
parent=lp, level=log.NOISY)
|
||||||
sb = self.parent.get_storage_broker()
|
sb = self._storage_broker
|
||||||
c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
|
c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
|
||||||
d = c.check()
|
d = c.check()
|
||||||
def _checked(res):
|
def _checked(res):
|
||||||
|
|
|
@ -3,7 +3,6 @@ from twisted.trial import unittest
|
||||||
from twisted.application import service
|
from twisted.application import service
|
||||||
|
|
||||||
from foolscap.api import Tub, fireEventually, flushEventualQueue
|
from foolscap.api import Tub, fireEventually, flushEventualQueue
|
||||||
from foolscap.logging import log
|
|
||||||
|
|
||||||
from allmydata.storage.server import si_b2a
|
from allmydata.storage.server import si_b2a
|
||||||
from allmydata.storage_client import StorageFarmBroker
|
from allmydata.storage_client import StorageFarmBroker
|
||||||
|
@ -62,15 +61,9 @@ class FakeClient(service.MultiService):
|
||||||
"n": 100,
|
"n": 100,
|
||||||
"max_segment_size": 1*MiB,
|
"max_segment_size": 1*MiB,
|
||||||
}
|
}
|
||||||
stats_provider = None
|
|
||||||
storage_broker = StorageFarmBroker(None, True)
|
|
||||||
_secret_holder = client.SecretHolder("lease secret")
|
|
||||||
def log(self, *args, **kwargs):
|
|
||||||
return log.msg(*args, **kwargs)
|
|
||||||
def get_encoding_parameters(self):
|
def get_encoding_parameters(self):
|
||||||
return self.DEFAULT_ENCODING_PARAMETERS
|
return self.DEFAULT_ENCODING_PARAMETERS
|
||||||
def get_storage_broker(self):
|
|
||||||
return self.storage_broker
|
|
||||||
|
|
||||||
def flush_but_dont_ignore(res):
|
def flush_but_dont_ignore(res):
|
||||||
d = flushEventualQueue()
|
d = flushEventualQueue()
|
||||||
|
@ -96,6 +89,8 @@ class AssistedUpload(unittest.TestCase):
|
||||||
timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
|
timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.s = FakeClient()
|
self.s = FakeClient()
|
||||||
|
self.storage_broker = StorageFarmBroker(None, True)
|
||||||
|
self.secret_holder = client.SecretHolder("lease secret")
|
||||||
self.s.startService()
|
self.s.startService()
|
||||||
|
|
||||||
self.tub = t = Tub()
|
self.tub = t = Tub()
|
||||||
|
@ -108,9 +103,11 @@ class AssistedUpload(unittest.TestCase):
|
||||||
|
|
||||||
def setUpHelper(self, basedir):
|
def setUpHelper(self, basedir):
|
||||||
fileutil.make_dirs(basedir)
|
fileutil.make_dirs(basedir)
|
||||||
self.helper = h = offloaded.Helper(basedir)
|
self.helper = h = offloaded.Helper(basedir,
|
||||||
|
self.storage_broker,
|
||||||
|
self.secret_holder,
|
||||||
|
None, None)
|
||||||
h.chk_upload_helper_class = CHKUploadHelper_fake
|
h.chk_upload_helper_class = CHKUploadHelper_fake
|
||||||
h.setServiceParent(self.s)
|
|
||||||
self.helper_furl = self.tub.registerReference(h)
|
self.helper_furl = self.tub.registerReference(h)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
|
|
@ -86,6 +86,7 @@ class FakeClient(service.MultiService):
|
||||||
self.nodemaker = FakeNodeMaker(None, None, None,
|
self.nodemaker = FakeNodeMaker(None, None, None,
|
||||||
self.uploader, None, None,
|
self.uploader, None, None,
|
||||||
None, None)
|
None, None)
|
||||||
|
self.helper = None
|
||||||
|
|
||||||
nodeid = "fake_nodeid"
|
nodeid = "fake_nodeid"
|
||||||
nickname = "fake_nickname"
|
nickname = "fake_nickname"
|
||||||
|
|
|
@ -160,11 +160,7 @@ class Root(rend.Page):
|
||||||
def child_helper_status(self, ctx):
|
def child_helper_status(self, ctx):
|
||||||
# the Helper isn't attached until after the Tub starts, so this child
|
# the Helper isn't attached until after the Tub starts, so this child
|
||||||
# needs to created on each request
|
# needs to created on each request
|
||||||
try:
|
return status.HelperStatus(self.client.helper)
|
||||||
helper = self.client.getServiceNamed("helper")
|
|
||||||
except KeyError:
|
|
||||||
helper = None
|
|
||||||
return status.HelperStatus(helper)
|
|
||||||
|
|
||||||
child_webform_css = webform.defaultCSS
|
child_webform_css = webform.defaultCSS
|
||||||
child_tahoe_css = nevow_File(resource_filename('allmydata.web', 'tahoe.css'))
|
child_tahoe_css = nevow_File(resource_filename('allmydata.web', 'tahoe.css'))
|
||||||
|
@ -204,12 +200,11 @@ class Root(rend.Page):
|
||||||
except KeyError:
|
except KeyError:
|
||||||
ul[T.li["Not running storage server"]]
|
ul[T.li["Not running storage server"]]
|
||||||
|
|
||||||
try:
|
if self.client.helper:
|
||||||
h = self.client.getServiceNamed("helper")
|
stats = self.client.helper.get_stats()
|
||||||
stats = h.get_stats()
|
|
||||||
active_uploads = stats["chk_upload_helper.active_uploads"]
|
active_uploads = stats["chk_upload_helper.active_uploads"]
|
||||||
ul[T.li["Helper: %d active uploads" % (active_uploads,)]]
|
ul[T.li["Helper: %d active uploads" % (active_uploads,)]]
|
||||||
except KeyError:
|
else:
|
||||||
ul[T.li["Not running helper"]]
|
ul[T.li["Not running helper"]]
|
||||||
|
|
||||||
return ctx.tag[ul]
|
return ctx.tag[ul]
|
||||||
|
|
Loading…
Reference in New Issue