upload: use WriteBucketProxy_v2 when uploading a large file (with shares larger than 4GiB). This finally closes #346. I think we can now handle immutable files up to 48EiB.
This commit is contained in:
parent
bf56e2bb51
commit
cc50e2f4aa
|
@ -66,17 +66,25 @@ limitations described in #346.
|
|||
: rest of share is the same as v1, above
|
||||
... ...
|
||||
? : start of uri_extension_length (eight-byte big-endian value)
|
||||
? : start of uri_extension
|
||||
"""
|
||||
|
||||
# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but they are still
|
||||
# provided when writing so that older versions of Tahoe can read them.
|
||||
# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but
|
||||
# they are still provided when writing so that older versions of Tahoe can
|
||||
# read them.
|
||||
|
||||
def allocated_size(data_size, num_segments, num_share_hashes,
|
||||
uri_extension_size_max):
|
||||
wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
|
||||
uri_extension_size_max, None)
|
||||
uri_extension_starts_at = wbp._offsets['uri_extension']
|
||||
return uri_extension_starts_at + wbp.fieldsize + uri_extension_size_max
|
||||
def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, nodeid):
|
||||
# Use layout v1 for small files, so they'll be readable by older versions
|
||||
# (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
|
||||
# by tahoe-1.3.0 or later.
|
||||
try:
|
||||
wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, nodeid)
|
||||
except FileTooLargeError:
|
||||
wbp = WriteBucketProxy_v2(rref, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, nodeid)
|
||||
return wbp
|
||||
|
||||
class WriteBucketProxy:
|
||||
implements(IStorageBucketWriter)
|
||||
|
@ -101,6 +109,10 @@ class WriteBucketProxy:
|
|||
|
||||
self._create_offsets(block_size, data_size)
|
||||
|
||||
def get_allocated_size(self):
|
||||
return (self._offsets['uri_extension'] + self.fieldsize +
|
||||
self._uri_extension_size_max)
|
||||
|
||||
def _create_offsets(self, block_size, data_size):
|
||||
if block_size >= 2**32 or data_size >= 2**32:
|
||||
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
|
||||
|
|
|
@ -77,11 +77,13 @@ class PeerTracker:
|
|||
self._storageserver = storage_server # to an RIStorageServer
|
||||
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
|
||||
self.sharesize = sharesize
|
||||
self.allocated_size = layout.allocated_size(sharesize,
|
||||
num_segments,
|
||||
num_share_hashes,
|
||||
EXTENSION_SIZE)
|
||||
|
||||
wbp = layout.make_write_bucket_proxy(None, sharesize,
|
||||
blocksize, num_segments,
|
||||
num_share_hashes,
|
||||
EXTENSION_SIZE, peerid)
|
||||
self.wbp_class = wbp.__class__ # to create more of them
|
||||
self.allocated_size = wbp.get_allocated_size()
|
||||
self.blocksize = blocksize
|
||||
self.num_segments = num_segments
|
||||
self.num_share_hashes = num_share_hashes
|
||||
|
@ -110,12 +112,12 @@ class PeerTracker:
|
|||
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
|
||||
b = {}
|
||||
for sharenum, rref in buckets.iteritems():
|
||||
bp = layout.WriteBucketProxy(rref, self.sharesize,
|
||||
self.blocksize,
|
||||
self.num_segments,
|
||||
self.num_share_hashes,
|
||||
EXTENSION_SIZE,
|
||||
self.peerid)
|
||||
bp = self.wbp_class(rref, self.sharesize,
|
||||
self.blocksize,
|
||||
self.num_segments,
|
||||
self.num_share_hashes,
|
||||
EXTENSION_SIZE,
|
||||
self.peerid)
|
||||
b[sharenum] = bp
|
||||
self.buckets.update(b)
|
||||
return (alreadygot, set(b.keys()))
|
||||
|
@ -171,10 +173,11 @@ class Tahoe2PeerSelector:
|
|||
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
|
||||
|
||||
# figure out how much space to ask for
|
||||
allocated_size = layout.allocated_size(share_size,
|
||||
num_segments,
|
||||
num_share_hashes,
|
||||
EXTENSION_SIZE)
|
||||
wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
|
||||
num_share_hashes, EXTENSION_SIZE,
|
||||
None)
|
||||
allocated_size = wbp.get_allocated_size()
|
||||
|
||||
# filter the list of peers according to which ones can accomodate
|
||||
# this request. This excludes older peers (which used a 4-byte size
|
||||
# field) from getting large shares (for files larger than about
|
||||
|
|
|
@ -241,17 +241,17 @@ class GoodServer(unittest.TestCase, ShouldFailMixin):
|
|||
return DATA[:size]
|
||||
|
||||
def test_too_large(self):
|
||||
# we currently impose a sizelimit on uploaded files, because of
|
||||
# limitations in the share format (see ticket #346 for details). The
|
||||
# limit is set to ensure that no share is larger than 4GiB. Make sure
|
||||
# that we reject files larger than that.
|
||||
# we've removed the 4GiB share size limit (see ticket #346 for
|
||||
# details), but still have an 8-byte field, so the limit is now
|
||||
# 2**64, so make sure we reject files larger than that.
|
||||
k = 3; happy = 7; n = 10
|
||||
self.set_encoding_parameters(k, happy, n)
|
||||
data1 = GiganticUploadable(k*4*1024*1024*1024)
|
||||
big = k*(2**64)
|
||||
data1 = GiganticUploadable(big)
|
||||
d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
|
||||
"This file is too large to be uploaded (data_size)",
|
||||
self.u.upload, data1)
|
||||
data2 = GiganticUploadable(k*4*1024*1024*1024-3)
|
||||
data2 = GiganticUploadable(big-3)
|
||||
d.addCallback(lambda res:
|
||||
self.shouldFail(FileTooLargeError,
|
||||
"test_too_large-data2",
|
||||
|
|
Loading…
Reference in New Issue