storage: split WriteBucketProxy and ReadBucketProxy out into immutable/layout.py . No behavioral changes.

This commit is contained in:
Brian Warner 2008-10-09 17:08:00 -07:00
parent 86e22b8add
commit 288d55825c
7 changed files with 327 additions and 315 deletions

View File

@ -12,6 +12,7 @@ from allmydata.util.assertutil import _assert
from allmydata import codec, hashtree, storage, uri from allmydata import codec, hashtree, storage, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \ from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
IDownloadStatus, IDownloadResults IDownloadStatus, IDownloadResults
from allmydata.immutable import layout
from allmydata.immutable.encode import NotEnoughSharesError from allmydata.immutable.encode import NotEnoughSharesError
from pycryptopp.cipher.aes import AES from pycryptopp.cipher.aes import AES
@ -580,7 +581,7 @@ class FileDownloader:
(self._responses_received, (self._responses_received,
self._queries_sent)) self._queries_sent))
for sharenum, bucket in buckets.iteritems(): for sharenum, bucket in buckets.iteritems():
b = storage.ReadBucketProxy(bucket, peerid, self._si_s) b = layout.ReadBucketProxy(bucket, peerid, self._si_s)
self.add_share_bucket(sharenum, b) self.add_share_bucket(sharenum, b)
self._uri_extension_sources.append(b) self._uri_extension_sources.append(b)
if self._results: if self._results:

View File

