Rewrite immutable downloader (#798). This patch rearranges the rest of src/allmydata/immutable/ .

This commit is contained in:
Brian Warner 2010-08-04 00:26:39 -07:00
parent 22a07e9bbe
commit 797828f47f
6 changed files with 834 additions and 544 deletions

View File

@ -1,16 +1,444 @@
from zope.interface import implements
from twisted.internet import defer
from foolscap.api import DeadReferenceError, RemoteException from foolscap.api import DeadReferenceError, RemoteException
from allmydata import hashtree, codec, uri
from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
from allmydata.hashtree import IncompleteHashTree from allmydata.hashtree import IncompleteHashTree
from allmydata.check_results import CheckResults from allmydata.check_results import CheckResults
from allmydata.immutable import download
from allmydata.uri import CHKFileVerifierURI from allmydata.uri import CHKFileVerifierURI
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util import base32, idlib, deferredutil, dictutil, log from allmydata.util import base32, idlib, deferredutil, dictutil, log, mathutil
from allmydata.util.hashutil import file_renewal_secret_hash, \ from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \ file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
block_hash
from allmydata.immutable import layout from allmydata.immutable import layout
class IntegrityCheckReject(Exception):
pass
class BadURIExtension(IntegrityCheckReject):
pass
class BadURIExtensionHashValue(IntegrityCheckReject):
pass
class BadOrMissingHash(IntegrityCheckReject):
pass
class UnsupportedErasureCodec(BadURIExtension):
pass
class ValidatedExtendedURIProxy:
implements(IValidatedThingProxy)
""" I am a front-end for a remote UEB (using a local ReadBucketProxy),
responsible for retrieving and validating the elements from the UEB."""
def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
# fetch_failures is for debugging -- see test_encode.py
self._fetch_failures = fetch_failures
self._readbucketproxy = readbucketproxy
precondition(IVerifierURI.providedBy(verifycap), verifycap)
self._verifycap = verifycap
# required
self.segment_size = None
self.crypttext_root_hash = None
self.share_root_hash = None
# computed
self.block_size = None
self.share_size = None
self.num_segments = None
self.tail_data_size = None
self.tail_segment_size = None
# optional
self.crypttext_hash = None
def __str__(self):
return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
def _check_integrity(self, data):
h = uri_extension_hash(data)
if h != self._verifycap.uri_extension_hash:
msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
(self._readbucketproxy,
base32.b2a(self._verifycap.uri_extension_hash),
base32.b2a(h)))
if self._fetch_failures is not None:
self._fetch_failures["uri_extension"] += 1
raise BadURIExtensionHashValue(msg)
else:
return data
def _parse_and_validate(self, data):
self.share_size = mathutil.div_ceil(self._verifycap.size,
self._verifycap.needed_shares)
d = uri.unpack_extension(data)
# There are several kinds of things that can be found in a UEB.
# First, things that we really need to learn from the UEB in order to
# do this download. Next: things which are optional but not redundant
# -- if they are present in the UEB they will get used. Next, things
# that are optional and redundant. These things are required to be
# consistent: they don't have to be in the UEB, but if they are in
# the UEB then they will be checked for consistency with the
# already-known facts, and if they are inconsistent then an exception
# will be raised. These things aren't actually used -- they are just
# tested for consistency and ignored. Finally: things which are
# deprecated -- they ought not be in the UEB at all, and if they are
# present then a warning will be logged but they are otherwise
# ignored.
# First, things that we really need to learn from the UEB:
# segment_size, crypttext_root_hash, and share_root_hash.
self.segment_size = d['segment_size']
self.block_size = mathutil.div_ceil(self.segment_size,
self._verifycap.needed_shares)
self.num_segments = mathutil.div_ceil(self._verifycap.size,
self.segment_size)
self.tail_data_size = self._verifycap.size % self.segment_size
if not self.tail_data_size:
self.tail_data_size = self.segment_size
# padding for erasure code
self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
self._verifycap.needed_shares)
# Ciphertext hash tree root is mandatory, so that there is at most
# one ciphertext that matches this read-cap or verify-cap. The
# integrity check on the shares is not sufficient to prevent the
# original encoder from creating some shares of file A and other
# shares of file B.
self.crypttext_root_hash = d['crypttext_root_hash']
self.share_root_hash = d['share_root_hash']
# Next: things that are optional and not redundant: crypttext_hash
if d.has_key('crypttext_hash'):
self.crypttext_hash = d['crypttext_hash']
if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
# Next: things that are optional, redundant, and required to be
# consistent: codec_name, codec_params, tail_codec_params,
# num_segments, size, needed_shares, total_shares
if d.has_key('codec_name'):
if d['codec_name'] != "crs":
raise UnsupportedErasureCodec(d['codec_name'])
if d.has_key('codec_params'):
ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
if ucpss != self.segment_size:
raise BadURIExtension("inconsistent erasure code params: "
"ucpss: %s != self.segment_size: %s" %
(ucpss, self.segment_size))
if ucpns != self._verifycap.needed_shares:
raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
"self._verifycap.needed_shares: %s" %
(ucpns, self._verifycap.needed_shares))
if ucpts != self._verifycap.total_shares:
raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
"self._verifycap.total_shares: %s" %
(ucpts, self._verifycap.total_shares))
if d.has_key('tail_codec_params'):
utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
if utcpss != self.tail_segment_size:
raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
"self.tail_segment_size: %s, self._verifycap.size: %s, "
"self.segment_size: %s, self._verifycap.needed_shares: %s"
% (utcpss, self.tail_segment_size, self._verifycap.size,
self.segment_size, self._verifycap.needed_shares))
if utcpns != self._verifycap.needed_shares:
raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
"self._verifycap.needed_shares: %s" % (utcpns,
self._verifycap.needed_shares))
if utcpts != self._verifycap.total_shares:
raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
"self._verifycap.total_shares: %s" % (utcpts,
self._verifycap.total_shares))
if d.has_key('num_segments'):
if d['num_segments'] != self.num_segments:
raise BadURIExtension("inconsistent num_segments: size: %s, "
"segment_size: %s, computed_num_segments: %s, "
"ueb_num_segments: %s" % (self._verifycap.size,
self.segment_size,
self.num_segments, d['num_segments']))
if d.has_key('size'):
if d['size'] != self._verifycap.size:
raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
(self._verifycap.size, d['size']))
if d.has_key('needed_shares'):
if d['needed_shares'] != self._verifycap.needed_shares:
raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
"needed shares: %s" % (self._verifycap.total_shares,
d['needed_shares']))
if d.has_key('total_shares'):
if d['total_shares'] != self._verifycap.total_shares:
raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
"total shares: %s" % (self._verifycap.total_shares,
d['total_shares']))
# Finally, things that are deprecated and ignored: plaintext_hash,
# plaintext_root_hash
if d.get('plaintext_hash'):
log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
"and is no longer used. Ignoring. %s" % (self,))
if d.get('plaintext_root_hash'):
log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
"reasons and is no longer used. Ignoring. %s" % (self,))
return self
def start(self):
"""Fetch the UEB from bucket, compare its hash to the hash from
verifycap, then parse it. Returns a deferred which is called back
with self once the fetch is successful, or is erred back if it
fails."""
d = self._readbucketproxy.get_uri_extension()
d.addCallback(self._check_integrity)
d.addCallback(self._parse_and_validate)
return d
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
"""I am a front-end for a remote storage bucket, responsible for
retrieving and validating data from that bucket.
My get_block() method is used by BlockDownloaders.
"""
def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
block_size, share_size):
""" share_hash_tree is required to have already been initialized with
the root hash (the number-0 hash), using the share_root_hash from the
UEB"""
precondition(share_hash_tree[0] is not None, share_hash_tree)
prefix = "%d-%s-%s" % (sharenum, bucket,
base32.b2a_l(share_hash_tree[0][:8], 60))
log.PrefixingLogMixin.__init__(self,
facility="tahoe.immutable.download",
prefix=prefix)
self.sharenum = sharenum
self.bucket = bucket
self.share_hash_tree = share_hash_tree
self.num_blocks = num_blocks
self.block_size = block_size
self.share_size = share_size
self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
def get_all_sharehashes(self):
"""Retrieve and validate all the share-hash-tree nodes that are
included in this share, regardless of whether we need them to
validate the share or not. Each share contains a minimal Merkle tree
chain, but there is lots of overlap, so usually we'll be using hashes
from other shares and not reading every single hash from this share.
The Verifier uses this function to read and validate every single
hash from this share.
Call this (and wait for the Deferred it returns to fire) before
calling get_block() for the first time: this lets us check that the
share share contains enough hashes to validate its own data, and
avoids downloading any share hash twice.
I return a Deferred which errbacks upon failure, probably with
BadOrMissingHash."""
d = self.bucket.get_share_hashes()
def _got_share_hashes(sh):
sharehashes = dict(sh)
try:
self.share_hash_tree.set_hashes(sharehashes)
except IndexError, le:
raise BadOrMissingHash(le)
except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
raise BadOrMissingHash(le)
d.addCallback(_got_share_hashes)
return d
def get_all_blockhashes(self):
"""Retrieve and validate all the block-hash-tree nodes that are
included in this share. Each share contains a full Merkle tree, but
we usually only fetch the minimal subset necessary for any particular
block. This function fetches everything at once. The Verifier uses
this function to validate the block hash tree.
Call this (and wait for the Deferred it returns to fire) after
calling get_all_sharehashes() and before calling get_block() for the
first time: this lets us check that the share contains all block
hashes and avoids downloading them multiple times.
I return a Deferred which errbacks upon failure, probably with
BadOrMissingHash.
"""
# get_block_hashes(anything) currently always returns everything
needed = list(range(len(self.block_hash_tree)))
d = self.bucket.get_block_hashes(needed)
def _got_block_hashes(blockhashes):
if len(blockhashes) < len(self.block_hash_tree):
raise BadOrMissingHash()
bh = dict(enumerate(blockhashes))
try:
self.block_hash_tree.set_hashes(bh)
except IndexError, le:
raise BadOrMissingHash(le)
except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
raise BadOrMissingHash(le)
d.addCallback(_got_block_hashes)
return d
def get_all_crypttext_hashes(self, crypttext_hash_tree):
"""Retrieve and validate all the crypttext-hash-tree nodes that are
in this share. Normally we don't look at these at all: the download
process fetches them incrementally as needed to validate each segment
of ciphertext. But this is a convenient place to give the Verifier a
function to validate all of these at once.
Call this with a new hashtree object for each share, initialized with
the crypttext hash tree root. I return a Deferred which errbacks upon
failure, probably with BadOrMissingHash.
"""
# get_crypttext_hashes() always returns everything
d = self.bucket.get_crypttext_hashes()
def _got_crypttext_hashes(hashes):
if len(hashes) < len(crypttext_hash_tree):
raise BadOrMissingHash()
ct_hashes = dict(enumerate(hashes))
try:
crypttext_hash_tree.set_hashes(ct_hashes)
except IndexError, le:
raise BadOrMissingHash(le)
except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
raise BadOrMissingHash(le)
d.addCallback(_got_crypttext_hashes)
return d
def get_block(self, blocknum):
# the first time we use this bucket, we need to fetch enough elements
# of the share hash tree to validate it from our share hash up to the
# hashroot.
if self.share_hash_tree.needed_hashes(self.sharenum):
d1 = self.bucket.get_share_hashes()
else:
d1 = defer.succeed([])
# We might need to grab some elements of our block hash tree, to
# validate the requested block up to the share hash.
blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
# We don't need the root of the block hash tree, as that comes in the
# share tree.
blockhashesneeded.discard(0)
d2 = self.bucket.get_block_hashes(blockhashesneeded)
if blocknum < self.num_blocks-1:
thisblocksize = self.block_size
else:
thisblocksize = self.share_size % self.block_size
if thisblocksize == 0:
thisblocksize = self.block_size
d3 = self.bucket.get_block_data(blocknum,
self.block_size, thisblocksize)
dl = deferredutil.gatherResults([d1, d2, d3])
dl.addCallback(self._got_data, blocknum)
return dl
def _got_data(self, results, blocknum):
precondition(blocknum < self.num_blocks,
self, blocknum, self.num_blocks)
sharehashes, blockhashes, blockdata = results
try:
sharehashes = dict(sharehashes)
except ValueError, le:
le.args = tuple(le.args + (sharehashes,))
raise
blockhashes = dict(enumerate(blockhashes))
candidate_share_hash = None # in case we log it in the except block below
blockhash = None # in case we log it in the except block below
try:
if self.share_hash_tree.needed_hashes(self.sharenum):
# This will raise exception if the values being passed do not
# match the root node of self.share_hash_tree.
try:
self.share_hash_tree.set_hashes(sharehashes)
except IndexError, le:
# Weird -- sharehashes contained index numbers outside of
# the range that fit into this hash tree.
raise BadOrMissingHash(le)
# To validate a block we need the root of the block hash tree,
# which is also one of the leafs of the share hash tree, and is
# called "the share hash".
if not self.block_hash_tree[0]: # empty -- no root node yet
# Get the share hash from the share hash tree.
share_hash = self.share_hash_tree.get_leaf(self.sharenum)
if not share_hash:
# No root node in block_hash_tree and also the share hash
# wasn't sent by the server.
raise hashtree.NotEnoughHashesError
self.block_hash_tree.set_hashes({0: share_hash})
if self.block_hash_tree.needed_hashes(blocknum):
self.block_hash_tree.set_hashes(blockhashes)
blockhash = block_hash(blockdata)
self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
#self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
# "%r .. %r: %s" %
# (self.sharenum, blocknum, len(blockdata),
# blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
# log.WEIRD: indicates undetected disk/network error, or more
# likely a programming error
self.log("hash failure in block=%d, shnum=%d on %s" %
(blocknum, self.sharenum, self.bucket))
if self.block_hash_tree.needed_hashes(blocknum):
self.log(""" failure occurred when checking the block_hash_tree.
This suggests that either the block data was bad, or that the
block hashes we received along with it were bad.""")
else:
self.log(""" the failure probably occurred when checking the
share_hash_tree, which suggests that the share hashes we
received from the remote peer were bad.""")
self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
self.log(" block length: %d" % len(blockdata))
self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
if len(blockdata) < 100:
self.log(" block data: %r" % (blockdata,))
else:
self.log(" block data start/end: %r .. %r" %
(blockdata[:50], blockdata[-50:]))
self.log(" share hash tree:\n" + self.share_hash_tree.dump())
self.log(" block hash tree:\n" + self.block_hash_tree.dump())
lines = []
for i,h in sorted(sharehashes.items()):
lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
lines = []
for i,h in blockhashes.items():
lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
raise BadOrMissingHash(le)
# If we made it here, the block is good. If the hash trees didn't
# like what they saw, they would have raised a BadHashError, causing
# our caller to see a Failure and thus ignore this block (as well as
# dropping this bucket).
return blockdata
class Checker(log.PrefixingLogMixin): class Checker(log.PrefixingLogMixin):
"""I query all servers to see if M uniquely-numbered shares are """I query all servers to see if M uniquely-numbered shares are
available. available.
@ -85,7 +513,9 @@ class Checker(log.PrefixingLogMixin):
level = log.WEIRD level = log.WEIRD
if f.check(DeadReferenceError): if f.check(DeadReferenceError):
level = log.UNUSUAL level = log.UNUSUAL
self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ") self.log("failure from server on 'get_buckets' the REMOTE failure was:",
facility="tahoe.immutable.checker",
failure=f, level=level, umid="AX7wZQ")
return ({}, serverid, False) return ({}, serverid, False)
d.addCallbacks(_wrap_results, _trap_errs) d.addCallbacks(_wrap_results, _trap_errs)
@ -146,14 +576,14 @@ class Checker(log.PrefixingLogMixin):
vcap = self._verifycap vcap = self._verifycap
b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index()) b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index())
veup = download.ValidatedExtendedURIProxy(b, vcap) veup = ValidatedExtendedURIProxy(b, vcap)
d = veup.start() d = veup.start()
def _got_ueb(vup): def _got_ueb(vup):
share_hash_tree = IncompleteHashTree(vcap.total_shares) share_hash_tree = IncompleteHashTree(vcap.total_shares)
share_hash_tree.set_hashes({0: vup.share_root_hash}) share_hash_tree.set_hashes({0: vup.share_root_hash})
vrbp = download.ValidatedReadBucketProxy(sharenum, b, vrbp = ValidatedReadBucketProxy(sharenum, b,
share_hash_tree, share_hash_tree,
vup.num_segments, vup.num_segments,
vup.block_size, vup.block_size,
@ -216,8 +646,8 @@ class Checker(log.PrefixingLogMixin):
return (False, sharenum, 'incompatible') return (False, sharenum, 'incompatible')
elif f.check(layout.LayoutInvalid, elif f.check(layout.LayoutInvalid,
layout.RidiculouslyLargeURIExtensionBlock, layout.RidiculouslyLargeURIExtensionBlock,
download.BadOrMissingHash, BadOrMissingHash,
download.BadURIExtensionHashValue): BadURIExtensionHashValue):
return (False, sharenum, 'corrupt') return (False, sharenum, 'corrupt')
# if it wasn't one of those reasons, re-raise the error # if it wasn't one of those reasons, re-raise the error

View File

@ -1,21 +1,232 @@
import copy, os.path, stat
from cStringIO import StringIO import binascii
import copy
import time
now = time.time
from zope.interface import implements from zope.interface import implements
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer from twisted.internet.interfaces import IConsumer
from twisted.protocols import basic
from foolscap.api import eventually
from allmydata.interfaces import IImmutableFileNode, ICheckable, \
IDownloadTarget, IUploadResults
from allmydata.util import dictutil, log, base32
from allmydata.uri import CHKFileURI, LiteralFileURI
from allmydata.immutable.checker import Checker
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.immutable.repairer import Repairer
from allmydata.immutable import download
class _ImmutableFileNodeBase(object): from allmydata.interfaces import IImmutableFileNode, IUploadResults
implements(IImmutableFileNode, ICheckable) from allmydata import uri
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.util.dictutil import DictOfSets
from pycryptopp.cipher.aes import AES
# local imports
from allmydata.immutable.checker import Checker
from allmydata.immutable.repairer import Repairer
from allmydata.immutable.downloader.node import DownloadNode
from allmydata.immutable.downloader.status import DownloadStatus
class CiphertextFileNode:
def __init__(self, verifycap, storage_broker, secret_holder,
terminator, history, download_status=None):
assert isinstance(verifycap, uri.CHKFileVerifierURI)
self._verifycap = verifycap
self._storage_broker = storage_broker
self._secret_holder = secret_holder
if download_status is None:
ds = DownloadStatus(verifycap.storage_index, verifycap.size)
if history:
history.add_download(ds)
download_status = ds
self._node = DownloadNode(verifycap, storage_broker, secret_holder,
terminator, history, download_status)
def read(self, consumer, offset=0, size=None, read_ev=None):
"""I am the main entry point, from which FileNode.read() can get
data. I feed the consumer with the desired range of ciphertext. I
return a Deferred that fires (with the consumer) when the read is
finished."""
return self._node.read(consumer, offset, size, read_ev)
def get_segment(self, segnum):
"""Begin downloading a segment. I return a tuple (d, c): 'd' is a
Deferred that fires with (offset,data) when the desired segment is
available, and c is an object on which c.cancel() can be called to
disavow interest in the segment (after which 'd' will never fire).
You probably need to know the segment size before calling this,
unless you want the first few bytes of the file. If you ask for a
segment number which turns out to be too large, the Deferred will
errback with BadSegmentNumberError.
The Deferred fires with the offset of the first byte of the data
segment, so that you can call get_segment() before knowing the
segment size, and still know which data you received.
"""
return self._node.get_segment(segnum)
def get_segment_size(self):
# return a Deferred that fires with the file's real segment size
return self._node.get_segsize()
def get_storage_index(self):
return self._verifycap.storage_index
def get_verify_cap(self):
return self._verifycap
def get_size(self):
return self._verifycap.size
def raise_error(self):
pass
def check_and_repair(self, monitor, verify=False, add_lease=False):
verifycap = self._verifycap
storage_index = verifycap.storage_index
sb = self._storage_broker
servers = sb.get_all_servers()
sh = self._secret_holder
c = Checker(verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, secret_holder=sh,
monitor=monitor)
d = c.start()
def _maybe_repair(cr):
crr = CheckAndRepairResults(storage_index)
crr.pre_repair_results = cr
if cr.is_healthy():
crr.post_repair_results = cr
return defer.succeed(crr)
else:
crr.repair_attempted = True
crr.repair_successful = False # until proven successful
def _gather_repair_results(ur):
assert IUploadResults.providedBy(ur), ur
# clone the cr (check results) to form the basis of the
# prr (post-repair results)
prr = CheckResults(cr.uri, cr.storage_index)
prr.data = copy.deepcopy(cr.data)
sm = prr.data['sharemap']
assert isinstance(sm, DictOfSets), sm
sm.update(ur.sharemap)
servers_responding = set(prr.data['servers-responding'])
servers_responding.union(ur.sharemap.iterkeys())
prr.data['servers-responding'] = list(servers_responding)
prr.data['count-shares-good'] = len(sm)
prr.data['count-good-share-hosts'] = len(sm)
is_healthy = bool(len(sm) >= verifycap.total_shares)
is_recoverable = bool(len(sm) >= verifycap.needed_shares)
prr.set_healthy(is_healthy)
prr.set_recoverable(is_recoverable)
crr.repair_successful = is_healthy
prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
crr.post_repair_results = prr
return crr
def _repair_error(f):
# as with mutable repair, I'm not sure if I want to pass
# through a failure or not. TODO
crr.repair_successful = False
crr.repair_failure = f
return f
r = Repairer(self, storage_broker=sb, secret_holder=sh,
monitor=monitor)
d = r.start()
d.addCallbacks(_gather_repair_results, _repair_error)
return d
d.addCallback(_maybe_repair)
return d
def check(self, monitor, verify=False, add_lease=False):
verifycap = self._verifycap
sb = self._storage_broker
servers = sb.get_all_servers()
sh = self._secret_holder
v = Checker(verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, secret_holder=sh,
monitor=monitor)
return v.start()
class DecryptingConsumer:
"""I sit between a CiphertextDownloader (which acts as a Producer) and
the real Consumer, decrypting everything that passes by. The real
Consumer sees the real Producer, but the Producer sees us instead of the
real consumer."""
implements(IConsumer)
def __init__(self, consumer, readkey, offset, read_event):
self._consumer = consumer
self._read_event = read_event
# TODO: pycryptopp CTR-mode needs random-access operations: I want
# either a=AES(readkey, offset) or better yet both of:
# a=AES(readkey, offset=0)
# a.process(ciphertext, offset=xyz)
# For now, we fake it with the existing iv= argument.
offset_big = offset // 16
offset_small = offset % 16
iv = binascii.unhexlify("%032x" % offset_big)
self._decryptor = AES(readkey, iv=iv)
self._decryptor.process("\x00"*offset_small)
def registerProducer(self, producer, streaming):
# this passes through, so the real consumer can flow-control the real
# producer. Therefore we don't need to provide any IPushProducer
# methods. We implement all the IConsumer methods as pass-throughs,
# and only intercept write() to perform decryption.
self._consumer.registerProducer(producer, streaming)
def unregisterProducer(self):
self._consumer.unregisterProducer()
def write(self, ciphertext):
started = now()
plaintext = self._decryptor.process(ciphertext)
elapsed = now() - started
self._read_event.update(0, elapsed, 0)
self._consumer.write(plaintext)
class ImmutableFileNode:
implements(IImmutableFileNode)
# I wrap a CiphertextFileNode with a decryption key
def __init__(self, filecap, storage_broker, secret_holder, terminator,
history):
assert isinstance(filecap, uri.CHKFileURI)
verifycap = filecap.get_verify_cap()
ds = DownloadStatus(verifycap.storage_index, verifycap.size)
if history:
history.add_download(ds)
self._download_status = ds
self._cnode = CiphertextFileNode(verifycap, storage_broker,
secret_holder, terminator, history, ds)
assert isinstance(filecap, uri.CHKFileURI)
self.u = filecap
self._readkey = filecap.key
# TODO: I'm not sure about this.. what's the use case for node==node? If
# we keep it here, we should also put this on CiphertextFileNode
def __hash__(self):
return self.u.__hash__()
def __eq__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return False
def __ne__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return True
def read(self, consumer, offset=0, size=None):
actual_size = size
if actual_size == None:
actual_size = self.u.size
actual_size = actual_size - offset
read_ev = self._download_status.add_read_event(offset,actual_size,
now())
decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev)
d = self._cnode.read(decryptor, offset, size, read_ev)
d.addCallback(lambda dc: consumer)
return d
def raise_error(self):
pass
def get_write_uri(self): def get_write_uri(self):
return None return None
@ -23,6 +234,26 @@ class _ImmutableFileNodeBase(object):
def get_readonly_uri(self): def get_readonly_uri(self):
return self.get_uri() return self.get_uri()
def get_uri(self):
return self.u.to_string()
def get_cap(self):
return self.u
def get_readcap(self):
return self.u.get_readonly()
def get_verify_cap(self):
return self.u.get_verify_cap()
def get_repair_cap(self):
# CHK files can be repaired with just the verifycap
return self.u.get_verify_cap()
def get_storage_index(self):
return self.u.get_storage_index()
def get_size(self):
return self.u.get_size()
def get_current_size(self):
return defer.succeed(self.get_size())
def is_mutable(self): def is_mutable(self):
return False return False
@ -35,341 +266,7 @@ class _ImmutableFileNodeBase(object):
def is_allowed_in_immutable_directory(self): def is_allowed_in_immutable_directory(self):
return True return True
def raise_error(self):
pass
def __hash__(self):
return self.u.__hash__()
def __eq__(self, other):
if isinstance(other, _ImmutableFileNodeBase):
return self.u.__eq__(other.u)
else:
return False
def __ne__(self, other):
if isinstance(other, _ImmutableFileNodeBase):
return self.u.__eq__(other.u)
else:
return True
class PortionOfFile:
# like a list slice (things[2:14]), but for a file on disk
def __init__(self, fn, offset=0, size=None):
self.f = open(fn, "rb")
self.f.seek(offset)
self.bytes_left = size
def read(self, size=None):
# bytes_to_read = min(size, self.bytes_left), but None>anything
if size is None:
bytes_to_read = self.bytes_left
elif self.bytes_left is None:
bytes_to_read = size
else:
bytes_to_read = min(size, self.bytes_left)
data = self.f.read(bytes_to_read)
if self.bytes_left is not None:
self.bytes_left -= len(data)
return data
class DownloadCache:
implements(IDownloadTarget)
def __init__(self, filecap, storage_index, downloader,
cachedirectorymanager):
self._downloader = downloader
self._uri = filecap
self._storage_index = storage_index
self.milestones = set() # of (offset,size,Deferred)
self.cachedirectorymanager = cachedirectorymanager
self.cachefile = None
self.download_in_progress = False
# five states:
# new ImmutableFileNode, no downloads ever performed
# new ImmutableFileNode, leftover file (partial)
# new ImmutableFileNode, leftover file (whole)
# download in progress, not yet complete
# download complete
def when_range_available(self, offset, size):
assert isinstance(offset, (int,long))
assert isinstance(size, (int,long))
d = defer.Deferred()
self.milestones.add( (offset,size,d) )
self._check_milestones()
if self.milestones and not self.download_in_progress:
self.download_in_progress = True
log.msg(format=("immutable filenode read [%(si)s]: " +
"starting download"),
si=base32.b2a(self._storage_index),
umid="h26Heg", level=log.OPERATIONAL)
d2 = self._downloader.download(self._uri, self)
d2.addBoth(self._download_done)
d2.addErrback(self._download_failed)
d2.addErrback(log.err, umid="cQaM9g")
return d
def read(self, consumer, offset, size):
assert offset+size <= self.get_filesize()
if not self.cachefile:
self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
f = PortionOfFile(self.cachefile.get_filename(), offset, size)
d = basic.FileSender().beginFileTransfer(f, consumer)
d.addCallback(lambda lastSent: consumer)
return d
def _download_done(self, res):
# clear download_in_progress, so failed downloads can be re-tried
self.download_in_progress = False
return res
def _download_failed(self, f):
# tell anyone who's waiting that we failed
for m in self.milestones:
(offset,size,d) = m
eventually(d.errback, f)
self.milestones.clear()
def _check_milestones(self):
current_size = self.get_filesize()
for m in list(self.milestones):
(offset,size,d) = m
if offset+size <= current_size:
log.msg(format=("immutable filenode read [%(si)s] " +
"%(offset)d+%(size)d vs %(filesize)d: " +
"done"),
si=base32.b2a(self._storage_index),
offset=offset, size=size, filesize=current_size,
umid="nuedUg", level=log.NOISY)
self.milestones.discard(m)
eventually(d.callback, None)
else:
log.msg(format=("immutable filenode read [%(si)s] " +
"%(offset)d+%(size)d vs %(filesize)d: " +
"still waiting"),
si=base32.b2a(self._storage_index),
offset=offset, size=size, filesize=current_size,
umid="8PKOhg", level=log.NOISY)
def get_filesize(self):
if not self.cachefile:
self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
try:
filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
except OSError:
filesize = 0
return filesize
def open(self, size):
if not self.cachefile:
self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
self.f = open(self.cachefile.get_filename(), "wb")
def write(self, data):
self.f.write(data)
self._check_milestones()
def close(self):
self.f.close()
self._check_milestones()
def fail(self, why):
pass
def register_canceller(self, cb):
pass
def finish(self):
return None
# The following methods are just because the target might be a
# repairer.DownUpConnector, and just because the current CHKUpload object
# expects to find the storage index and encoding parameters in its
# Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
pass
class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
def __init__(self, filecap, storage_broker, secret_holder,
downloader, history, cachedirectorymanager):
assert isinstance(filecap, CHKFileURI)
self.u = filecap
self._storage_broker = storage_broker
self._secret_holder = secret_holder
self._downloader = downloader
self._history = history
storage_index = self.get_storage_index()
self.download_cache = DownloadCache(filecap, storage_index, downloader,
cachedirectorymanager)
prefix = self.u.get_verify_cap().to_string()
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
self.log("starting", level=log.OPERATIONAL)
def get_size(self):
return self.u.get_size()
def get_current_size(self):
return defer.succeed(self.get_size())
def get_cap(self):
return self.u
def get_readcap(self):
return self.u.get_readonly()
def get_verify_cap(self):
return self.u.get_verify_cap()
def get_repair_cap(self):
# CHK files can be repaired with just the verifycap
return self.u.get_verify_cap()
def get_uri(self):
return self.u.to_string()
def get_storage_index(self):
return self.u.get_storage_index()
def check_and_repair(self, monitor, verify=False, add_lease=False): def check_and_repair(self, monitor, verify=False, add_lease=False):
verifycap = self.get_verify_cap() return self._cnode.check_and_repair(monitor, verify, add_lease)
sb = self._storage_broker
servers = sb.get_all_servers()
sh = self._secret_holder
c = Checker(verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, secret_holder=sh,
monitor=monitor)
d = c.start()
def _maybe_repair(cr):
crr = CheckAndRepairResults(self.u.get_storage_index())
crr.pre_repair_results = cr
if cr.is_healthy():
crr.post_repair_results = cr
return defer.succeed(crr)
else:
crr.repair_attempted = True
crr.repair_successful = False # until proven successful
def _gather_repair_results(ur):
assert IUploadResults.providedBy(ur), ur
# clone the cr -- check results to form the basic of the prr -- post-repair results
prr = CheckResults(cr.uri, cr.storage_index)
prr.data = copy.deepcopy(cr.data)
sm = prr.data['sharemap']
assert isinstance(sm, dictutil.DictOfSets), sm
sm.update(ur.sharemap)
servers_responding = set(prr.data['servers-responding'])
servers_responding.union(ur.sharemap.iterkeys())
prr.data['servers-responding'] = list(servers_responding)
prr.data['count-shares-good'] = len(sm)
prr.data['count-good-share-hosts'] = len(sm)
is_healthy = bool(len(sm) >= self.u.total_shares)
is_recoverable = bool(len(sm) >= self.u.needed_shares)
prr.set_healthy(is_healthy)
prr.set_recoverable(is_recoverable)
crr.repair_successful = is_healthy
prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
crr.post_repair_results = prr
return crr
def _repair_error(f):
# as with mutable repair, I'm not sure if I want to pass
# through a failure or not. TODO
crr.repair_successful = False
crr.repair_failure = f
return f
r = Repairer(storage_broker=sb, secret_holder=sh,
verifycap=verifycap, monitor=monitor)
d = r.start()
d.addCallbacks(_gather_repair_results, _repair_error)
return d
d.addCallback(_maybe_repair)
return d
def check(self, monitor, verify=False, add_lease=False): def check(self, monitor, verify=False, add_lease=False):
verifycap = self.get_verify_cap() return self._cnode.check(monitor, verify, add_lease)
sb = self._storage_broker
servers = sb.get_all_servers()
sh = self._secret_holder
v = Checker(verifycap=verifycap, servers=servers,
verify=verify, add_lease=add_lease, secret_holder=sh,
monitor=monitor)
return v.start()
def read(self, consumer, offset=0, size=None):
self.log("read", offset=offset, size=size,
umid="UPP8FA", level=log.OPERATIONAL)
if size is None:
size = self.get_size() - offset
size = min(size, self.get_size() - offset)
if offset == 0 and size == self.get_size():
# don't use the cache, just do a normal streaming download
self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
target = download.ConsumerAdapter(consumer)
return self._downloader.download(self.get_cap(), target,
self._parentmsgid,
history=self._history)
d = self.download_cache.when_range_available(offset, size)
d.addCallback(lambda res:
self.download_cache.read(consumer, offset, size))
return d
class LiteralProducer:
implements(IPushProducer)
def resumeProducing(self):
pass
def stopProducing(self):
pass
class LiteralFileNode(_ImmutableFileNodeBase):
def __init__(self, filecap):
assert isinstance(filecap, LiteralFileURI)
self.u = filecap
def get_size(self):
return len(self.u.data)
def get_current_size(self):
return defer.succeed(self.get_size())
def get_cap(self):
return self.u
def get_readcap(self):
return self.u
def get_verify_cap(self):
return None
def get_repair_cap(self):
return None
def get_uri(self):
return self.u.to_string()
def get_storage_index(self):
return None
def check(self, monitor, verify=False, add_lease=False):
return defer.succeed(None)
def check_and_repair(self, monitor, verify=False, add_lease=False):
return defer.succeed(None)
def read(self, consumer, offset=0, size=None):
if size is None:
data = self.u.data[offset:]
else:
data = self.u.data[offset:offset+size]
# We use twisted.protocols.basic.FileSender, which only does
# non-streaming, i.e. PullProducer, where the receiver/consumer must
# ask explicitly for each chunk of data. There are only two places in
# the Twisted codebase that can't handle streaming=False, both of
# which are in the upload path for an FTP/SFTP server
# (protocols.ftp.FileConsumer and
# vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
# likely to be used as the target for a Tahoe download.
d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
d.addCallback(lambda lastSent: consumer)
return d

View File

@ -74,12 +74,16 @@ limitations described in #346.
# they are still provided when writing so that older versions of Tahoe can # they are still provided when writing so that older versions of Tahoe can
# read them. # read them.
FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
def make_write_bucket_proxy(rref, data_size, block_size, num_segments, def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max, nodeid): num_share_hashes, uri_extension_size_max, nodeid):
# Use layout v1 for small files, so they'll be readable by older versions # Use layout v1 for small files, so they'll be readable by older versions
# (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
# by tahoe-1.3.0 or later. # by tahoe-1.3.0 or later.
try: try:
if FORCE_V2:
raise FileTooLargeError
wbp = WriteBucketProxy(rref, data_size, block_size, num_segments, wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max, nodeid) num_share_hashes, uri_extension_size_max, nodeid)
except FileTooLargeError: except FileTooLargeError:

