deletion phase1: send renew/cancel-lease secrets, but my_secret is fake, and the StorageServer discards them

This commit is contained in:
Brian Warner 2007-08-27 17:28:51 -07:00
parent ed525f7478
commit 739ae1ccde
6 changed files with 76 additions and 15 deletions

View File

@ -16,6 +16,9 @@ URI = StringConstraint(300) # kind of arbitrary
MAX_BUCKETS = 200 # per peer MAX_BUCKETS = 200 # per peer
ShareData = StringConstraint(400000) # 1MB segment / k=3 = 334kB ShareData = StringConstraint(400000) # 1MB segment / k=3 = 334kB
URIExtensionData = StringConstraint(1000) URIExtensionData = StringConstraint(1000)
LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
class RIIntroducerClient(RemoteInterface): class RIIntroducerClient(RemoteInterface):
def new_peers(furls=SetOf(FURL)): def new_peers(furls=SetOf(FURL)):
@ -79,12 +82,26 @@ class RIBucketReader(RemoteInterface):
class RIStorageServer(RemoteInterface): class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex, def allocate_buckets(storage_index=StorageIndex,
renew_secret=LeaseRenewSecret,
cancel_secret=LeaseCancelSecret,
sharenums=SetOf(int, maxLength=MAX_BUCKETS), sharenums=SetOf(int, maxLength=MAX_BUCKETS),
allocated_size=int, canary=Referenceable): allocated_size=int, canary=Referenceable):
""" """
@param canary: If the canary is lost before close(), the bucket is deleted. @param storage_index: the index of the bucket to be created or
increfed.
@param sharenums: these are the share numbers (probably between 0 and
99) that the sender is proposing to store on this
server.
@param renew_secret: This is the secret used to protect bucket refresh
This secret is generated by the client and
stored for later comparison by the server. Each
server is given a different secret.
@param cancel_secret: Like renew_secret, but protects bucket decref.
@param canary: If the canary is lost before close(), the bucket is
deleted.
@return: tuple of (alreadygot, allocated), where alreadygot is what we @return: tuple of (alreadygot, allocated), where alreadygot is what we
already have and is what we hereby agree to accept already have and is what we hereby agree to accept. New
leases are added for shares in both lists.
""" """
return TupleOf(SetOf(int, maxLength=MAX_BUCKETS), return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS)) DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))

View File

@ -98,7 +98,9 @@ class StorageServer(service.MultiService, Referenceable):
space += bw.allocated_size() space += bw.allocated_size()
return space return space
def remote_allocate_buckets(self, storage_index, sharenums, allocated_size, def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary): canary):
alreadygot = set() alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter bucketwriters = {} # k: shnum, v: BucketWriter

View File

@ -10,6 +10,9 @@ from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \ from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer WriteBucketProxy, ReadBucketProxy, StorageServer
RS = hashutil.tagged_hash("blah", "foo")
CS = RS
class Bucket(unittest.TestCase): class Bucket(unittest.TestCase):
def make_workdir(self, name): def make_workdir(self, name):
@ -186,7 +189,7 @@ class Server(unittest.TestCase):
self.failUnlessEqual(ss.remote_get_buckets("vid"), {}) self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
canary = Referenceable() canary = Referenceable()
already,writers = ss.remote_allocate_buckets("vid", [0,1,2], already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2],
75, canary) 75, canary)
self.failUnlessEqual(already, set()) self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2])) self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
@ -205,7 +208,7 @@ class Server(unittest.TestCase):
# now if we about writing again, the server should offer those three # now if we about writing again, the server should offer those three
# buckets as already present # buckets as already present
already,writers = ss.remote_allocate_buckets("vid", [0,1,2,3,4], already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2,3,4],
75, canary) 75, canary)
self.failUnlessEqual(already, set([0,1,2])) self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4])) self.failUnlessEqual(set(writers.keys()), set([3,4]))
@ -214,7 +217,7 @@ class Server(unittest.TestCase):
# tell new uploaders that they already exist (so that we don't try to # tell new uploaders that they already exist (so that we don't try to
# upload into them a second time) # upload into them a second time)
already,writers = ss.remote_allocate_buckets("vid", [2,3,4,5], already,writers = ss.remote_allocate_buckets("vid", RS, CS, [2,3,4,5],
75, canary) 75, canary)
self.failUnlessEqual(already, set([2,3,4])) self.failUnlessEqual(already, set([2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5])) self.failUnlessEqual(set(writers.keys()), set([5]))
@ -223,14 +226,14 @@ class Server(unittest.TestCase):
ss = self.create("test_sizelimits", 100) ss = self.create("test_sizelimits", 100)
canary = Referenceable() canary = Referenceable()
already,writers = ss.remote_allocate_buckets("vid1", [0,1,2], already,writers = ss.remote_allocate_buckets("vid1", RS, CS, [0,1,2],
25, canary) 25, canary)
self.failUnlessEqual(len(writers), 3) self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 75 bytes provisionally allocated, # now the StorageServer should have 75 bytes provisionally allocated,
# allowing only 25 more to be claimed # allowing only 25 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3) self.failUnlessEqual(len(ss._active_writers), 3)
already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2], already2,writers2 = ss.remote_allocate_buckets("vid2", RS, CS, [0,1,2],
25, canary) 25, canary)
self.failUnlessEqual(len(writers2), 1) self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4) self.failUnlessEqual(len(ss._active_writers), 4)
@ -252,7 +255,8 @@ class Server(unittest.TestCase):
self.failUnlessEqual(len(ss._active_writers), 0) self.failUnlessEqual(len(ss._active_writers), 0)
# now there should be 25 bytes allocated, and 75 free # now there should be 25 bytes allocated, and 75 free
already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3], already3,writers3 = ss.remote_allocate_buckets("vid3", RS, CS,
[0,1,2,3],
25, canary) 25, canary)
self.failUnlessEqual(len(writers3), 3) self.failUnlessEqual(len(writers3), 3)
self.failUnlessEqual(len(ss._active_writers), 3) self.failUnlessEqual(len(ss._active_writers), 3)
@ -268,7 +272,8 @@ class Server(unittest.TestCase):
# during runtime, so if we were creating any metadata, the allocation # during runtime, so if we were creating any metadata, the allocation
# would be more than 25 bytes and this test would need to be changed. # would be more than 25 bytes and this test would need to be changed.
ss = self.create("test_sizelimits", 100) ss = self.create("test_sizelimits", 100)
already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3], already4,writers4 = ss.remote_allocate_buckets("vid4",
RS, CS, [0,1,2,3],
25, canary) 25, canary)
self.failUnlessEqual(len(writers4), 3) self.failUnlessEqual(len(writers4), 3)
self.failUnlessEqual(len(ss._active_writers), 3) self.failUnlessEqual(len(ss._active_writers), 3)