@ -0,0 +1,301 @@
import struct
from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
section starts. Each offset is measured from the beginning of the file.
0x00: version number (=00 00 00 01)
0x04: segment size
0x08: data size
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
? : start of plaintext_hash_tree
? : start of crypttext_hash_tree
? : start of block_hashes
? : start of share_hashes
each share_hash is written as a two-byte (big-endian) hashnum
followed by the 32-byte SHA-256 hash. We only store the hashes
necessary to validate the share hash root
? : start of uri_extension_length (four-byte big-endian value)
? : start of uri_extension
"""
def allocated_size(data_size, num_segments, num_share_hashes,
uri_extension_size):
wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
uri_extension_size, None)
uri_extension_starts_at = wbp._offsets['uri_extension']
return uri_extension_starts_at + 4 + uri_extension_size
class WriteBucketProxy:
implements(IStorageBucketWriter)
def __init__(self, rref, data_size, segment_size, num_segments,
num_share_hashes, uri_extension_size, nodeid):
self._rref = rref
self._data_size = data_size
self._segment_size = segment_size
self._num_segments = num_segments
self._nodeid = nodeid
if segment_size >= 2**32 or data_size >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
effective_segments = mathutil.next_power_of_k(num_segments,2)
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
# we commit to not sending a uri extension larger than this
self._uri_extension_size = uri_extension_size
offsets = self._offsets = {}
x = 0x24
offsets['data'] = x
x += data_size
offsets['plaintext_hash_tree'] = x
x += self._segment_hash_size
offsets['crypttext_hash_tree'] = x
x += self._segment_hash_size
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
x += self._share_hash_size
offsets['uri_extension'] = x
if x >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (offsets).")
offset_data = struct.pack(">LLLLLLLLL",
1, # version number
segment_size,
data_size,
offsets['data'],
offsets['plaintext_hash_tree'],
offsets['crypttext_hash_tree'],
offsets['block_hashes'],
offsets['share_hashes'],
offsets['uri_extension'],
)
assert len(offset_data) == 0x24
self._offset_data = offset_data
def __repr__(self):
if self._nodeid:
nodeid_s = idlib.nodeid_b2a(self._nodeid)
else:
nodeid_s = "[None]"
return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s
def start(self):
return self._write(0, self._offset_data)
def put_block(self, segmentnum, data):
offset = self._offsets['data'] + segmentnum * self._segment_size
assert offset + len(data) <= self._offsets['uri_extension']
assert isinstance(data, str)
if segmentnum < self._num_segments-1:
precondition(len(data) == self._segment_size,
len(data), self._segment_size)
else:
precondition(len(data) == (self._data_size -
(self._segment_size *
(self._num_segments - 1))),
len(data), self._segment_size)
return self._write(offset, data)
def put_plaintext_hashes(self, hashes):
offset = self._offsets['plaintext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
offset, len(data), offset+len(data),
self._offsets['crypttext_hash_tree'])
return self._write(offset, data)
def put_crypttext_hashes(self, hashes):
offset = self._offsets['crypttext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset + len(data) <= self._offsets['block_hashes'],
offset, len(data), offset+len(data),
self._offsets['block_hashes'])
return self._write(offset, data)
def put_block_hashes(self, blockhashes):
offset = self._offsets['block_hashes']
assert isinstance(blockhashes, list)
data = "".join(blockhashes)
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset + len(data) <= self._offsets['share_hashes'],
offset, len(data), offset+len(data),
self._offsets['share_hashes'])
return self._write(offset, data)
def put_share_hashes(self, sharehashes):
# sharehashes is a list of (index, hash) tuples, so they get stored
# as 2+32=34 bytes each
offset = self._offsets['share_hashes']
assert isinstance(sharehashes, list)
data = "".join([struct.pack(">H", hashnum) + hashvalue
for hashnum,hashvalue in sharehashes])
precondition(len(data) == self._share_hash_size,
len(data), self._share_hash_size)
precondition(offset + len(data) <= self._offsets['uri_extension'],
offset, len(data), offset+len(data),
self._offsets['uri_extension'])
return self._write(offset, data)
def put_uri_extension(self, data):
offset = self._offsets['uri_extension']
assert isinstance(data, str)
precondition(len(data) <= self._uri_extension_size,
len(data), self._uri_extension_size)
length = struct.pack(">L", len(data))
return self._write(offset, length+data)
def _write(self, offset, data):
# TODO: for small shares, buffer the writes and do just a single call
return self._rref.callRemote("write", offset, data)
def close(self):
return self._rref.callRemote("close")
def abort(self):
return self._rref.callRemoteOnly("abort")
class ReadBucketProxy:
implements(IStorageBucketReader)
def __init__(self, rref, peerid=None, storage_index_s=None):
self._rref = rref
self._peerid = peerid
self._si_s = storage_index_s
self._started = False
def get_peerid(self):
return self._peerid
def __repr__(self):
peerid_s = idlib.shortnodeid_b2a(self._peerid)
return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
self._si_s)
def startIfNecessary(self):
if self._started:
return defer.succeed(self)
d = self.start()
d.addCallback(lambda res: self)
return d
def start(self):
# TODO: for small shares, read the whole bucket in start()
d = self._read(0, 0x24)
d.addCallback(self._parse_offsets)
def _started(res):
self._started = True
return res
d.addCallback(_started)
return d
def _parse_offsets(self, data):
precondition(len(data) == 0x24)
self._offsets = {}
(version, self._segment_size, self._data_size) = \
struct.unpack(">LLL", data[0:0xc])
_assert(version == 1)
x = 0x0c
for field in ( 'data',
'plaintext_hash_tree',
'crypttext_hash_tree',
'block_hashes',
'share_hashes',
'uri_extension',
):
offset = struct.unpack(">L", data[x:x+4])[0]
x += 4
self._offsets[field] = offset
return self._offsets
def get_block(self, blocknum):
num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
if blocknum < num_segments-1:
size = self._segment_size
else:
size = self._data_size % self._segment_size
if size == 0:
size = self._segment_size
offset = self._offsets['data'] + blocknum * self._segment_size
return self._read(offset, size)
def _str2l(self, s):
""" split string (pulled from storage) into a list of blockids """
return [ s[i:i+HASH_SIZE]
for i in range(0, len(s), HASH_SIZE) ]
def get_plaintext_hashes(self):
offset = self._offsets['plaintext_hash_tree']
size = self._offsets['crypttext_hash_tree'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_crypttext_hashes(self):
offset = self._offsets['crypttext_hash_tree']
size = self._offsets['block_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_block_hashes(self):
offset = self._offsets['block_hashes']
size = self._offsets['share_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_share_hashes(self):
offset = self._offsets['share_hashes']
size = self._offsets['uri_extension'] - offset
assert size % (2+HASH_SIZE) == 0
d = self._read(offset, size)
def _unpack_share_hashes(data):
assert len(data) == size
hashes = []
for i in range(0, size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
hashes.append( (hashnum, hashvalue) )
return hashes
d.addCallback(_unpack_share_hashes)
return d
def get_uri_extension(self):
offset = self._offsets['uri_extension']
d = self._read(offset, 4)
def _got_length(data):
length = struct.unpack(">L", data)[0]
return self._read(offset+4, length)
d.addCallback(_got_length)
return d
def _read(self, offset, length):
return self._rref.callRemote("read", offset, length)

View File

@ -18,6 +18,7 @@ from allmydata.util import base32, idlib, mathutil
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES from pycryptopp.cipher.aes import AES
from cStringIO import StringIO from cStringIO import StringIO
@ -76,7 +77,7 @@ class PeerTracker:
self._storageserver = storage_server # to an RIStorageServer self._storageserver = storage_server # to an RIStorageServer
self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize self.sharesize = sharesize
self.allocated_size = storage.allocated_size(sharesize, self.allocated_size = layout.allocated_size(sharesize,
num_segments, num_segments,
num_share_hashes, num_share_hashes,
EXTENSION_SIZE) EXTENSION_SIZE)
@ -109,7 +110,7 @@ class PeerTracker:
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {} b = {}
for sharenum, rref in buckets.iteritems(): for sharenum, rref in buckets.iteritems():
bp = storage.WriteBucketProxy(rref, self.sharesize, bp = layout.WriteBucketProxy(rref, self.sharesize,
self.blocksize, self.blocksize,
self.num_segments, self.num_segments,
self.num_share_hashes, self.num_share_hashes,

View File

@ -7,6 +7,7 @@ from foolscap import Referenceable, DeadReferenceError
from foolscap.eventual import eventually from foolscap.eventual import eventually
from allmydata import interfaces, storage, uri from allmydata import interfaces, storage, uri
from allmydata.immutable import upload from allmydata.immutable import upload
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import idlib, log, observer, fileutil, hashutil from allmydata.util import idlib, log, observer, fileutil, hashutil
@ -85,8 +86,7 @@ class CHKCheckerAndUEBFetcher:
self.log("no readers, so no UEB", level=log.NOISY) self.log("no readers, so no UEB", level=log.NOISY)
return return
b,peerid = self._readers.pop() b,peerid = self._readers.pop()
rbp = storage.ReadBucketProxy(b, peerid, rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
storage.si_b2a(self._storage_index))
d = rbp.startIfNecessary() d = rbp.startIfNecessary()
d.addCallback(lambda res: rbp.get_uri_extension()) d.addCallback(lambda res: rbp.get_uri_extension())
d.addCallback(self._got_uri_extension) d.addCallback(self._got_uri_extension)

View File

@ -47,11 +47,12 @@ def dump_share(options):
def dump_immutable_share(options): def dump_immutable_share(options):
from allmydata import uri, storage from allmydata import uri, storage
from allmydata.util import base32 from allmydata.util import base32
from allmydata.immutable.layout import ReadBucketProxy
out = options.stdout out = options.stdout
f = storage.ShareFile(options['filename']) f = storage.ShareFile(options['filename'])
# use a ReadBucketProxy to parse the bucket and find the uri extension # use a ReadBucketProxy to parse the bucket and find the uri extension
bp = storage.ReadBucketProxy(None) bp = ReadBucketProxy(None)
offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
seek = offsets['uri_extension'] seek = offsets['uri_extension']
length = struct.unpack(">L", f.read_share_data(seek, 4))[0] length = struct.unpack(">L", f.read_share_data(seek, 4))[0]
@ -516,6 +517,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri, storage from allmydata import uri, storage
from allmydata.mutable.layout import unpack_share from allmydata.mutable.layout import unpack_share
from allmydata.mutable.common import NeedMoreDataError from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32 from allmydata.util import base32
import struct import struct
@ -569,7 +571,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
sf = storage.ShareFile(abs_sharefile) sf = storage.ShareFile(abs_sharefile)
# use a ReadBucketProxy to parse the bucket and find the uri extension # use a ReadBucketProxy to parse the bucket and find the uri extension
bp = storage.ReadBucketProxy(None) bp = ReadBucketProxy(None)
offsets = bp._parse_offsets(sf.read_share_data(0, 0x24)) offsets = bp._parse_offsets(sf.read_share_data(0, 0x24))
seek = offsets['uri_extension'] seek = offsets['uri_extension']
length = struct.unpack(">L", sf.read_share_data(seek, 4))[0] length = struct.unpack(">L", sf.read_share_data(seek, 4))[0]
@ -689,7 +691,7 @@ def corrupt_share(options):
else: else:
# otherwise assume it's immutable # otherwise assume it's immutable
f = storage.ShareFile(fn) f = storage.ShareFile(fn)
bp = storage.ReadBucketProxy(None) bp = ReadBucketProxy(None)
offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"] start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"] end = f._data_offset + offsets["plaintext_hash_tree"]

View File

@ -3,14 +3,12 @@ from distutils.version import LooseVersion
from foolscap import Referenceable from foolscap import Referenceable
from twisted.application import service from twisted.application import service
from twisted.internet import defer
from zope.interface import implements from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \ RIBucketReader, BadWriteEnablerError, IStatsProducer
BadWriteEnablerError, IStatsProducer, FileTooLargeError from allmydata.util import base32, fileutil, idlib, log
from allmydata.util import base32, fileutil, idlib, mathutil, log from allmydata.util.assertutil import precondition
from allmydata.util.assertutil import precondition, _assert
import allmydata # for __version__ import allmydata # for __version__
class DataTooLargeError(Exception): class DataTooLargeError(Exception):
@ -38,7 +36,7 @@ NUM_RE=re.compile("^[0-9]+$")
# 0x00: share file version number, four bytes, current version is 1 # 0x00: share file version number, four bytes, current version is 1
# 0x04: share data length, four bytes big-endian = A # 0x04: share data length, four bytes big-endian = A
# 0x08: number of leases, four bytes big-endian # 0x08: number of leases, four bytes big-endian
# 0x0c: beginning of share data (described below, at WriteBucketProxy) # 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
# A+0x0c = B: first lease. Lease format is: # A+0x0c = B: first lease. Lease format is:
# B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner # B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
# B+0x04: renew secret, 32 bytes (SHA256) # B+0x04: renew secret, 32 bytes (SHA256)
@ -1210,295 +1208,3 @@ class StorageServer(service.MultiService, Referenceable):
# the code before here runs on the storage server, not the client # the code before here runs on the storage server, not the client
# the code beyond here runs on the client, not the storage server
"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
section starts. Each offset is measured from the beginning of the file.
0x00: version number (=00 00 00 01)
0x04: segment size
0x08: data size
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
? : start of plaintext_hash_tree
? : start of crypttext_hash_tree
? : start of block_hashes
? : start of share_hashes
each share_hash is written as a two-byte (big-endian) hashnum
followed by the 32-byte SHA-256 hash. We only store the hashes
necessary to validate the share hash root
? : start of uri_extension_length (four-byte big-endian value)
? : start of uri_extension
"""
def allocated_size(data_size, num_segments, num_share_hashes,
uri_extension_size):
wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
uri_extension_size, None)
uri_extension_starts_at = wbp._offsets['uri_extension']
return uri_extension_starts_at + 4 + uri_extension_size
class WriteBucketProxy:
implements(IStorageBucketWriter)
def __init__(self, rref, data_size, segment_size, num_segments,
num_share_hashes, uri_extension_size, nodeid):
self._rref = rref
self._data_size = data_size
self._segment_size = segment_size
self._num_segments = num_segments
self._nodeid = nodeid
if segment_size >= 2**32 or data_size >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
effective_segments = mathutil.next_power_of_k(num_segments,2)
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
# we commit to not sending a uri extension larger than this
self._uri_extension_size = uri_extension_size
offsets = self._offsets = {}
x = 0x24
offsets['data'] = x
x += data_size
offsets['plaintext_hash_tree'] = x
x += self._segment_hash_size
offsets['crypttext_hash_tree'] = x
x += self._segment_hash_size
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
x += self._share_hash_size
offsets['uri_extension'] = x
if x >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (offsets).")
offset_data = struct.pack(">LLLLLLLLL",
1, # version number
segment_size,
data_size,
offsets['data'],
offsets['plaintext_hash_tree'],
offsets['crypttext_hash_tree'],
offsets['block_hashes'],
offsets['share_hashes'],
offsets['uri_extension'],
)
assert len(offset_data) == 0x24
self._offset_data = offset_data
def __repr__(self):
if self._nodeid:
nodeid_s = idlib.nodeid_b2a(self._nodeid)
else:
nodeid_s = "[None]"
return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s
def start(self):
return self._write(0, self._offset_data)
def put_block(self, segmentnum, data):
offset = self._offsets['data'] + segmentnum * self._segment_size
assert offset + len(data) <= self._offsets['uri_extension']
assert isinstance(data, str)
if segmentnum < self._num_segments-1:
precondition(len(data) == self._segment_size,
len(data), self._segment_size)
else:
precondition(len(data) == (self._data_size -
(self._segment_size *
(self._num_segments - 1))),
len(data), self._segment_size)
return self._write(offset, data)
def put_plaintext_hashes(self, hashes):
offset = self._offsets['plaintext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
offset, len(data), offset+len(data),
self._offsets['crypttext_hash_tree'])
return self._write(offset, data)
def put_crypttext_hashes(self, hashes):
offset = self._offsets['crypttext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset + len(data) <= self._offsets['block_hashes'],
offset, len(data), offset+len(data),
self._offsets['block_hashes'])
return self._write(offset, data)
def put_block_hashes(self, blockhashes):
offset = self._offsets['block_hashes']
assert isinstance(blockhashes, list)
data = "".join(blockhashes)
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset + len(data) <= self._offsets['share_hashes'],
offset, len(data), offset+len(data),
self._offsets['share_hashes'])
return self._write(offset, data)
def put_share_hashes(self, sharehashes):
# sharehashes is a list of (index, hash) tuples, so they get stored
# as 2+32=34 bytes each
offset = self._offsets['share_hashes']
assert isinstance(sharehashes, list)
data = "".join([struct.pack(">H", hashnum) + hashvalue
for hashnum,hashvalue in sharehashes])
precondition(len(data) == self._share_hash_size,
len(data), self._share_hash_size)
precondition(offset + len(data) <= self._offsets['uri_extension'],
offset, len(data), offset+len(data),
self._offsets['uri_extension'])
return self._write(offset, data)
def put_uri_extension(self, data):
offset = self._offsets['uri_extension']
assert isinstance(data, str)
precondition(len(data) <= self._uri_extension_size,
len(data), self._uri_extension_size)
length = struct.pack(">L", len(data))
return self._write(offset, length+data)
def _write(self, offset, data):
# TODO: for small shares, buffer the writes and do just a single call
return self._rref.callRemote("write", offset, data)
def close(self):
return self._rref.callRemote("close")
def abort(self):
return self._rref.callRemoteOnly("abort")
class ReadBucketProxy:
implements(IStorageBucketReader)
def __init__(self, rref, peerid=None, storage_index_s=None):
self._rref = rref
self._peerid = peerid
self._si_s = storage_index_s
self._started = False
def get_peerid(self):
return self._peerid
def __repr__(self):
peerid_s = idlib.shortnodeid_b2a(self._peerid)
return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
self._si_s)
def startIfNecessary(self):
if self._started:
return defer.succeed(self)
d = self.start()
d.addCallback(lambda res: self)
return d
def start(self):
# TODO: for small shares, read the whole bucket in start()
d = self._read(0, 0x24)
d.addCallback(self._parse_offsets)
def _started(res):
self._started = True
return res
d.addCallback(_started)
return d
def _parse_offsets(self, data):
precondition(len(data) == 0x24)
self._offsets = {}
(version, self._segment_size, self._data_size) = \
struct.unpack(">LLL", data[0:0xc])
_assert(version == 1)
x = 0x0c
for field in ( 'data',
'plaintext_hash_tree',
'crypttext_hash_tree',
'block_hashes',
'share_hashes',
'uri_extension',
):
offset = struct.unpack(">L", data[x:x+4])[0]
x += 4
self._offsets[field] = offset
return self._offsets
def get_block(self, blocknum):
num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
if blocknum < num_segments-1:
size = self._segment_size
else:
size = self._data_size % self._segment_size
if size == 0:
size = self._segment_size
offset = self._offsets['data'] + blocknum * self._segment_size
return self._read(offset, size)
def _str2l(self, s):
""" split string (pulled from storage) into a list of blockids """
return [ s[i:i+HASH_SIZE]
for i in range(0, len(s), HASH_SIZE) ]
def get_plaintext_hashes(self):
offset = self._offsets['plaintext_hash_tree']
size = self._offsets['crypttext_hash_tree'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_crypttext_hashes(self):
offset = self._offsets['crypttext_hash_tree']
size = self._offsets['block_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_block_hashes(self):
offset = self._offsets['block_hashes']
size = self._offsets['share_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_share_hashes(self):
offset = self._offsets['share_hashes']
size = self._offsets['uri_extension'] - offset
assert size % (2+HASH_SIZE) == 0
d = self._read(offset, size)
def _unpack_share_hashes(data):
assert len(data) == size
hashes = []
for i in range(0, size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
hashes.append( (hashnum, hashvalue) )
return hashes
d.addCallback(_unpack_share_hashes)
return d
def get_uri_extension(self):
offset = self._offsets['uri_extension']
d = self._read(offset, 4)
def _got_length(data):
length = struct.unpack(">L", data)[0]
return self._read(offset+4, length)
d.addCallback(_got_length)
return d
def _read(self, offset, length):
return self._rref.callRemote("read", offset, length)

View File

@ -7,8 +7,9 @@ import itertools
from allmydata import interfaces from allmydata import interfaces
from allmydata.util import fileutil, hashutil from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \ from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \ StorageServer, MutableShareFile, \
storage_index_to_dir, DataTooLargeError, LeaseInfo storage_index_to_dir, DataTooLargeError, LeaseInfo
from allmydata.immutable.layout import WriteBucketProxy, ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent from allmydata.test.common import LoggingServiceParent