View File

@ -0,0 +1,104 @@
from cStringIO import StringIO
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer
from twisted.protocols import basic
from allmydata.interfaces import IImmutableFileNode, ICheckable
from allmydata.uri import LiteralFileURI
class _ImmutableFileNodeBase(object):
implements(IImmutableFileNode, ICheckable)
def get_write_uri(self):
return None
def get_readonly_uri(self):
return self.get_uri()
def is_mutable(self):
return False
def is_readonly(self):
return True
def is_unknown(self):
return False
def is_allowed_in_immutable_directory(self):
return True
def raise_error(self):
pass
def __hash__(self):
return self.u.__hash__()
def __eq__(self, other):
if isinstance(other, _ImmutableFileNodeBase):
return self.u.__eq__(other.u)
else:
return False
def __ne__(self, other):
if isinstance(other, _ImmutableFileNodeBase):
return self.u.__eq__(other.u)
else:
return True
class LiteralProducer:
implements(IPushProducer)
def resumeProducing(self):
pass
def stopProducing(self):
pass
class LiteralFileNode(_ImmutableFileNodeBase):
def __init__(self, filecap):
assert isinstance(filecap, LiteralFileURI)
self.u = filecap
def get_size(self):
return len(self.u.data)
def get_current_size(self):
return defer.succeed(self.get_size())
def get_cap(self):
return self.u
def get_readcap(self):
return self.u
def get_verify_cap(self):
return None
def get_repair_cap(self):
return None
def get_uri(self):
return self.u.to_string()
def get_storage_index(self):
return None
def check(self, monitor, verify=False, add_lease=False):
return defer.succeed(None)
def check_and_repair(self, monitor, verify=False, add_lease=False):
return defer.succeed(None)
def read(self, consumer, offset=0, size=None):
if size is None:
data = self.u.data[offset:]
else:
data = self.u.data[offset:offset+size]
# We use twisted.protocols.basic.FileSender, which only does
# non-streaming, i.e. PullProducer, where the receiver/consumer must
# ask explicitly for each chunk of data. There are only two places in
# the Twisted codebase that can't handle streaming=False, both of
# which are in the upload path for an FTP/SFTP server
# (protocols.ftp.FileConsumer and
# vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
# likely to be used as the target for a Tahoe download.
d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
d.addCallback(lambda lastSent: consumer)
return d

