storage: wrap buckets in a local proxy

This will make it easier to change RIBucketWriter in the future to reduce the wire
protocol to just open/write(offset,data)/close, and do all the structuring on the
client end. The ultimate goal is to store each bucket in a single file, to reduce
the considerable filesystem-quantization/inode overhead on the storage servers.
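
As an aside on that end state: the sketch below is a hypothetical illustration, NOT code from this commit, of what client-side structuring over a minimal open/write(offset,data)/close wire protocol could look like. Every name in it is invented; the point is that the proxy tracks a running offset and owns the bucket layout, so the server only ever writes bytes into a single file.

    # Hypothetical sketch of the eventual client-side structuring described
    # above; NOT part of this commit. Assumes the remote bucket accepts only
    # write(offset, data) and close(), and that blocks arrive in order so a
    # running offset suffices.
    class FutureWriteBucketProxy:
        def __init__(self, rref):
            self._rref = rref  # remote reference speaking only write/close
            self._offset = 0   # the client, not the server, owns the layout

        def _write(self, data):
            d = self._rref.callRemote("write", self._offset, data)
            self._offset += len(data)
            return d

        def put_block(self, segmentnum, data):
            # a real layout would record where each segment lands (or seek
            # to segmentnum * block_size); this sketch just appends
            return self._write(data)

        def close(self):
            return self._rref.callRemote("close")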
Author: Brian Warner, 2007-07-08 23:27:46 -07:00
commit dce1dc2730 (parent 8891544d6c)

6 changed files with 194 additions and 76 deletions

View File: allmydata/download.py

@@ -7,7 +7,7 @@ from twisted.application import service
 from allmydata.util import idlib, mathutil, hashutil
 from allmydata.util.assertutil import _assert
-from allmydata import codec, hashtree
+from allmydata import codec, hashtree, storageserver
 from allmydata.Crypto.Cipher import AES
 from allmydata.uri import unpack_uri, unpack_extension
 from allmydata.interfaces import IDownloadTarget, IDownloader
@@ -109,7 +109,7 @@ class ValidatedBucket:
         # of the share hash tree to validate it from our share hash up to the
         # hashroot.
         if not self._share_hash:
-            d1 = self.bucket.callRemote('get_share_hashes')
+            d1 = self.bucket.get_share_hashes()
         else:
             d1 = defer.succeed(None)
@@ -117,12 +117,12 @@
         # validate the requested block up to the share hash
         needed = self.block_hash_tree.needed_hashes(blocknum)
         if needed:
-            # TODO: get fewer hashes, callRemote('get_block_hashes', needed)
-            d2 = self.bucket.callRemote('get_block_hashes')
+            # TODO: get fewer hashes, use get_block_hashes(needed)
+            d2 = self.bucket.get_block_hashes()
         else:
             d2 = defer.succeed([])
-        d3 = self.bucket.callRemote('get_block', blocknum)
+        d3 = self.bucket.get_block(blocknum)
         d = defer.gatherResults([d1, d2, d3])
         d.addCallback(self._got_data, blocknum)
@@ -321,8 +321,9 @@ class FileDownloader:
     def _got_response(self, buckets, connection):
         _assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint
         for sharenum, bucket in buckets.iteritems():
-            self.add_share_bucket(sharenum, bucket)
-            self._uri_extension_sources.append(bucket)
+            b = storageserver.ReadBucketProxy(bucket)
+            self.add_share_bucket(sharenum, b)
+            self._uri_extension_sources.append(b)

     def add_share_bucket(self, sharenum, bucket):
         # this is split out for the benefit of test_encode.py
@@ -379,7 +380,8 @@ class FileDownloader:
                                 "%s" % name)
             bucket = sources[0]
             sources = sources[1:]
-            d = bucket.callRemote(methname, *args)
+            #d = bucket.callRemote(methname, *args)
+            d = getattr(bucket, methname)(*args)
             d.addCallback(validatorfunc, bucket)
             def _bad(f):
                 log.msg("%s from vbucket %s failed: %s" % (name, bucket, f)) # WEIRD

View File: allmydata/encode.py

@@ -9,7 +9,7 @@ from allmydata.Crypto.Cipher import AES
 from allmydata.util import mathutil, hashutil
 from allmydata.util.assertutil import _assert
 from allmydata.codec import CRSEncoder
-from allmydata.interfaces import IEncoder
+from allmydata.interfaces import IEncoder, IStorageBucketWriter
 """
@@ -158,6 +158,7 @@ class Encoder(object):
         for k in landlords:
             # it would be nice to:
             #assert RIBucketWriter.providedBy(landlords[k])
+            assert IStorageBucketWriter(landlords[k])
             pass
         self.landlords = landlords.copy()
@@ -307,7 +308,7 @@ class Encoder(object):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
-        d = sh.callRemote("put_block", segment_num, subshare)
+        d = sh.put_block(segment_num, subshare)
         d.addErrback(self._remove_shareholder, shareid,
                      "segnum=%d" % segment_num)
         return d
@@ -356,7 +357,7 @@ class Encoder(object):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
-        d = sh.callRemote("put_plaintext_hashes", all_hashes)
+        d = sh.put_plaintext_hashes(all_hashes)
         d.addErrback(self._remove_shareholder, shareid, "put_plaintext_hashes")
         return d
@@ -374,7 +375,7 @@ class Encoder(object):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
-        d = sh.callRemote("put_crypttext_hashes", all_hashes)
+        d = sh.put_crypttext_hashes(all_hashes)
         d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
         return d
@@ -397,7 +398,7 @@ class Encoder(object):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
-        d = sh.callRemote("put_block_hashes", all_hashes)
+        d = sh.put_block_hashes(all_hashes)
         d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
         return d
@@ -427,7 +428,7 @@ class Encoder(object):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
-        d = sh.callRemote("put_share_hashes", needed_hashes)
+        d = sh.put_share_hashes(needed_hashes)
         d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
         return d
@@ -442,7 +443,7 @@ class Encoder(object):
     def send_uri_extension(self, shareid, uri_extension):
         sh = self.landlords[shareid]
-        d = sh.callRemote("put_uri_extension", uri_extension)
+        d = sh.put_uri_extension(uri_extension)
         d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
         return d
@@ -450,7 +451,7 @@ class Encoder(object):
         log.msg("%s: closing shareholders" % self)
         dl = []
         for shareid in self.landlords:
-            d = self.landlords[shareid].callRemote("close")
+            d = self.landlords[shareid].close()
             d.addErrback(self._remove_shareholder, shareid, "close")
             dl.append(d)
         return self._gather_responses(dl)

View File: allmydata/interfaces.py

@@ -119,6 +119,42 @@ class RIStorageServer(RemoteInterface):
     def get_buckets(storage_index=StorageIndex):
         return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)

+
+class IStorageBucketWriter(Interface):
+    def put_block(segmentnum, data):
+        pass
+
+    def put_plaintext_hashes(hashes):
+        pass
+
+    def put_crypttext_hashes(hashes):
+        pass
+
+    def put_block_hashes(blockhashes):
+        pass
+
+    def put_share_hashes(sharehashes):
+        pass
+
+    def put_uri_extension(data):
+        pass
+
+    def close():
+        pass
+
+class IStorageBucketReader(Interface):
+    def get_block(blocknum):
+        pass
+
+    def get_plaintext_hashes():
+        pass
+
+    def get_crypttext_hashes():
+        pass
+
+    def get_block_hashes():
+        pass
+
+    def get_share_hashes():
+        pass
+
+    def get_uri_extension():
+        pass
+
 # hm, we need a solution for forward references in schemas
 from foolscap.schema import Any
 RIMutableDirectoryNode_ = Any() # TODO: how can we avoid this?

View File: allmydata/storageserver.py

@@ -5,7 +5,7 @@ from twisted.application import service
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
-     RIBucketReader
+     RIBucketReader, IStorageBucketWriter, IStorageBucketReader
 from allmydata import interfaces
 from allmydata.util import bencode, fileutil, idlib
 from allmydata.util.assertutil import precondition
@@ -203,3 +203,44 @@ class StorageServer(service.MultiService, Referenceable):
             pass
         return bucketreaders

+
+class WriteBucketProxy:
+    implements(IStorageBucketWriter)
+    def __init__(self, rref):
+        self._rref = rref
+
+    def put_block(self, segmentnum, data):
+        return self._rref.callRemote("put_block", segmentnum, data)
+
+    def put_plaintext_hashes(self, hashes):
+        return self._rref.callRemote("put_plaintext_hashes", hashes)
+
+    def put_crypttext_hashes(self, hashes):
+        return self._rref.callRemote("put_crypttext_hashes", hashes)
+
+    def put_block_hashes(self, blockhashes):
+        return self._rref.callRemote("put_block_hashes", blockhashes)
+
+    def put_share_hashes(self, sharehashes):
+        return self._rref.callRemote("put_share_hashes", sharehashes)
+
+    def put_uri_extension(self, data):
+        return self._rref.callRemote("put_uri_extension", data)
+
+    def close(self):
+        return self._rref.callRemote("close")
+
+class ReadBucketProxy:
+    implements(IStorageBucketReader)
+    def __init__(self, rref):
+        self._rref = rref
+
+    def get_block(self, blocknum):
+        return self._rref.callRemote("get_block", blocknum)
+
+    def get_plaintext_hashes(self):
+        return self._rref.callRemote("get_plaintext_hashes")
+
+    def get_crypttext_hashes(self):
+        return self._rref.callRemote("get_crypttext_hashes")
+
+    def get_block_hashes(self):
+        return self._rref.callRemote("get_block_hashes")
+
+    def get_share_hashes(self):
+        return self._rref.callRemote("get_share_hashes")
+
+    def get_uri_extension(self):
+        return self._rref.callRemote("get_uri_extension")

View File: allmydata/test/test_encode.py

@@ -1,4 +1,5 @@
+from zope.interface import implements
 from twisted.trial import unittest
 from twisted.internet import defer
 from twisted.python.failure import Failure
@@ -7,6 +8,7 @@ from allmydata import encode, download, hashtree
 from allmydata.util import hashutil
 from allmydata.uri import pack_uri
 from allmydata.Crypto.Cipher import AES
+from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
 from cStringIO import StringIO

 class FakePeer:
@@ -48,6 +50,7 @@ def flip_bit(good): # flips the last bit
     return good[:-1] + chr(ord(good[-1]) ^ 0x01)

 class FakeBucketWriter:
+    implements(IStorageBucketWriter, IStorageBucketReader)
     # these are used for both reading and writing
     def __init__(self, mode="good"):
         self.mode = mode
@@ -59,75 +62,105 @@ class FakeBucketWriter:
         self.closed = False

     def callRemote(self, methname, *args, **kwargs):
+        # this allows FakeBucketWriter to be used either as an
+        # IStorageBucketWriter or as the remote reference that it wraps. This
+        # should be cleaned up eventually when we change RIBucketWriter to
+        # have just write(offset, data) and close()
         def _call():
             meth = getattr(self, methname)
             return meth(*args, **kwargs)
-        return defer.maybeDeferred(_call)
+        d = eventual.fireEventually()
+        d.addCallback(lambda res: _call())
+        return d

     def put_block(self, segmentnum, data):
+        def _try():
             assert not self.closed
             assert segmentnum not in self.blocks
             if self.mode == "lost" and segmentnum >= 1:
                 raise LostPeerError("I'm going away now")
             self.blocks[segmentnum] = data
+        return defer.maybeDeferred(_try)

     def put_plaintext_hashes(self, hashes):
+        def _try():
             assert not self.closed
             assert self.plaintext_hashes is None
             self.plaintext_hashes = hashes
+        return defer.maybeDeferred(_try)

     def put_crypttext_hashes(self, hashes):
+        def _try():
             assert not self.closed
             assert self.crypttext_hashes is None
             self.crypttext_hashes = hashes
+        return defer.maybeDeferred(_try)

     def put_block_hashes(self, blockhashes):
+        def _try():
             assert not self.closed
             assert self.block_hashes is None
             self.block_hashes = blockhashes
+        return defer.maybeDeferred(_try)

     def put_share_hashes(self, sharehashes):
+        def _try():
             assert not self.closed
             assert self.share_hashes is None
             self.share_hashes = sharehashes
+        return defer.maybeDeferred(_try)

     def put_uri_extension(self, uri_extension):
+        def _try():
             assert not self.closed
             self.uri_extension = uri_extension
+        return defer.maybeDeferred(_try)

     def close(self):
+        def _try():
             assert not self.closed
             self.closed = True
+        return defer.maybeDeferred(_try)

     def get_block(self, blocknum):
+        def _try():
             assert isinstance(blocknum, (int, long))
             if self.mode == "bad block":
                 return flip_bit(self.blocks[blocknum])
             return self.blocks[blocknum]
+        return defer.maybeDeferred(_try)

     def get_plaintext_hashes(self):
+        def _try():
             hashes = self.plaintext_hashes[:]
             if self.mode == "bad plaintext hashroot":
                 hashes[0] = flip_bit(hashes[0])
             if self.mode == "bad plaintext hash":
                 hashes[1] = flip_bit(hashes[1])
             return hashes
+        return defer.maybeDeferred(_try)

     def get_crypttext_hashes(self):
+        def _try():
             hashes = self.crypttext_hashes[:]
             if self.mode == "bad crypttext hashroot":
                 hashes[0] = flip_bit(hashes[0])
             if self.mode == "bad crypttext hash":
                 hashes[1] = flip_bit(hashes[1])
             return hashes
+        return defer.maybeDeferred(_try)

     def get_block_hashes(self):
+        def _try():
             if self.mode == "bad blockhash":
                 hashes = self.block_hashes[:]
                 hashes[1] = flip_bit(hashes[1])
                 return hashes
             return self.block_hashes
+        return defer.maybeDeferred(_try)

     def get_share_hashes(self):
+        def _try():
             if self.mode == "bad sharehash":
                 hashes = self.share_hashes[:]
                 hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
@@ -138,11 +171,14 @@ class FakeBucketWriter:
                 # download.py is supposed to guard against this case.
                 return []
             return self.share_hashes
+        return defer.maybeDeferred(_try)

     def get_uri_extension(self):
+        def _try():
             if self.mode == "bad uri_extension":
                 return flip_bit(self.uri_extension)
             return self.uri_extension
+        return defer.maybeDeferred(_try)


 def make_data(length):

View File: allmydata/upload.py

@@ -5,7 +5,7 @@ from twisted.application import service
 from foolscap import Referenceable
 from allmydata.util import idlib, hashutil
-from allmydata import encode
+from allmydata import encode, storageserver
 from allmydata.uri import pack_uri
 from allmydata.interfaces import IUploadable, IUploader
 from allmydata.Crypto.Cipher import AES
@@ -53,8 +53,10 @@ class PeerTracker:
     def _got_reply(self, (alreadygot, buckets)):
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
-        self.buckets.update(buckets)
-        return (alreadygot, set(buckets.keys()))
+        b = dict( [ (sharenum, storageserver.WriteBucketProxy(rref))
+                    for sharenum, rref in buckets.iteritems() ] )
+        self.buckets.update(b)
+        return (alreadygot, set(b.keys()))

 class FileUploader:
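
The change is symmetrical across the six files above: upload code wraps each RIBucketWriter remote reference in a WriteBucketProxy, download code wraps each RIBucketReader in a ReadBucketProxy, and everything downstream calls plain Python methods while the proxy owns the callRemote. A condensed sketch of the new call pattern (variable names are illustrative; buckets, rref, and bucket stand for whatever remote references the storage server returned):

    from allmydata import storageserver

    # upload side, as in PeerTracker._got_reply above:
    writers = dict([(sharenum, storageserver.WriteBucketProxy(rref))
                    for sharenum, rref in buckets.iteritems()])
    d = writers[0].put_block(0, share_data)  # proxy issues the callRemote

    # download side, as in FileDownloader._got_response above:
    reader = storageserver.ReadBucketProxy(bucket)
    d = reader.get_block(0)  # Deferred that fires with the block data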