View File

@ -85,8 +85,8 @@ class FakeStorageServer:
d.addCallback(lambda res: _call()) d.addCallback(lambda res: _call())
return d return d
def allocate_buckets(self, crypttext_hash, sharenums, def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
share_size, canary): sharenums, share_size, canary):
#print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size) #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
if self.mode == "full": if self.mode == "full":
return (set(), {},) return (set(), {},)

View File

@ -33,7 +33,7 @@ EXTENSION_SIZE = 1000
class PeerTracker: class PeerTracker:
def __init__(self, peerid, permutedid, connection, def __init__(self, peerid, permutedid, connection,
sharesize, blocksize, num_segments, num_share_hashes, sharesize, blocksize, num_segments, num_share_hashes,
crypttext_hash): storage_index):
self.peerid = peerid self.peerid = peerid
self.permutedid = permutedid self.permutedid = permutedid
self.connection = connection # to an RIClient self.connection = connection # to an RIClient
@ -49,9 +49,16 @@ class PeerTracker:
self.blocksize = blocksize self.blocksize = blocksize
self.num_segments = num_segments self.num_segments = num_segments
self.num_share_hashes = num_share_hashes self.num_share_hashes = num_share_hashes
self.crypttext_hash = crypttext_hash self.storage_index = storage_index
self._storageserver = None self._storageserver = None
h = hashutil.bucket_renewal_secret_hash
# XXX
self.my_secret = "secret"
self.renew_secret = h(self.my_secret, self.storage_index, self.peerid)
h = hashutil.bucket_cancel_secret_hash
self.cancel_secret = h(self.my_secret, self.storage_index, self.peerid)
def query(self, sharenums): def query(self, sharenums):
if not self._storageserver: if not self._storageserver:
d = self.connection.callRemote("get_service", "storageserver") d = self.connection.callRemote("get_service", "storageserver")
@ -64,7 +71,9 @@ class PeerTracker:
def _query(self, sharenums): def _query(self, sharenums):
#print " query", self.peerid, len(sharenums) #print " query", self.peerid, len(sharenums)
d = self._storageserver.callRemote("allocate_buckets", d = self._storageserver.callRemote("allocate_buckets",
self.crypttext_hash, self.storage_index,
self.renew_secret,
self.cancel_secret,
sharenums, sharenums,
self.allocated_size, self.allocated_size,
canary=Referenceable()) canary=Referenceable())

View File

@ -66,6 +66,34 @@ KEYLEN = 16
def random_key(): def random_key():
return os.urandom(KEYLEN) return os.urandom(KEYLEN)
def file_renewal_secret_hash(my_secret, storage_index):
my_renewal_secret = tagged_hash(my_secret, "bucket_renewal_secret")
file_renewal_secret = tagged_pair_hash("file_renewal_secret",
my_renewal_secret, storage_index)
return file_renewal_secret
def file_cancel_secret_hash(my_secret, storage_index):
my_cancel_secret = tagged_hash(my_secret, "bucket_cancel_secret")
file_cancel_secret = tagged_pair_hash("file_cancel_secret",
my_cancel_secret, storage_index)
return file_cancel_secret
def bucket_renewal_secret_hash(my_secret, storage_index, peerid):
my_renewal_secret = tagged_hash(my_secret, "bucket_renewal_secret")
file_renewal_secret = tagged_pair_hash("file_renewal_secret",
my_renewal_secret, storage_index)
bucket_renewal_secret = tagged_pair_hash("bucket_renewal_secret",
file_renewal_secret, peerid)
return bucket_renewal_secret
def bucket_cancel_secret_hash(my_secret, storage_index, peerid):
my_cancel_secret = tagged_hash(my_secret, "bucket_cancel_secret")
file_cancel_secret = tagged_pair_hash("file_cancel_secret",
my_cancel_secret, storage_index)
bucket_cancel_secret = tagged_pair_hash("bucket_cancel_secret",
file_cancel_secret, peerid)
return bucket_cancel_secret
def dir_write_enabler_hash(write_key): def dir_write_enabler_hash(write_key):
return tagged_hash("allmydata_dir_write_enabler_v1", write_key) return tagged_hash("allmydata_dir_write_enabler_v1", write_key)
def dir_read_key_hash(write_key): def dir_read_key_hash(write_key):