immutable.Downloader: pass StorageBroker to constructor, stop being a Service
child of the client, access with client.downloader instead of client.getServiceNamed("downloader"). The single "Downloader" instance is scheduled for demolition anyways, to be replaced by individual filenode.download calls.
This commit is contained in:
parent
22c962bbc2
commit
4a4a4f9520
|
@ -264,7 +264,7 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||||
"private", "cache", "download")
|
"private", "cache", "download")
|
||||||
self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
|
self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
|
||||||
self.download_cache_dirman.setServiceParent(self)
|
self.download_cache_dirman.setServiceParent(self)
|
||||||
self.add_service(Downloader(self.stats_provider))
|
self.downloader = Downloader(self.storage_broker, self.stats_provider)
|
||||||
self.init_stub_client()
|
self.init_stub_client()
|
||||||
self.init_nodemaker()
|
self.init_nodemaker()
|
||||||
|
|
||||||
|
@ -323,7 +323,7 @@ class Client(node.Node, pollmixin.PollMixin):
|
||||||
self._secret_holder,
|
self._secret_holder,
|
||||||
self.get_history(),
|
self.get_history(),
|
||||||
self.getServiceNamed("uploader"),
|
self.getServiceNamed("uploader"),
|
||||||
self.getServiceNamed("downloader"),
|
self.downloader,
|
||||||
self.download_cache_dirman,
|
self.download_cache_dirman,
|
||||||
self.get_encoding_parameters(),
|
self.get_encoding_parameters(),
|
||||||
self._key_generator)
|
self._key_generator)
|
||||||
|
|
|
@ -50,8 +50,7 @@ class ControlServer(Referenceable, service.Service):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def remote_download_from_uri_to_file(self, uri, filename):
|
def remote_download_from_uri_to_file(self, uri, filename):
|
||||||
downloader = self.parent.getServiceNamed("downloader")
|
d = self.parent.downloader.download_to_filename(uri, filename)
|
||||||
d = downloader.download_to_filename(uri, filename)
|
|
||||||
d.addCallback(lambda res: filename)
|
d.addCallback(lambda res: filename)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ import os, random, weakref, itertools, time
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.internet.interfaces import IPushProducer, IConsumer
|
from twisted.internet.interfaces import IPushProducer, IConsumer
|
||||||
from twisted.application import service
|
|
||||||
from foolscap.api import DeadReferenceError, RemoteException, eventually
|
from foolscap.api import DeadReferenceError, RemoteException, eventually
|
||||||
|
|
||||||
from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
|
from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
|
||||||
|
@ -1185,22 +1184,19 @@ class ConsumerAdapter:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class Downloader(service.MultiService):
|
class Downloader:
|
||||||
"""I am a service that allows file downloading.
|
"""I am a service that allows file downloading.
|
||||||
"""
|
"""
|
||||||
# TODO: in fact, this service only downloads immutable files (URI:CHK:).
|
# TODO: in fact, this service only downloads immutable files (URI:CHK:).
|
||||||
# It is scheduled to go away, to be replaced by filenode.download()
|
# It is scheduled to go away, to be replaced by filenode.download()
|
||||||
implements(IDownloader)
|
implements(IDownloader)
|
||||||
name = "downloader"
|
|
||||||
|
|
||||||
def __init__(self, stats_provider=None):
|
def __init__(self, storage_broker, stats_provider):
|
||||||
service.MultiService.__init__(self)
|
self.storage_broker = storage_broker
|
||||||
self.stats_provider = stats_provider
|
self.stats_provider = stats_provider
|
||||||
self._all_downloads = weakref.WeakKeyDictionary() # for debugging
|
self._all_downloads = weakref.WeakKeyDictionary() # for debugging
|
||||||
|
|
||||||
def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
|
def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
|
||||||
assert self.parent
|
|
||||||
assert self.running
|
|
||||||
u = IFileURI(u)
|
u = IFileURI(u)
|
||||||
t = IDownloadTarget(t)
|
t = IDownloadTarget(t)
|
||||||
assert t.write
|
assert t.write
|
||||||
|
@ -1212,12 +1208,12 @@ class Downloader(service.MultiService):
|
||||||
# include LIT files
|
# include LIT files
|
||||||
self.stats_provider.count('downloader.files_downloaded', 1)
|
self.stats_provider.count('downloader.files_downloaded', 1)
|
||||||
self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
|
self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
|
||||||
storage_broker = self.parent.get_storage_broker()
|
|
||||||
|
|
||||||
target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
|
target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
|
||||||
if not monitor:
|
if not monitor:
|
||||||
monitor=Monitor()
|
monitor=Monitor()
|
||||||
dl = CiphertextDownloader(storage_broker, u.get_verify_cap(), target,
|
dl = CiphertextDownloader(self.storage_broker,
|
||||||
|
u.get_verify_cap(), target,
|
||||||
monitor=monitor)
|
monitor=monitor)
|
||||||
self._all_downloads[dl] = None
|
self._all_downloads[dl] = None
|
||||||
if history:
|
if history:
|
||||||
|
|
|
@ -1027,8 +1027,7 @@ class ShareManglingMixin(SystemTestMixin):
|
||||||
return sum_of_write_counts
|
return sum_of_write_counts
|
||||||
|
|
||||||
def _download_and_check_plaintext(self, unused=None):
|
def _download_and_check_plaintext(self, unused=None):
|
||||||
self.downloader = self.clients[1].getServiceNamed("downloader")
|
d = self.clients[1].downloader.download_to_data(self.uri)
|
||||||
d = self.downloader.download_to_data(self.uri)
|
|
||||||
|
|
||||||
def _after_download(result):
|
def _after_download(result):
|
||||||
self.failUnlessEqual(result, TEST_DATA)
|
self.failUnlessEqual(result, TEST_DATA)
|
||||||
|
|
|
@ -918,7 +918,7 @@ class Deleter(GridTestMixin, unittest.TestCase):
|
||||||
def _do_delete(ignored):
|
def _do_delete(ignored):
|
||||||
nm = UCWEingNodeMaker(c0.storage_broker, c0._secret_holder,
|
nm = UCWEingNodeMaker(c0.storage_broker, c0._secret_holder,
|
||||||
c0.get_history(), c0.getServiceNamed("uploader"),
|
c0.get_history(), c0.getServiceNamed("uploader"),
|
||||||
c0.getServiceNamed("downloader"),
|
c0.downloader,
|
||||||
c0.download_cache_dirman,
|
c0.download_cache_dirman,
|
||||||
c0.get_encoding_parameters(),
|
c0.get_encoding_parameters(),
|
||||||
c0._key_generator)
|
c0._key_generator)
|
||||||
|
|
|
@ -26,8 +26,7 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
|
||||||
d.addCallback(_then_delete_8)
|
d.addCallback(_then_delete_8)
|
||||||
|
|
||||||
def _then_download(unused=None):
|
def _then_download(unused=None):
|
||||||
self.downloader = self.clients[1].getServiceNamed("downloader")
|
d2 = self.clients[1].downloader.download_to_data(self.uri)
|
||||||
d2 = self.downloader.download_to_data(self.uri)
|
|
||||||
|
|
||||||
def _after_download_callb(result):
|
def _after_download_callb(result):
|
||||||
self.fail() # should have gotten an errback instead
|
self.fail() # should have gotten an errback instead
|
||||||
|
@ -94,8 +93,7 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
|
||||||
|
|
||||||
before_download_reads = self._count_reads()
|
before_download_reads = self._count_reads()
|
||||||
def _attempt_to_download(unused=None):
|
def _attempt_to_download(unused=None):
|
||||||
downloader = self.clients[1].getServiceNamed("downloader")
|
d2 = self.clients[1].downloader.download_to_data(self.uri)
|
||||||
d2 = downloader.download_to_data(self.uri)
|
|
||||||
|
|
||||||
def _callb(res):
|
def _callb(res):
|
||||||
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
|
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
|
||||||
|
@ -128,8 +126,7 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
|
||||||
|
|
||||||
before_download_reads = self._count_reads()
|
before_download_reads = self._count_reads()
|
||||||
def _attempt_to_download(unused=None):
|
def _attempt_to_download(unused=None):
|
||||||
downloader = self.clients[1].getServiceNamed("downloader")
|
d2 = self.clients[1].downloader.download_to_data(self.uri)
|
||||||
d2 = downloader.download_to_data(self.uri)
|
|
||||||
|
|
||||||
def _callb(res):
|
def _callb(res):
|
||||||
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
|
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
|
||||||
|
|
|
@ -135,8 +135,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
|
||||||
log.msg("upload finished: uri is %s" % (theuri,))
|
log.msg("upload finished: uri is %s" % (theuri,))
|
||||||
self.uri = theuri
|
self.uri = theuri
|
||||||
assert isinstance(self.uri, str), self.uri
|
assert isinstance(self.uri, str), self.uri
|
||||||
dl = self.clients[1].getServiceNamed("downloader")
|
self.downloader = self.clients[1].downloader
|
||||||
self.downloader = dl
|
|
||||||
d.addCallback(_upload_done)
|
d.addCallback(_upload_done)
|
||||||
|
|
||||||
def _upload_again(res):
|
def _upload_again(res):
|
||||||
|
|
Loading…
Reference in New Issue