View File

@ -1,17 +1,14 @@
from zope.interface import implements from zope.interface import implements
from twisted.internet import defer from twisted.internet import defer
from allmydata.storage.server import si_b2a from allmydata.storage.server import si_b2a
from allmydata.util import log, observer from allmydata.util import log, consumer
from allmydata.util.assertutil import precondition, _assert from allmydata.util.assertutil import precondition
from allmydata.uri import CHKFileVerifierURI from allmydata.interfaces import IEncryptedUploadable
from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
from twisted.internet.interfaces import IConsumer
from allmydata.immutable import download, upload from allmydata.immutable import upload
import collections
class Repairer(log.PrefixingLogMixin): class Repairer(log.PrefixingLogMixin):
implements(IEncryptedUploadable)
"""I generate any shares which were not available and upload them to """I generate any shares which were not available and upload them to
servers. servers.
@ -43,195 +40,51 @@ class Repairer(log.PrefixingLogMixin):
cancelled (by invoking its raise_if_cancelled() method). cancelled (by invoking its raise_if_cancelled() method).
""" """
def __init__(self, storage_broker, secret_holder, verifycap, monitor): def __init__(self, filenode, storage_broker, secret_holder, monitor):
assert precondition(isinstance(verifycap, CHKFileVerifierURI)) logprefix = si_b2a(filenode.get_storage_index())[:5]
logprefix = si_b2a(verifycap.get_storage_index())[:5]
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
prefix=logprefix) prefix=logprefix)
self._filenode = filenode
self._storage_broker = storage_broker self._storage_broker = storage_broker
self._secret_holder = secret_holder self._secret_holder = secret_holder
self._verifycap = verifycap
self._monitor = monitor self._monitor = monitor
self._offset = 0
def start(self): def start(self):
self.log("starting repair") self.log("starting repair")
duc = DownUpConnector() d = self._filenode.get_segment_size()
dl = download.CiphertextDownloader(self._storage_broker, def _got_segsize(segsize):
self._verifycap, target=duc, vcap = self._filenode.get_verify_cap()
monitor=self._monitor) k = vcap.needed_shares
N = vcap.total_shares
happy = upload.BaseUploadable.default_encoding_param_happy
self._encodingparams = (k, happy, N, segsize)
ul = upload.CHKUploader(self._storage_broker, self._secret_holder) ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
return ul.start(self) # I am the IEncryptedUploadable
d = defer.Deferred() d.addCallback(_got_segsize)
# If the upload or the download fails or is stopped, then the repair
# failed.
def _errb(f):
d.errback(f)
return None
# If the upload succeeds, then the repair has succeeded.
def _cb(res):
d.callback(res)
ul.start(duc).addCallbacks(_cb, _errb)
# If the download fails or is stopped, then the repair failed.
d2 = dl.start()
d2.addErrback(_errb)
# We ignore the callback from d2. Is this right? Ugh.
return d return d
class DownUpConnector(log.PrefixingLogMixin):
implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
"""I act like an 'encrypted uploadable' -- something that a local
uploader can read ciphertext from in order to upload the ciphertext.
However, unbeknownst to the uploader, I actually download the ciphertext
from a CiphertextDownloader instance as it is needed.
On the other hand, I act like a 'download target' -- something that a
local downloader can write ciphertext to as it downloads the ciphertext.
That downloader doesn't realize, of course, that I'm just turning around
and giving the ciphertext to the uploader."""
# The theory behind this class is nice: just satisfy two separate
# interfaces. The implementation is slightly horrible, because of
# "impedance mismatch" -- the downloader expects to be able to
# synchronously push data in, and the uploader expects to be able to read
# data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
# The two interfaces have different APIs for pausing/unpausing. The
# uploader requests metadata like size and encodingparams which the
# downloader provides either eventually or not at all (okay I just now
# extended the downloader to provide encodingparams). Most of this
# slightly horrible code would disappear if CiphertextDownloader just
# used this object as an IConsumer (plus maybe a couple of other methods)
# and if the Uploader simply expected to be treated as an IConsumer (plus
# maybe a couple of other things).
def __init__(self, buflim=2**19):
"""If we're already holding at least buflim bytes, then tell the
downloader to pause until we have less than buflim bytes."""
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
self.buflim = buflim
self.bufs = collections.deque() # list of strings
self.bufsiz = 0 # how many bytes total in bufs
# list of deferreds which will fire with the requested ciphertext
self.next_read_ds = collections.deque()
# how many bytes of ciphertext were requested by each deferred
self.next_read_lens = collections.deque()
self._size_osol = observer.OneShotObserverList()
self._encodingparams_osol = observer.OneShotObserverList()
self._storageindex_osol = observer.OneShotObserverList()
self._closed_to_pusher = False
# once seg size is available, the following attribute will be created
# to hold it:
# self.encodingparams # (provided by the object which is pushing data
# into me, required by the object which is pulling data out of me)
# open() will create the following attribute:
# self.size # size of the whole file (provided by the object which is
# pushing data into me, required by the object which is pulling data
# out of me)
# set_upload_status() will create the following attribute:
# self.upload_status # XXX do we need to actually update this? Is
# anybody watching the results during a repair?
def _satisfy_reads_if_possible(self):
assert bool(self.next_read_ds) == bool(self.next_read_lens)
while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
or self._closed_to_pusher):
nrd = self.next_read_ds.popleft()
nrl = self.next_read_lens.popleft()
# Pick out the requested number of bytes from self.bufs, turn it
# into a string, and callback the deferred with that.
res = []
ressize = 0
while ressize < nrl and self.bufs:
nextbuf = self.bufs.popleft()
res.append(nextbuf)
ressize += len(nextbuf)
if ressize > nrl:
extra = ressize - nrl
self.bufs.appendleft(nextbuf[:-extra])
res[-1] = nextbuf[:-extra]
assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl)
assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl)
self.bufsiz -= nrl
if self.bufsiz < self.buflim and self.producer:
self.producer.resumeProducing()
nrd.callback(res)
# methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
# the perspective of a downloader I am an IDownloadTarget and an
# IConsumer.)
def registerProducer(self, producer, streaming):
assert streaming # We know how to handle only streaming producers.
self.producer = producer # the downloader
def unregisterProducer(self):
self.producer = None
def open(self, size):
self.size = size
self._size_osol.fire(self.size)
def set_encodingparams(self, encodingparams):
self.encodingparams = encodingparams
self._encodingparams_osol.fire(self.encodingparams)
def set_storageindex(self, storageindex):
self.storageindex = storageindex
self._storageindex_osol.fire(self.storageindex)
def write(self, data):
precondition(data) # please don't write empty strings
self.bufs.append(data)
self.bufsiz += len(data)
self._satisfy_reads_if_possible()
if self.bufsiz >= self.buflim and self.producer:
self.producer.pauseProducing()
def finish(self):
pass
def close(self):
self._closed_to_pusher = True
# Any reads which haven't been satisfied by now are going to
# have to be satisfied with short reads.
self._satisfy_reads_if_possible()
# methods to satisfy the IEncryptedUploader interface # methods to satisfy the IEncryptedUploader interface
# (From the perspective of an uploader I am an IEncryptedUploadable.) # (From the perspective of an uploader I am an IEncryptedUploadable.)
def set_upload_status(self, upload_status): def set_upload_status(self, upload_status):
self.upload_status = upload_status self.upload_status = upload_status
def get_size(self): def get_size(self):
if hasattr(self, 'size'): # attribute created by self.open() size = self._filenode.get_size()
return defer.succeed(self.size) assert size is not None
else: return defer.succeed(size)
return self._size_osol.when_fired()
def get_all_encoding_parameters(self): def get_all_encoding_parameters(self):
# We have to learn the encoding params from pusher. return defer.succeed(self._encodingparams)
if hasattr(self, 'encodingparams'):
# attribute created by self.set_encodingparams()
return defer.succeed(self.encodingparams)
else:
return self._encodingparams_osol.when_fired()
def read_encrypted(self, length, hash_only): def read_encrypted(self, length, hash_only):
"""Returns a deferred which eventually fired with the requested """Returns a deferred which eventually fires with the requested
ciphertext.""" ciphertext, as a list of strings."""
precondition(length) # please don't ask to read 0 bytes precondition(length) # please don't ask to read 0 bytes
d = defer.Deferred() mc = consumer.MemoryConsumer()
self.next_read_ds.append(d) d = self._filenode.read(mc, self._offset, length)
self.next_read_lens.append(length) self._offset += length
self._satisfy_reads_if_possible() d.addCallback(lambda ign: mc.chunks)
return d return d
def get_storage_index(self): def get_storage_index(self):
# We have to learn the storage index from pusher. return self._filenode.get_storage_index()
if hasattr(self, 'storageindex'): def close(self):
# attribute created by self.set_storageindex() pass
return defer.succeed(self.storageindex)
else:
return self._storageindex.when_fired()

View File

@ -20,7 +20,8 @@ from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE
from allmydata.immutable import layout from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES from pycryptopp.cipher.aes import AES
@ -1234,7 +1235,8 @@ class AssistedUploader:
return self._upload_status return self._upload_status
class BaseUploadable: class BaseUploadable:
default_max_segment_size = 128*KiB # overridden by max_segment_size # this is overridden by max_segment_size
default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
default_encoding_param_k = 3 # overridden by encoding_parameters default_encoding_param_k = 3 # overridden by encoding_parameters
default_encoding_param_happy = 7 default_encoding_param_happy = 7
default_encoding_param_n = 10 default_encoding_param_n = 10