storageserver: implement size limits. No code to enable them yet, though
parent 9ddb929651
commit c80ea7d693
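This commit plumbs a sizelimit through StorageServer, but nothing passes one in yet. Here is a minimal sketch of how a node could opt in once wiring exists; the sizelimit=None keyword and the startup measurement are taken from the diff below, while the import path, storage directory, and byte figure are illustrative assumptions:

    import storageserver  # module name as used by the tests; import path assumed

    # Hypothetical wiring (not in this commit): cap the server at ~100 MB.
    # sizelimit=None, the default, preserves the old unlimited behavior.
    ss = storageserver.StorageServer("/var/tahoe/storage",
                                     sizelimit=100*1000*1000)
    # At startup, measure_size() seeds ss.consumed via fileutil.du();
    # remote_allocate_buckets() then refuses buckets that would push
    # allocated_size() past sizelimit.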
@@ -1,4 +1,4 @@
-import os, re
+import os, re, weakref
 
 from foolscap import Referenceable
 from twisted.application import service
@@ -25,15 +25,20 @@ NUM_RE=re.compile("[0-9]*")
 class BucketWriter(Referenceable):
     implements(RIBucketWriter)
 
-    def __init__(self, incominghome, finalhome, blocksize):
+    def __init__(self, ss, incominghome, finalhome, blocksize, sharesize):
+        self.ss = ss
         self.incominghome = incominghome
         self.finalhome = finalhome
         self.blocksize = blocksize
+        self.sharesize = sharesize
         self.closed = False
         self._next_segnum = 0
         fileutil.make_dirs(incominghome)
         self._write_file('blocksize', str(blocksize))
 
+    def allocated_size(self):
+        return self.sharesize
+
     def _write_file(self, fname, data):
         open(os.path.join(self.incominghome, fname), 'wb').write(data)
 
@@ -87,6 +92,7 @@ class BucketWriter(Referenceable):
             pass
 
         self.closed = True
+        self.ss.bucket_writer_closed(self)
 
 def str2l(s):
     """ split string (pulled from storage) into a list of blockids """
@@ -128,39 +134,70 @@ class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'
 
-    def __init__(self, storedir):
+    def __init__(self, storedir, sizelimit=None):
+        service.MultiService.__init__(self)
         fileutil.make_dirs(storedir)
         self.storedir = storedir
+        self.sizelimit = sizelimit
         self.incomingdir = os.path.join(storedir, 'incoming')
         self._clean_incomplete()
         fileutil.make_dirs(self.incomingdir)
+        self._active_writers = weakref.WeakKeyDictionary()
 
-        service.MultiService.__init__(self)
+        self.measure_size()
 
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
+    def measure_size(self):
+        self.consumed = fileutil.du(self.storedir)
+
+    def allocated_size(self):
+        space = self.consumed
+        for bw in self._active_writers:
+            space += bw.allocated_size()
+        return space
+
     def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
                                 blocksize, canary):
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
+        si_s = idlib.b2a(storage_index)
+        space_per_bucket = sharesize
+        no_limits = self.sizelimit is None
+        yes_limits = not no_limits
+        if yes_limits:
+            remaining_space = self.sizelimit - self.allocated_size()
         for shnum in sharenums:
-            incominghome = os.path.join(self.incomingdir, idlib.b2a(storage_index), "%d"%shnum)
-            finalhome = os.path.join(self.storedir, idlib.b2a(storage_index), "%d"%shnum)
+            incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
+            finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
             if os.path.exists(incominghome) or os.path.exists(finalhome):
                 alreadygot.add(shnum)
+            elif no_limits or remaining_space >= space_per_bucket:
+                bw = BucketWriter(self, incominghome, finalhome,
+                                  blocksize, space_per_bucket)
+                bucketwriters[shnum] = bw
+                self._active_writers[bw] = 1
+                if yes_limits:
+                    remaining_space -= space_per_bucket
             else:
-                bucketwriters[shnum] = BucketWriter(incominghome, finalhome, blocksize)
+                # not enough space to accept this bucket
+                pass
 
         return alreadygot, bucketwriters
 
+    def bucket_writer_closed(self, bw):
+        self.consumed += bw.allocated_size()
+        del self._active_writers[bw]
+
     def remote_get_buckets(self, storage_index):
         bucketreaders = {} # k: sharenum, v: BucketReader
         storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
         try:
             for f in os.listdir(storagedir):
                 if NUM_RE.match(f):
-                    bucketreaders[int(f)] = BucketReader(os.path.join(storagedir, f))
+                    br = BucketReader(os.path.join(storagedir, f))
+                    bucketreaders[int(f)] = br
         except OSError:
             # Commonly caused by there being no buckets at all.
             pass
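Taken together, the StorageServer hunks above implement one invariant: space is either long-term (consumed, measured once at startup) or provisionally claimed by an open BucketWriter. A standalone restatement of that bookkeeping follows as an illustrative sketch; the SpaceAccounting class and any names not in the diff are assumptions:

    import weakref

    class SpaceAccounting(object):
        # Illustrative restatement of StorageServer's bookkeeping; this
        # class is not part of the commit.
        def __init__(self, sizelimit=None, consumed=0):
            self.sizelimit = sizelimit   # None means no limit is enforced
            self.consumed = consumed     # long-term bytes, from fileutil.du()
            self._active_writers = weakref.WeakKeyDictionary()

        def allocated_size(self):
            # long-term usage plus every open writer's provisional claim
            space = self.consumed
            for bw in self._active_writers:
                space += bw.allocated_size()
            return space

        def bucket_writer_closed(self, bw):
            # on close, a provisional claim becomes long-term usage
            self.consumed += bw.allocated_size()
            del self._active_writers[bw]

Because _active_writers is a WeakKeyDictionary, a writer that is simply abandoned drops out of the accounting when garbage-collected, returning its provisional space; the new test relies on exactly that. The remaining hunks update the unit tests.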
@@ -16,9 +16,12 @@ class Bucket(unittest.TestCase):
         fileutil.make_dirs(basedir)
         return incoming, final
 
+    def bucket_writer_closed(self, bw):
+        pass
+
     def test_create(self):
         incoming, final = self.make_workdir("test_create")
-        bw = storageserver.BucketWriter(incoming, final, 25)
+        bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
         bw.remote_put_block(0, "a"*25)
         bw.remote_put_block(1, "b"*25)
         bw.remote_put_block(2, "c"*7) # last block may be short
@@ -26,7 +29,7 @@ class Bucket(unittest.TestCase):
 
     def test_readwrite(self):
         incoming, final = self.make_workdir("test_readwrite")
-        bw = storageserver.BucketWriter(incoming, final, 25)
+        bw = storageserver.BucketWriter(self, incoming, final, 25, 57)
         bw.remote_put_block(0, "a"*25)
         bw.remote_put_block(1, "b"*25)
         bw.remote_put_block(2, "c"*7) # last block may be short
@@ -52,12 +55,12 @@ class Server(unittest.TestCase):
         return self.sparent.stopService()
 
     def workdir(self, name):
-        basedir = os.path.join("test_storage", "Server", name)
+        basedir = os.path.join("storage", "Server", name)
         return basedir
 
-    def create(self, name):
+    def create(self, name, sizelimit=None):
         workdir = self.workdir(name)
-        ss = storageserver.StorageServer(workdir)
+        ss = storageserver.StorageServer(workdir, sizelimit)
         ss.setServiceParent(self.sparent)
         return ss
 
@@ -104,3 +107,48 @@
         self.failUnlessEqual(already, set([2,3,4]))
         self.failUnlessEqual(set(writers.keys()), set([5]))
 
+    def test_sizelimits(self):
+        ss = self.create("test_sizelimits", 100)
+        canary = Referenceable()
+
+        already,writers = ss.remote_allocate_buckets("vid1", [0,1,2],
+                                                     25, 5, canary)
+        self.failUnlessEqual(len(writers), 3)
+        # now the StorageServer should have 75 bytes provisionally allocated,
+        # allowing only 25 more to be claimed
+
+        already2,writers2 = ss.remote_allocate_buckets("vid2", [0,1,2],
+                                                       25, 5, canary)
+        self.failUnlessEqual(len(writers2), 1)
+
+        # we abandon the first set, so their provisional allocation should be
+        # returned
+        del already
+        del writers
+
+        # and we close the second set, so their provisional allocation should
+        # become real, long-term allocation
+        for bw in writers2.values():
+            bw.remote_close()
+        del already2
+        del writers2
+        del bw
+
+        # now there should be 25 bytes allocated, and 75 free
+        already3,writers3 = ss.remote_allocate_buckets("vid3", [0,1,2,3],
+                                                       25, 5, canary)
+        self.failUnlessEqual(len(writers3), 3)
+
+        del already3
+        del writers3
+        ss.disownServiceParent()
+        del ss
+
+        # creating a new StorageServer in the same directory should see the
+        # same usage. note that metadata will be counted at startup but not
+        # during runtime, so if we were creating any metadata, the allocation
+        # would be more than 25 bytes and this test would need to be changed.
+        ss = self.create("test_sizelimits", 100)
+        already4,writers4 = ss.remote_allocate_buckets("vid4", [0,1,2,3],
+                                                       25, 5, canary)
+        self.failUnlessEqual(len(writers4), 3)
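For reference, the arithmetic that test_sizelimits walks through, restated as a plain check; all numbers come from the test itself:

    sizelimit = 100   # the limit passed to create()
    sharesize = 25    # the size requested per share

    provisional = 3 * sharesize           # vid1: three writers accepted
    assert sizelimit - provisional == 25  # so vid2 gets exactly one writer

    # vid1's writers are abandoned (75 bytes returned) and vid2's single
    # writer is closed, converting its 25 bytes into long-term usage.
    consumed = sharesize
    assert (sizelimit - consumed) // sharesize == 3  # vid3 and vid4 get 3 of 4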