offloaded: more test coverage on client side, change interfaces a bit
This commit is contained in:
parent
db5f58f9d5
commit
6ac01fde4c
|
@ -1278,7 +1278,7 @@ class RIEncryptedUploadable(RemoteInterface):
|
||||||
return Hash
|
return Hash
|
||||||
|
|
||||||
|
|
||||||
class RIUploadHelper(RemoteInterface):
|
class RICHKUploadHelper(RemoteInterface):
|
||||||
__remote_name__ = "RIUploadHelper.tahoe.allmydata.com"
|
__remote_name__ = "RIUploadHelper.tahoe.allmydata.com"
|
||||||
|
|
||||||
def upload(reader=RIEncryptedUploadable):
|
def upload(reader=RIEncryptedUploadable):
|
||||||
|
@ -1288,7 +1288,7 @@ class RIUploadHelper(RemoteInterface):
|
||||||
class RIHelper(RemoteInterface):
|
class RIHelper(RemoteInterface):
|
||||||
__remote_name__ = "RIHelper.tahoe.allmydata.com"
|
__remote_name__ = "RIHelper.tahoe.allmydata.com"
|
||||||
|
|
||||||
def upload(si=StorageIndex):
|
def upload_chk(si=StorageIndex):
|
||||||
"""See if a file with a given storage index needs uploading. The
|
"""See if a file with a given storage index needs uploading. The
|
||||||
helper will ask the appropriate storage servers to see if the file
|
helper will ask the appropriate storage servers to see if the file
|
||||||
has already been uploaded. If so, the helper will return a set of
|
has already been uploaded. If so, the helper will return a set of
|
||||||
|
@ -1297,10 +1297,10 @@ class RIHelper(RemoteInterface):
|
||||||
|
|
||||||
If the file has not yet been uploaded (or if it was only partially
|
If the file has not yet been uploaded (or if it was only partially
|
||||||
uploaded), the helper will return an empty upload-results dictionary
|
uploaded), the helper will return an empty upload-results dictionary
|
||||||
and also an RIUploadHelper object that will take care of the upload
|
and also an RICHKUploadHelper object that will take care of the
|
||||||
process. The client should call upload() on this object and pass it a
|
upload process. The client should call upload() on this object and
|
||||||
reference to an RIEncryptedUploadable object that will provide
|
pass it a reference to an RIEncryptedUploadable object that will
|
||||||
ciphertext. When the upload is finished, the upload() method will
|
provide ciphertext. When the upload is finished, the upload() method
|
||||||
finish and return the upload results.
|
will finish and return the upload results.
|
||||||
"""
|
"""
|
||||||
return (UploadResults, ChoiceOf(RIUploadHelper, None))
|
return (UploadResults, ChoiceOf(RICHKUploadHelper, None))
|
||||||
|
|
|
@ -13,7 +13,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||||
peer selection, encoding, and share pushing. I read ciphertext from the
|
peer selection, encoding, and share pushing. I read ciphertext from the
|
||||||
remote AssistedUploader.
|
remote AssistedUploader.
|
||||||
"""
|
"""
|
||||||
implements(interfaces.RIUploadHelper)
|
implements(interfaces.RICHKUploadHelper)
|
||||||
|
|
||||||
def __init__(self, storage_index, helper):
|
def __init__(self, storage_index, helper):
|
||||||
self._finished = False
|
self._finished = False
|
||||||
|
@ -30,7 +30,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||||
def start(self):
|
def start(self):
|
||||||
# determine if we need to upload the file. If so, return ({},self) .
|
# determine if we need to upload the file. If so, return ({},self) .
|
||||||
# If not, return (UploadResults,None) .
|
# If not, return (UploadResults,None) .
|
||||||
return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
|
#return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
|
||||||
|
return ({}, self)
|
||||||
|
|
||||||
def remote_upload(self, reader):
|
def remote_upload(self, reader):
|
||||||
# reader is an RIEncryptedUploadable. I am specified to return an
|
# reader is an RIEncryptedUploadable. I am specified to return an
|
||||||
|
@ -39,6 +40,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||||
eu = CiphertextReader(reader, self._storage_index)
|
eu = CiphertextReader(reader, self._storage_index)
|
||||||
d = self.start_encrypted(eu)
|
d = self.start_encrypted(eu)
|
||||||
def _done(res):
|
def _done(res):
|
||||||
|
self.finished(self._storage_index)
|
||||||
(uri_extension_hash, needed_shares, total_shares, size) = res
|
(uri_extension_hash, needed_shares, total_shares, size) = res
|
||||||
return {'uri_extension_hash': uri_extension_hash}
|
return {'uri_extension_hash': uri_extension_hash}
|
||||||
d.addCallback(_done)
|
d.addCallback(_done)
|
||||||
|
@ -78,8 +80,10 @@ class CiphertextReader:
|
||||||
class Helper(Referenceable, service.MultiService):
|
class Helper(Referenceable, service.MultiService):
|
||||||
implements(interfaces.RIHelper)
|
implements(interfaces.RIHelper)
|
||||||
# this is the non-distributed version. When we need to have multiple
|
# this is the non-distributed version. When we need to have multiple
|
||||||
# helpers, this object will query the farm to see if anyone has the
|
# helpers, this object will become the HelperCoordinator, and will query
|
||||||
# storage_index of interest, and send the request off to them.
|
# the farm of Helpers to see if anyone has the storage_index of interest,
|
||||||
|
# and send the request off to them. If nobody has it, we'll choose a
|
||||||
|
# helper at random.
|
||||||
|
|
||||||
chk_upload_helper_class = CHKUploadHelper
|
chk_upload_helper_class = CHKUploadHelper
|
||||||
|
|
||||||
|
@ -93,7 +97,7 @@ class Helper(Referenceable, service.MultiService):
|
||||||
kwargs['facility'] = "helper"
|
kwargs['facility'] = "helper"
|
||||||
return self.parent.log(msg, **kwargs)
|
return self.parent.log(msg, **kwargs)
|
||||||
|
|
||||||
def remote_upload(self, storage_index):
|
def remote_upload_chk(self, storage_index):
|
||||||
# TODO: look on disk
|
# TODO: look on disk
|
||||||
if storage_index in self._active_uploads:
|
if storage_index in self._active_uploads:
|
||||||
uh = self._active_uploads[storage_index]
|
uh = self._active_uploads[storage_index]
|
||||||
|
|
|
@ -8,12 +8,20 @@ from foolscap.logging import log
|
||||||
from allmydata import upload, offloaded
|
from allmydata import upload, offloaded
|
||||||
from allmydata.util import hashutil
|
from allmydata.util import hashutil
|
||||||
|
|
||||||
class FakeCHKUploadHelper(offloaded.CHKUploadHelper):
|
class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
|
||||||
def remote_upload(self, reader):
|
def start_encrypted(self, eu):
|
||||||
return {'uri_extension_hash': hashutil.uri_extension_hash("")}
|
needed_shares, happy, total_shares = self._encoding_parameters
|
||||||
|
d = eu.get_size()
|
||||||
|
def _got_size(size):
|
||||||
|
return (hashutil.uri_extension_hash(""),
|
||||||
|
needed_shares, total_shares, size)
|
||||||
|
d.addCallback(_got_size)
|
||||||
|
return d
|
||||||
|
|
||||||
class FakeHelper(offloaded.Helper):
|
class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
|
||||||
chk_upload_helper_class = FakeCHKUploadHelper
|
def start(self):
|
||||||
|
res = {'uri_extension_hash': hashutil.uri_extension_hash("")}
|
||||||
|
return (res, None)
|
||||||
|
|
||||||
class FakeClient(service.MultiService):
|
class FakeClient(service.MultiService):
|
||||||
def log(self, msg, **kwargs):
|
def log(self, msg, **kwargs):
|
||||||
|
@ -42,8 +50,8 @@ class AssistedUpload(unittest.TestCase):
|
||||||
# bogus host/port
|
# bogus host/port
|
||||||
t.setLocation("bogus:1234")
|
t.setLocation("bogus:1234")
|
||||||
|
|
||||||
|
self.helper = h = offloaded.Helper(".")
|
||||||
h = FakeHelper(".")
|
h.chk_upload_helper_class = CHKUploadHelper_fake
|
||||||
h.setServiceParent(self.s)
|
h.setServiceParent(self.s)
|
||||||
self.helper_furl = t.registerReference(h)
|
self.helper_furl = t.registerReference(h)
|
||||||
|
|
||||||
|
@ -75,3 +83,26 @@ class AssistedUpload(unittest.TestCase):
|
||||||
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def test_already_uploaded(self):
|
||||||
|
self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
|
||||||
|
u = upload.Uploader(self.helper_furl)
|
||||||
|
u.setServiceParent(self.s)
|
||||||
|
|
||||||
|
# wait a few turns
|
||||||
|
d = eventual.fireEventually()
|
||||||
|
d.addCallback(eventual.fireEventually)
|
||||||
|
d.addCallback(eventual.fireEventually)
|
||||||
|
|
||||||
|
def _ready(res):
|
||||||
|
assert u._helper
|
||||||
|
|
||||||
|
DATA = "I need help\n" * 1000
|
||||||
|
return u.upload_data(DATA)
|
||||||
|
d.addCallback(_ready)
|
||||||
|
def _uploaded(uri):
|
||||||
|
assert "CHK" in uri
|
||||||
|
d.addCallback(_uploaded)
|
||||||
|
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
|
@ -611,16 +611,19 @@ class AssistedUploader:
|
||||||
self._storage_index = storage_index
|
self._storage_index = storage_index
|
||||||
|
|
||||||
def _contact_helper(self, res):
|
def _contact_helper(self, res):
|
||||||
d = self._helper.callRemote("upload", self._storage_index)
|
self.log("contacting helper..")
|
||||||
|
d = self._helper.callRemote("upload_chk", self._storage_index)
|
||||||
d.addCallback(self._contacted_helper)
|
d.addCallback(self._contacted_helper)
|
||||||
return d
|
return d
|
||||||
def _contacted_helper(self, (upload_results, upload_helper)):
|
def _contacted_helper(self, (upload_results, upload_helper)):
|
||||||
if upload_helper:
|
if upload_helper:
|
||||||
|
self.log("helper says we need to upload")
|
||||||
# we need to upload the file
|
# we need to upload the file
|
||||||
reu = RemoteEncryptedUploabable(self._encuploadable)
|
reu = RemoteEncryptedUploabable(self._encuploadable)
|
||||||
d = upload_helper.callRemote("upload", reu)
|
d = upload_helper.callRemote("upload", reu)
|
||||||
# this Deferred will fire with the upload results
|
# this Deferred will fire with the upload results
|
||||||
return d
|
return d
|
||||||
|
self.log("helper says file is already uploaded")
|
||||||
return upload_results
|
return upload_results
|
||||||
|
|
||||||
def _build_readcap(self, upload_results):
|
def _build_readcap(self, upload_results):
|
||||||
|
|
Loading…
Reference in New Issue