Tue Aug 9 13:39:10 MDT 2011 wilcoxjg@gmail.com * storage: add tests of the new feature of having the storage backend in a separate object from the server Tue Aug 9 14:09:29 MDT 2011 wilcoxjg@gmail.com * Added directories and new modules for the null backend Tue Aug 9 14:12:49 MDT 2011 wilcoxjg@gmail.com * changes to null/core.py and storage/common.py necessary for test with null backend to pass Tue Aug 9 14:16:47 MDT 2011 wilcoxjg@gmail.com * change storage/server.py to new "backend pluggable" version Tue Aug 9 14:18:22 MDT 2011 wilcoxjg@gmail.com * modify null/core.py such that the correct interfaces are implemented Tue Aug 9 14:22:32 MDT 2011 wilcoxjg@gmail.com * make changes to storage/immutable.py most changes are part of movement to DAS specific backend. Tue Aug 9 14:26:20 MDT 2011 wilcoxjg@gmail.com * creates backends/das/core.py Tue Aug 9 14:31:23 MDT 2011 wilcoxjg@gmail.com * change backends/das/core.py to correct which interfaces are implemented Tue Aug 9 14:33:21 MDT 2011 wilcoxjg@gmail.com * util/fileutil.py now expects and manipulates twisted.python.filepath.FilePath objects Tue Aug 9 14:35:19 MDT 2011 wilcoxjg@gmail.com * add expirer.py Tue Aug 9 14:38:11 MDT 2011 wilcoxjg@gmail.com * Changes I have made that aren't necessary for the test_backends.py suite to pass. Tue Aug 9 21:37:51 MDT 2011 wilcoxjg@gmail.com * add __init__.py to backend and core and null Wed Aug 10 11:08:47 MDT 2011 wilcoxjg@gmail.com * whitespace-cleanup Wed Aug 10 11:38:49 MDT 2011 wilcoxjg@gmail.com * das/__init__.py Wed Aug 10 14:10:41 MDT 2011 wilcoxjg@gmail.com * test_backends.py: cleaned whitespace and removed unused variables Mon Aug 29 12:48:34 MDT 2011 wilcoxjg@gmail.com * test_backends.py, backends/das -> backends/disk: renaming backend das to disk Mon Aug 29 15:36:31 MDT 2011 wilcoxjg@gmail.com * disk/core.py: slips past pyflakes without causing errors Mon Aug 29 15:48:16 MDT 2011 wilcoxjg@gmail.com * null/core.py, storage/common.py, storage/immutable.py: pyflaked clean New patches: [storage: add tests of the new feature of having the storage backend in a separate object from the server wilcoxjg@gmail.com**20110809193910 Ignore-this: 72b64dab1a9ce668607a4ece4429e29a ] { addfile ./src/allmydata/test/test_backends.py hunk ./src/allmydata/test/test_backends.py 1 +import os, stat +from twisted.trial import unittest +from allmydata.util.log import msg +from allmydata.test.common_util import ReallyEqualMixin +import mock +# This is the code that we're going to be testing. +from allmydata.storage.server import StorageServer +from allmydata.storage.backends.das.core import DASCore +from allmydata.storage.backends.null.core import NullCore +from allmydata.storage.common import si_si2dir +# The following share file content was generated with +# storage.immutable.ShareFile from Tahoe-LAFS v1.8.2 +# with share data == 'a'. The total size of this input +# is 85 bytes. 
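+# An illustrative breakdown of those 85 bytes, assuming the v1 share file
+# layout documented in backends/das/core.py below: a 12-byte (0x0c) container
+# header (4-byte version, 4-byte data length, 4-byte lease count), 1 byte of
+# share data ('a'), and one 72-byte lease record (4-byte owner number,
+# 32-byte renew secret, 32-byte cancel secret, 4-byte expiration time),
+# i.e. 12 + 1 + 72 == 85, matching the constants defined next.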
+shareversionnumber = '\x00\x00\x00\x01' +sharedatalength = '\x00\x00\x00\x01' +numberofleases = '\x00\x00\x00\x01' +shareinputdata = 'a' +ownernumber = '\x00\x00\x00\x00' +renewsecret = 'x'*32 +cancelsecret = 'y'*32 +expirationtime = '\x00(\xde\x80' +nextlease = '' +containerdata = shareversionnumber + sharedatalength + numberofleases +client_data = shareinputdata + ownernumber + renewsecret + \ + cancelsecret + expirationtime + nextlease +share_data = containerdata + client_data +testnodeid = 'testnodeidxxxxxxxxxx' +expiration_policy = {'enabled' : False, + 'mode' : 'age', + 'override_lease_duration' : None, + 'cutoff_date' : None, + 'sharetypes' : None} + + +class MockFileSystem(unittest.TestCase): + """ I simulate a filesystem that the code under test can use. I simulate + just the parts of the filesystem that the current implementation of DAS + backend needs. """ + def setUp(self): + # Make patcher, patch, and make effects for fs using functions. + msg( "%s.setUp()" % (self,)) + self.mockedfilepaths = {} + #keys are pathnames, values are MockFilePath objects. This is necessary because + #MockFilePath behavior sometimes depends on the filesystem. Where it does, + #self.mockedfilepaths has the relevent info. + self.storedir = MockFilePath('teststoredir', self.mockedfilepaths) + self.basedir = self.storedir.child('shares') + self.baseincdir = self.basedir.child('incoming') + self.sharedirfinalname = self.basedir.child('or').child('orsxg5dtorxxeylhmvpws3temv4a') + self.sharedirincomingname = self.baseincdir.child('or').child('orsxg5dtorxxeylhmvpws3temv4a') + self.shareincomingname = self.sharedirincomingname.child('0') + self.sharefinalname = self.sharedirfinalname.child('0') + + self.FilePathFake = mock.patch('allmydata.storage.backends.das.core.FilePath', new = MockFilePath ) + FakePath = self.FilePathFake.__enter__() + + self.BCountingCrawler = mock.patch('allmydata.storage.backends.das.core.BucketCountingCrawler') + FakeBCC = self.BCountingCrawler.__enter__() + FakeBCC.side_effect = self.call_FakeBCC + + self.LeaseCheckingCrawler = mock.patch('allmydata.storage.backends.das.core.LeaseCheckingCrawler') + FakeLCC = self.LeaseCheckingCrawler.__enter__() + FakeLCC.side_effect = self.call_FakeLCC + + self.get_available_space = mock.patch('allmydata.util.fileutil.get_available_space') + GetSpace = self.get_available_space.__enter__() + GetSpace.side_effect = self.call_get_available_space + + self.statforsize = mock.patch('allmydata.storage.backends.das.core.filepath.stat') + getsize = self.statforsize.__enter__() + getsize.side_effect = self.call_statforsize + + def call_FakeBCC(self, StateFile): + return MockBCC() + + def call_FakeLCC(self, StateFile, HistoryFile, ExpirationPolicy): + return MockLCC() + + def call_get_available_space(self, storedir, reservedspace): + # The input vector has an input size of 85. + return 85 - reservedspace + + def call_statforsize(self, fakefpname): + return self.mockedfilepaths[fakefpname].fileobject.size() + + def tearDown(self): + msg( "%s.tearDown()" % (self,)) + FakePath = self.FilePathFake.__exit__() + self.mockedfilepaths = {} + + +class MockFilePath: + def __init__(self, pathstring, ffpathsenvironment, existance=False): + # I can't jsut make the values MockFileObjects because they may be directories. 
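+ # Rough sketch of how this mock is meant to behave (this is the mock's
+ # own convention, not the real twisted.python.filepath API): every
+ # MockFilePath built with the same ffpathsenvironment dict shares one
+ # registry entry per pathname, e.g.
+ # env = {}
+ # a = MockFilePath('teststoredir/shares', env)
+ # b = MockFilePath('teststoredir/shares', env)
+ # so content stored via a.setContent(...) is later visible through
+ # b.open(), mimicking two handles onto one real filesystem path.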
+ self.mockedfilepaths = ffpathsenvironment + self.path = pathstring + self.existance = existance + if not self.mockedfilepaths.has_key(self.path): + # The first MockFilePath object is special + self.mockedfilepaths[self.path] = self + self.fileobject = None + else: + self.fileobject = self.mockedfilepaths[self.path].fileobject + self.spawn = {} + self.antecedent = os.path.dirname(self.path) + + def setContent(self, contentstring): + # This method rewrites the data in the file that corresponds to its path + # name whether it preexisted or not. + self.fileobject = MockFileObject(contentstring) + self.existance = True + self.mockedfilepaths[self.path].fileobject = self.fileobject + self.mockedfilepaths[self.path].existance = self.existance + self.setparents() + + def create(self): + # This method chokes if there's a pre-existing file! + if self.mockedfilepaths[self.path].fileobject: + raise OSError + else: + self.fileobject = MockFileObject(contentstring) + self.existance = True + self.mockedfilepaths[self.path].fileobject = self.fileobject + self.mockedfilepaths[self.path].existance = self.existance + self.setparents() + + def open(self, mode='r'): + # XXX Makes no use of mode. + if not self.mockedfilepaths[self.path].fileobject: + # If there's no fileobject there already then make one and put it there. + self.fileobject = MockFileObject() + self.existance = True + self.mockedfilepaths[self.path].fileobject = self.fileobject + self.mockedfilepaths[self.path].existance = self.existance + else: + # Otherwise get a ref to it. + self.fileobject = self.mockedfilepaths[self.path].fileobject + self.existance = self.mockedfilepaths[self.path].existance + return self.fileobject.open(mode) + + def child(self, childstring): + arg2child = os.path.join(self.path, childstring) + child = MockFilePath(arg2child, self.mockedfilepaths) + return child + + def children(self): + childrenfromffs = [ffp for ffp in self.mockedfilepaths.values() if ffp.path.startswith(self.path)] + childrenfromffs = [ffp for ffp in childrenfromffs if not ffp.path.endswith(self.path)] + childrenfromffs = [ffp for ffp in childrenfromffs if ffp.exists()] + self.spawn = frozenset(childrenfromffs) + return self.spawn + + def parent(self): + if self.mockedfilepaths.has_key(self.antecedent): + parent = self.mockedfilepaths[self.antecedent] + else: + parent = MockFilePath(self.antecedent, self.mockedfilepaths) + return parent + + def parents(self): + antecedents = [] + def f(fps, antecedents): + newfps = os.path.split(fps)[0] + if newfps: + antecedents.append(newfps) + f(newfps, antecedents) + f(self.path, antecedents) + return antecedents + + def setparents(self): + for fps in self.parents(): + if not self.mockedfilepaths.has_key(fps): + self.mockedfilepaths[fps] = MockFilePath(fps, self.mockedfilepaths, exists=True) + + def basename(self): + return os.path.split(self.path)[1] + + def moveTo(self, newffp): + # XXX Makes no distinction between file and directory arguments, this is deviation from filepath.moveTo + if self.mockedfilepaths[newffp.path].exists(): + raise OSError + else: + self.mockedfilepaths[newffp.path] = self + self.path = newffp.path + + def getsize(self): + return self.fileobject.getsize() + + def exists(self): + return self.existance + + def isdir(self): + return True + + def makedirs(self): + # XXX These methods assume that fp_ functions in fileutil will be tested elsewhere! 
+ pass + + def remove(self): + pass + + +class MockFileObject: + def __init__(self, contentstring=''): + self.buffer = contentstring + self.pos = 0 + def open(self, mode='r'): + return self + def write(self, instring): + begin = self.pos + padlen = begin - len(self.buffer) + if padlen > 0: + self.buffer += '\x00' * padlen + end = self.pos + len(instring) + self.buffer = self.buffer[:begin]+instring+self.buffer[end:] + self.pos = end + def close(self): + self.pos = 0 + def seek(self, pos): + self.pos = pos + def read(self, numberbytes): + return self.buffer[self.pos:self.pos+numberbytes] + def tell(self): + return self.pos + def size(self): + # XXX This method A: Is not to be found in a real file B: Is part of a wild-mung-up of filepath.stat! + # XXX Finally we shall hopefully use a getsize method soon, must consult first though. + # Hmmm... perhaps we need to sometimes stat the address when there's not a mockfileobject present? + return {stat.ST_SIZE:len(self.buffer)} + def getsize(self): + return len(self.buffer) + +class MockBCC: + def setServiceParent(self, Parent): + pass + + +class MockLCC: + def setServiceParent(self, Parent): + pass + + +class TestServerWithNullBackend(unittest.TestCase, ReallyEqualMixin): + """ NullBackend is just for testing and executable documentation, so + this test is actually a test of StorageServer in which we're using + NullBackend as helper code for the test, rather than a test of + NullBackend. """ + def setUp(self): + self.ss = StorageServer(testnodeid, backend=NullCore()) + + @mock.patch('os.mkdir') + @mock.patch('__builtin__.open') + @mock.patch('os.listdir') + @mock.patch('os.path.isdir') + def test_write_share(self, mockisdir, mocklistdir, mockopen, mockmkdir): + """ Write a new share. """ + + alreadygot, bs = self.ss.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, set((0,)), 1, mock.Mock()) + bs[0].remote_write(0, 'a') + self.failIf(mockisdir.called) + self.failIf(mocklistdir.called) + self.failIf(mockopen.called) + self.failIf(mockmkdir.called) + + +class TestServerConstruction(MockFileSystem, ReallyEqualMixin): + def test_create_server_fs_backend(self): + """ This tests whether a server instance can be constructed with a + filesystem backend. To pass the test, it mustn't use the filesystem + outside of its configured storedir. """ + StorageServer(testnodeid, backend=DASCore(self.storedir, expiration_policy)) + + +class TestServerAndFSBackend(MockFileSystem, ReallyEqualMixin): + """ This tests both the StorageServer and the DAS backend together. 
""" + def setUp(self): + MockFileSystem.setUp(self) + try: + self.backend = DASCore(self.storedir, expiration_policy) + self.ss = StorageServer(testnodeid, self.backend) + + self.backendwithreserve = DASCore(self.storedir, expiration_policy, reserved_space = 1) + self.sswithreserve = StorageServer(testnodeid, self.backendwithreserve) + except: + MockFileSystem.tearDown(self) + raise + + @mock.patch('time.time') + @mock.patch('allmydata.util.fileutil.get_available_space') + def test_out_of_space(self, mockget_available_space, mocktime): + mocktime.return_value = 0 + + def call_get_available_space(dir, reserve): + return 0 + + mockget_available_space.side_effect = call_get_available_space + alreadygotc, bsc = self.sswithreserve.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, set((0,)), 1, mock.Mock()) + self.failUnlessReallyEqual(bsc, {}) + + @mock.patch('time.time') + def test_write_and_read_share(self, mocktime): + """ + Write a new share, read it, and test the server's (and FS backend's) + handling of simultaneous and successive attempts to write the same + share. + """ + mocktime.return_value = 0 + # Inspect incoming and fail unless it's empty. + incomingset = self.ss.backend.get_incoming_shnums('teststorage_index') + + self.failUnlessReallyEqual(incomingset, frozenset()) + + # Populate incoming with the sharenum: 0. + alreadygot, bs = self.ss.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, frozenset((0,)), 1, mock.Mock()) + + # This is a transparent-box test: Inspect incoming and fail unless the sharenum: 0 is listed there. + self.failUnlessReallyEqual(self.ss.backend.get_incoming_shnums('teststorage_index'), frozenset((0,))) + + + + # Attempt to create a second share writer with the same sharenum. + alreadygota, bsa = self.ss.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, frozenset((0,)), 1, mock.Mock()) + + # Show that no sharewriter results from a remote_allocate_buckets + # with the same si and sharenum, until BucketWriter.remote_close() + # has been called. + self.failIf(bsa) + + # Test allocated size. + spaceint = self.ss.allocated_size() + self.failUnlessReallyEqual(spaceint, 1) + + # Write 'a' to shnum 0. Only tested together with close and read. + bs[0].remote_write(0, 'a') + + # Preclose: Inspect final, failUnless nothing there. + self.failUnlessReallyEqual(len(list(self.backend.get_shares('teststorage_index'))), 0) + bs[0].remote_close() + + # Postclose: (Omnibus) failUnless written data is in final. + sharesinfinal = list(self.backend.get_shares('teststorage_index')) + self.failUnlessReallyEqual(len(sharesinfinal), 1) + contents = sharesinfinal[0].read_share_data(0, 73) + self.failUnlessReallyEqual(contents, client_data) + + # Exercise the case that the share we're asking to allocate is + # already (completely) uploaded. + self.ss.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, set((0,)), 1, mock.Mock()) + + + def test_read_old_share(self): + """ This tests whether the code correctly finds and reads + shares written out by old (Tahoe-LAFS <= v1.8.2) + servers. There is a similar test in test_download, but that one + is from the perspective of the client and exercises a deeper + stack of code. This one is for exercising just the + StorageServer object. """ + # Contruct a file with the appropriate contents in the mockfilesystem. + datalen = len(share_data) + finalhome = si_si2dir(self.basedir, 'teststorage_index').child(str(0)) + finalhome.setContent(share_data) + + # Now begin the test. 
+ bs = self.ss.remote_get_buckets('teststorage_index') + + self.failUnlessEqual(len(bs), 1) + b = bs['0'] + # These should match by definition, the next two cases cover cases without (completely) unambiguous behaviors. + self.failUnlessReallyEqual(b.remote_read(0, datalen), client_data) + # If you try to read past the end you get the as much data as is there. + self.failUnlessReallyEqual(b.remote_read(0, datalen+20), client_data) + # If you start reading past the end of the file you get the empty string. + self.failUnlessReallyEqual(b.remote_read(datalen+1, 3), '') } [Added directories and new modules for the null backend wilcoxjg@gmail.com**20110809200929 Ignore-this: f5dfa418afced5141eb9247a9908109e ] { hunk ./src/allmydata/interfaces.py 274 store that on disk. """ +class IStorageBackend(Interface): + """ + Objects of this kind live on the server side and are used by the + storage server object. + """ + def get_available_space(self, reserved_space): + """ Returns available space for share storage in bytes, or + None if this information is not available or if the available + space is unlimited. + + If the backend is configured for read-only mode then this will + return 0. + + reserved_space is how many bytes to subtract from the answer, so + you can pass how many bytes you would like to leave unused on this + filesystem as reserved_space. """ + + def get_bucket_shares(self): + """XXX""" + + def get_share(self): + """XXX""" + + def make_bucket_writer(self): + """XXX""" + +class IStorageBackendShare(Interface): + """ + This object contains as much as all of the share data. It is intended + for lazy evaluation such that in many use cases substantially less than + all of the share data will be accessed. + """ + def is_complete(self): + """ + Returns the share state, or None if the share does not exist. + """ + class IStorageBucketWriter(Interface): """ Objects of this kind live on the client side. adddir ./src/allmydata/storage/backends addfile ./src/allmydata/storage/backends/base.py hunk ./src/allmydata/storage/backends/base.py 1 +from twisted.application import service + +class Backend(service.MultiService): + def __init__(self): + service.MultiService.__init__(self) adddir ./src/allmydata/storage/backends/null addfile ./src/allmydata/storage/backends/null/core.py hunk ./src/allmydata/storage/backends/null/core.py 1 +from allmydata.storage.backends.base import Backend +from allmydata.storage.immutable import BucketWriter, BucketReader + +class NullCore(Backend): + def __init__(self): + Backend.__init__(self) + + def get_available_space(self): + return None + + def get_shares(self, storage_index): + return set() + + def get_share(self, storage_index, sharenum): + return None + + def make_bucket_writer(self, storageindex, shnum, max_space_per_bucket, lease_info, canary): + immutableshare = ImmutableShare() + return BucketWriter(self.ss, immutableshare, max_space_per_bucket, lease_info, canary) + + def set_storage_server(self, ss): + self.ss = ss + + def get_incoming_shnums(self, storageindex): + return frozenset() + +class ImmutableShare: + sharetype = "immutable" + + def __init__(self): + """ If max_size is not None then I won't allow more than + max_size to be written to me. If create=True then max_size + must not be None. """ + pass + + def get_shnum(self): + return self.shnum + + def unlink(self): + os.unlink(self.fname) + + def read_share_data(self, offset, length): + precondition(offset >= 0) + # Reads beyond the end of the data are truncated. 
Reads that start + # beyond the end of the data return an empty string. + seekpos = self._data_offset+offset + fsize = os.path.getsize(self.fname) + actuallength = max(0, min(length, fsize-seekpos)) + if actuallength == 0: + return "" + f = open(self.fname, 'rb') + f.seek(seekpos) + return f.read(actuallength) + + def write_share_data(self, offset, data): + pass + + def _write_lease_record(self, f, lease_number, lease_info): + offset = self._lease_offset + lease_number * self.LEASE_SIZE + f.seek(offset) + assert f.tell() == offset + f.write(lease_info.to_immutable_data()) + + def _read_num_leases(self, f): + f.seek(0x08) + (num_leases,) = struct.unpack(">L", f.read(4)) + return num_leases + + def _write_num_leases(self, f, num_leases): + f.seek(0x08) + f.write(struct.pack(">L", num_leases)) + + def _truncate_leases(self, f, num_leases): + f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE) + + def get_leases(self): + """Yields a LeaseInfo instance for all leases.""" + f = open(self.fname, 'rb') + (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc)) + f.seek(self._lease_offset) + for i in range(num_leases): + data = f.read(self.LEASE_SIZE) + if data: + yield LeaseInfo().from_immutable_data(data) + + def add_lease(self, lease): + pass + + def renew_lease(self, renew_secret, new_expire_time): + for i,lease in enumerate(self.get_leases()): + if constant_time_compare(lease.renew_secret, renew_secret): + # yup. See if we need to update the owner time. + if new_expire_time > lease.expiration_time: + # yes + lease.expiration_time = new_expire_time + f = open(self.fname, 'rb+') + self._write_lease_record(f, i, lease) + f.close() + return + raise IndexError("unable to renew non-existent lease") + + def add_or_renew_lease(self, lease_info): + try: + self.renew_lease(lease_info.renew_secret, + lease_info.expiration_time) + except IndexError: + self.add_lease(lease_info) + + + def cancel_lease(self, cancel_secret): + """Remove a lease with the given cancel_secret. If the last lease is + cancelled, the file will be removed. Return the number of bytes that + were freed (by truncating the list of leases, and possibly by + deleting the file. Raise IndexError if there was no lease with the + given cancel_secret. + """ + + leases = list(self.get_leases()) + num_leases_removed = 0 + for i,lease in enumerate(leases): + if constant_time_compare(lease.cancel_secret, cancel_secret): + leases[i] = None + num_leases_removed += 1 + if not num_leases_removed: + raise IndexError("unable to find matching lease to cancel") + if num_leases_removed: + # pack and write out the remaining leases. We write these out in + # the same order as they were added, so that if we crash while + # doing this, we won't lose any non-cancelled leases. 
+ leases = [l for l in leases if l] # remove the cancelled leases + f = open(self.fname, 'rb+') + for i,lease in enumerate(leases): + self._write_lease_record(f, i, lease) + self._write_num_leases(f, len(leases)) + self._truncate_leases(f, len(leases)) + f.close() + space_freed = self.LEASE_SIZE * num_leases_removed + if not len(leases): + space_freed += os.stat(self.fname)[stat.ST_SIZE] + self.unlink() + return space_freed } [changes to null/core.py and storage/common.py necessary for test with null backend to pass wilcoxjg@gmail.com**20110809201249 Ignore-this: 9ddcd79f9962550ed20518ae85b6b6b2 ] { hunk ./src/allmydata/storage/backends/null/core.py 3 from allmydata.storage.backends.base import Backend from allmydata.storage.immutable import BucketWriter, BucketReader +from zope.interface import implements class NullCore(Backend): hunk ./src/allmydata/storage/backends/null/core.py 6 + implements(IStorageBackend) def __init__(self): Backend.__init__(self) hunk ./src/allmydata/storage/backends/null/core.py 30 return frozenset() class ImmutableShare: + implements(IStorageBackendShare) sharetype = "immutable" def __init__(self): hunk ./src/allmydata/storage/common.py 19 def si_a2b(ascii_storageindex): return base32.a2b(ascii_storageindex) -def storage_index_to_dir(storageindex): +def si_si2dir(startfp, storageindex): sia = si_b2a(storageindex) hunk ./src/allmydata/storage/common.py 21 - return os.path.join(sia[:2], sia) + newfp = startfp.child(sia[:2]) + return newfp.child(sia) } [change storage/server.py to new "backend pluggable" version wilcoxjg@gmail.com**20110809201647 Ignore-this: 1b0c5f9e831641287992bf45af55246e ] { hunk ./src/allmydata/storage/server.py 1 -import os, re, weakref, struct, time +import os, weakref, struct, time from foolscap.api import Referenceable from twisted.application import service hunk ./src/allmydata/storage/server.py 11 from allmydata.util import fileutil, idlib, log, time_format import allmydata # for __full_version__ -from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir -_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported +from allmydata.storage.common import si_b2a, si_a2b, si_si2dir +_pyflakes_hush = [si_b2a, si_a2b, si_si2dir] # re-exported from allmydata.storage.lease import LeaseInfo from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ create_mutable_sharefile hunk ./src/allmydata/storage/server.py 16 -from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader -from allmydata.storage.crawler import BucketCountingCrawler -from allmydata.storage.expirer import LeaseCheckingCrawler - -# storage/ -# storage/shares/incoming -# incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will -# be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success -# storage/shares/$START/$STORAGEINDEX -# storage/shares/$START/$STORAGEINDEX/$SHARENUM - -# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2 -# base-32 chars). 
- -# $SHARENUM matches this regex: -NUM_RE=re.compile("^[0-9]+$") - - class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer, IStatsProducer) hunk ./src/allmydata/storage/server.py 20 name = 'storage' - LeaseCheckerClass = LeaseCheckingCrawler hunk ./src/allmydata/storage/server.py 21 - def __init__(self, storedir, nodeid, reserved_space=0, - discard_storage=False, readonly_storage=False, - stats_provider=None, - expiration_enabled=False, - expiration_mode="age", - expiration_override_lease_duration=None, - expiration_cutoff_date=None, - expiration_sharetypes=("mutable", "immutable")): + def __init__(self, nodeid, backend, reserved_space=0, + readonly_storage=False, + stats_provider=None ): service.MultiService.__init__(self) assert isinstance(nodeid, str) assert len(nodeid) == 20 hunk ./src/allmydata/storage/server.py 28 self.my_nodeid = nodeid - self.storedir = storedir - sharedir = os.path.join(storedir, "shares") - fileutil.make_dirs(sharedir) - self.sharedir = sharedir - # we don't actually create the corruption-advisory dir until necessary - self.corruption_advisory_dir = os.path.join(storedir, - "corruption-advisories") - self.reserved_space = int(reserved_space) - self.no_storage = discard_storage - self.readonly_storage = readonly_storage self.stats_provider = stats_provider if self.stats_provider: self.stats_provider.register_producer(self) hunk ./src/allmydata/storage/server.py 31 - self.incomingdir = os.path.join(sharedir, 'incoming') - self._clean_incomplete() - fileutil.make_dirs(self.incomingdir) self._active_writers = weakref.WeakKeyDictionary() hunk ./src/allmydata/storage/server.py 32 + self.backend = backend + self.backend.setServiceParent(self) + self.backend.set_storage_server(self) log.msg("StorageServer created", facility="tahoe.storage") hunk ./src/allmydata/storage/server.py 37 - if reserved_space: - if self.get_available_space() is None: - log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored", - umin="0wZ27w", level=log.UNUSUAL) - self.latencies = {"allocate": [], # immutable "write": [], "close": [], hunk ./src/allmydata/storage/server.py 48 "renew": [], "cancel": [], } - self.add_bucket_counter() - - statefile = os.path.join(self.storedir, "lease_checker.state") - historyfile = os.path.join(self.storedir, "lease_checker.history") - klass = self.LeaseCheckerClass - self.lease_checker = klass(self, statefile, historyfile, - expiration_enabled, expiration_mode, - expiration_override_lease_duration, - expiration_cutoff_date, - expiration_sharetypes) - self.lease_checker.setServiceParent(self) def __repr__(self): return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) hunk ./src/allmydata/storage/server.py 52 - def add_bucket_counter(self): - statefile = os.path.join(self.storedir, "bucket_counter.state") - self.bucket_counter = BucketCountingCrawler(self, statefile) - self.bucket_counter.setServiceParent(self) - def count(self, name, delta=1): if self.stats_provider: self.stats_provider.count("storage_server." + name, delta) hunk ./src/allmydata/storage/server.py 66 """Return a dict, indexed by category, that contains a dict of latency numbers for each category. 
If there are sufficient samples for unambiguous interpretation, each dict will contain the - following keys: mean, 01_0_percentile, 10_0_percentile, + following keys: samplesize, mean, 01_0_percentile, 10_0_percentile, 50_0_percentile (median), 90_0_percentile, 95_0_percentile, 99_0_percentile, 99_9_percentile. If there are insufficient samples for a given percentile to be interpreted unambiguously hunk ./src/allmydata/storage/server.py 88 else: stats["mean"] = None - orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\ - (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\ - (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\ + orderstatlist = [(0.1, "10_0_percentile", 10), (0.5, "50_0_percentile", 10), \ + (0.9, "90_0_percentile", 10), (0.95, "95_0_percentile", 20), \ + (0.01, "01_0_percentile", 100), (0.99, "99_0_percentile", 100),\ (0.999, "99_9_percentile", 1000)] for percentile, percentilestring, minnumtoobserve in orderstatlist: hunk ./src/allmydata/storage/server.py 107 kwargs["facility"] = "tahoe.storage" return log.msg(*args, **kwargs) - def _clean_incomplete(self): - fileutil.rm_dir(self.incomingdir) - def get_stats(self): # remember: RIStatsProvider requires that our return dict hunk ./src/allmydata/storage/server.py 109 - # contains numeric values. + # contains numeric, or None values. stats = { 'storage_server.allocated': self.allocated_size(), } stats['storage_server.reserved_space'] = self.reserved_space for category,ld in self.get_latencies().items(): merger 0.0 ( hunk ./src/allmydata/storage/server.py 149 - return fileutil.get_available_space(self.storedir, self.reserved_space) + return fileutil.get_available_space(self.sharedir, self.reserved_space) hunk ./src/allmydata/storage/server.py 143 - def get_available_space(self): - """Returns available space for share storage in bytes, or None if no - API to get this information is available.""" - - if self.readonly_storage: - return 0 - return fileutil.get_available_space(self.storedir, self.reserved_space) - ) hunk ./src/allmydata/storage/server.py 158 return space def remote_get_version(self): - remaining_space = self.get_available_space() + remaining_space = self.backend.get_available_space() if remaining_space is None: # We're on a platform that has no API to get disk stats. remaining_space = 2**64 hunk ./src/allmydata/storage/server.py 172 } return version - def remote_allocate_buckets(self, storage_index, + def remote_allocate_buckets(self, storageindex, renew_secret, cancel_secret, sharenums, allocated_size, canary, owner_num=0): hunk ./src/allmydata/storage/server.py 181 # to a particular owner. 
start = time.time() self.count("allocate") - alreadygot = set() + incoming = set() bucketwriters = {} # k: shnum, v: BucketWriter hunk ./src/allmydata/storage/server.py 183 - si_dir = storage_index_to_dir(storage_index) - si_s = si_b2a(storage_index) hunk ./src/allmydata/storage/server.py 184 + si_s = si_b2a(storageindex) log.msg("storage: allocate_buckets %s" % si_s) # in this implementation, the lease information (including secrets) hunk ./src/allmydata/storage/server.py 198 max_space_per_bucket = allocated_size - remaining_space = self.get_available_space() + remaining_space = self.backend.get_available_space() limited = remaining_space is not None if limited: # this is a bit conservative, since some of this allocated_size() hunk ./src/allmydata/storage/server.py 207 remaining_space -= self.allocated_size() # self.readonly_storage causes remaining_space <= 0 - # fill alreadygot with all shares that we have, not just the ones + # Fill alreadygot with all shares that we have, not just the ones # they asked about: this will save them a lot of work. Add or update # leases for all of them: if they want us to hold shares for this hunk ./src/allmydata/storage/server.py 210 - # file, they'll want us to hold leases for this file. - for (shnum, fn) in self._get_bucket_shares(storage_index): - alreadygot.add(shnum) - sf = ShareFile(fn) - sf.add_or_renew_lease(lease_info) + # file, they'll want us to hold leases for all the shares of it. + alreadygot = set() + for share in self.backend.get_shares(storageindex): + share.add_or_renew_lease(lease_info) + alreadygot.add(share.shnum) hunk ./src/allmydata/storage/server.py 216 - for shnum in sharenums: - incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum) - finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum) - if os.path.exists(finalhome): - # great! we already have it. easy. - pass - elif os.path.exists(incominghome): - # Note that we don't create BucketWriters for shnums that - # have a partial share (in incoming/), so if a second upload - # occurs while the first is still in progress, the second - # uploader will use different storage servers. - pass - elif (not limited) or (remaining_space >= max_space_per_bucket): - # ok! we need to create the new share file. - bw = BucketWriter(self, incominghome, finalhome, - max_space_per_bucket, lease_info, canary) - if self.no_storage: - bw.throw_out_all_data = True + # all share numbers that are incoming + incoming = self.backend.get_incoming_shnums(storageindex) + + for shnum in ((sharenums - alreadygot) - incoming): + if (not limited) or (remaining_space >= max_space_per_bucket): + bw = self.backend.make_bucket_writer(storageindex, shnum, max_space_per_bucket, lease_info, canary) bucketwriters[shnum] = bw self._active_writers[bw] = 1 if limited: hunk ./src/allmydata/storage/server.py 227 remaining_space -= max_space_per_bucket else: - # bummer! not enough space to accept this bucket + # Bummer not enough space to accept this share. 
pass hunk ./src/allmydata/storage/server.py 230 - if bucketwriters: - fileutil.make_dirs(os.path.join(self.sharedir, si_dir)) - self.add_latency("allocate", time.time() - start) return alreadygot, bucketwriters hunk ./src/allmydata/storage/server.py 233 - def _iter_share_files(self, storage_index): - for shnum, filename in self._get_bucket_shares(storage_index): + def _iter_share_files(self, storageindex): + for shnum, filename in self._get_shares(storageindex): f = open(filename, 'rb') header = f.read(32) f.close() hunk ./src/allmydata/storage/server.py 239 if header[:32] == MutableShareFile.MAGIC: + # XXX Can I exploit this code? sf = MutableShareFile(filename, self) # note: if the share has been migrated, the renew_lease() # call will throw an exception, with information to help the hunk ./src/allmydata/storage/server.py 245 # client update the lease. elif header[:4] == struct.pack(">L", 1): + # Check if version number is "1". + # XXX WHAT ABOUT OTHER VERSIONS!!!!!!!? sf = ShareFile(filename) else: continue # non-sharefile hunk ./src/allmydata/storage/server.py 252 yield sf - def remote_add_lease(self, storage_index, renew_secret, cancel_secret, + def remote_add_lease(self, storageindex, renew_secret, cancel_secret, owner_num=1): start = time.time() self.count("add-lease") hunk ./src/allmydata/storage/server.py 260 lease_info = LeaseInfo(owner_num, renew_secret, cancel_secret, new_expire_time, self.my_nodeid) - for sf in self._iter_share_files(storage_index): + for sf in self._iter_share_files(storageindex): sf.add_or_renew_lease(lease_info) self.add_latency("add-lease", time.time() - start) return None hunk ./src/allmydata/storage/server.py 265 - def remote_renew_lease(self, storage_index, renew_secret): + def remote_renew_lease(self, storageindex, renew_secret): start = time.time() self.count("renew") new_expire_time = time.time() + 31*24*60*60 hunk ./src/allmydata/storage/server.py 270 found_buckets = False - for sf in self._iter_share_files(storage_index): + for sf in self._iter_share_files(storageindex): found_buckets = True sf.renew_lease(renew_secret, new_expire_time) self.add_latency("renew", time.time() - start) hunk ./src/allmydata/storage/server.py 277 if not found_buckets: raise IndexError("no such lease to renew") - def remote_cancel_lease(self, storage_index, cancel_secret): + def remote_cancel_lease(self, storageindex, cancel_secret): start = time.time() self.count("cancel") hunk ./src/allmydata/storage/server.py 283 total_space_freed = 0 found_buckets = False - for sf in self._iter_share_files(storage_index): + for sf in self._iter_share_files(storageindex): # note: if we can't find a lease on one share, we won't bother # looking in the others. Unless something broke internally # (perhaps we ran out of disk space while adding a lease), the hunk ./src/allmydata/storage/server.py 293 total_space_freed += sf.cancel_lease(cancel_secret) if found_buckets: - storagedir = os.path.join(self.sharedir, - storage_index_to_dir(storage_index)) - if not os.listdir(storagedir): - os.rmdir(storagedir) + # XXX Yikes looks like code that shouldn't be in the server! 
+ storagedir = si_si2dir(self.sharedir, storageindex) + fp_rmdir_if_empty(storagedir) if self.stats_provider: self.stats_provider.count('storage_server.bytes_freed', hunk ./src/allmydata/storage/server.py 309 self.stats_provider.count('storage_server.bytes_added', consumed_size) del self._active_writers[bw] - def _get_bucket_shares(self, storage_index): - """Return a list of (shnum, pathname) tuples for files that hold - shares for this storage_index. In each tuple, 'shnum' will always be - the integer form of the last component of 'pathname'.""" - storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index)) - try: - for f in os.listdir(storagedir): - if NUM_RE.match(f): - filename = os.path.join(storagedir, f) - yield (int(f), filename) - except OSError: - # Commonly caused by there being no buckets at all. - pass - - def remote_get_buckets(self, storage_index): + def remote_get_buckets(self, storageindex): start = time.time() self.count("get") hunk ./src/allmydata/storage/server.py 312 - si_s = si_b2a(storage_index) + si_s = si_b2a(storageindex) log.msg("storage: get_buckets %s" % si_s) bucketreaders = {} # k: sharenum, v: BucketReader hunk ./src/allmydata/storage/server.py 315 - for shnum, filename in self._get_bucket_shares(storage_index): - bucketreaders[shnum] = BucketReader(self, filename, - storage_index, shnum) + self.backend.set_storage_server(self) + for share in self.backend.get_shares(storageindex): + bucketreaders[share.get_shnum()] = self.backend.make_bucket_reader(share) self.add_latency("get", time.time() - start) return bucketreaders hunk ./src/allmydata/storage/server.py 321 - def get_leases(self, storage_index): + def get_leases(self, storageindex): """Provide an iterator that yields all of the leases attached to this bucket. Each lease is returned as a LeaseInfo instance. hunk ./src/allmydata/storage/server.py 331 # since all shares get the same lease data, we just grab the leases # from the first share try: - shnum, filename = self._get_bucket_shares(storage_index).next() + shnum, filename = self._get_shares(storageindex).next() sf = ShareFile(filename) return sf.get_leases() except StopIteration: hunk ./src/allmydata/storage/server.py 337 return iter([]) - def remote_slot_testv_and_readv_and_writev(self, storage_index, + # XXX As far as Zancas' grockery has gotten. 
+ def remote_slot_testv_and_readv_and_writev(self, storageindex, secrets, test_and_write_vectors, read_vector): hunk ./src/allmydata/storage/server.py 344 start = time.time() self.count("writev") - si_s = si_b2a(storage_index) + si_s = si_b2a(storageindex) log.msg("storage: slot_writev %s" % si_s) hunk ./src/allmydata/storage/server.py 346 - si_dir = storage_index_to_dir(storage_index) + (write_enabler, renew_secret, cancel_secret) = secrets # shares exist if there is a file for them hunk ./src/allmydata/storage/server.py 349 - bucketdir = os.path.join(self.sharedir, si_dir) + bucketdir = si_si2dir(self.sharedir, storageindex) shares = {} if os.path.isdir(bucketdir): for sharenum_s in os.listdir(bucketdir): hunk ./src/allmydata/storage/server.py 432 self) return share - def remote_slot_readv(self, storage_index, shares, readv): + def remote_slot_readv(self, storageindex, shares, readv): start = time.time() self.count("readv") hunk ./src/allmydata/storage/server.py 435 - si_s = si_b2a(storage_index) + si_s = si_b2a(storageindex) lp = log.msg("storage: slot_readv %s %s" % (si_s, shares), facility="tahoe.storage", level=log.OPERATIONAL) hunk ./src/allmydata/storage/server.py 438 - si_dir = storage_index_to_dir(storage_index) # shares exist if there is a file for them hunk ./src/allmydata/storage/server.py 439 - bucketdir = os.path.join(self.sharedir, si_dir) + bucketdir = si_si2dir(self.sharedir, storageindex) if not os.path.isdir(bucketdir): self.add_latency("readv", time.time() - start) return {} hunk ./src/allmydata/storage/server.py 458 self.add_latency("readv", time.time() - start) return datavs - def remote_advise_corrupt_share(self, share_type, storage_index, shnum, + def remote_advise_corrupt_share(self, share_type, storageindex, shnum, reason): fileutil.make_dirs(self.corruption_advisory_dir) now = time_format.iso_utc(sep="T") hunk ./src/allmydata/storage/server.py 462 - si_s = si_b2a(storage_index) + si_s = si_b2a(storageindex) # windows can't handle colons in the filename fn = os.path.join(self.corruption_advisory_dir, "%s--%s-%d" % (now, si_s, shnum)).replace(":","") hunk ./src/allmydata/storage/server.py 469 f = open(fn, "w") f.write("report: Share Corruption\n") f.write("type: %s\n" % share_type) - f.write("storage_index: %s\n" % si_s) + f.write("storageindex: %s\n" % si_s) f.write("share_number: %d\n" % shnum) f.write("\n") f.write(reason) } [modify null/core.py such that the correct interfaces are implemented wilcoxjg@gmail.com**20110809201822 Ignore-this: 3c64580592474f71633287d1b6beeb6b ] hunk ./src/allmydata/storage/backends/null/core.py 4 from allmydata.storage.backends.base import Backend from allmydata.storage.immutable import BucketWriter, BucketReader from zope.interface import implements +from allmydata.interfaces import IStorageBackend, IStorageBackendShare class NullCore(Backend): implements(IStorageBackend) [make changes to storage/immutable.py most changes are part of movement to DAS specific backend. 
wilcoxjg@gmail.com**20110809202232 Ignore-this: 70c7c6ea6be2418d70556718a050714 ] { hunk ./src/allmydata/storage/immutable.py 1 -import os, stat, struct, time +import os, time from foolscap.api import Referenceable hunk ./src/allmydata/storage/immutable.py 7 from zope.interface import implements from allmydata.interfaces import RIBucketWriter, RIBucketReader -from allmydata.util import base32, fileutil, log +from allmydata.util import base32, log from allmydata.util.assertutil import precondition from allmydata.util.hashutil import constant_time_compare from allmydata.storage.lease import LeaseInfo hunk ./src/allmydata/storage/immutable.py 14 from allmydata.storage.common import UnknownImmutableContainerVersionError, \ DataTooLargeError -# each share file (in storage/shares/$SI/$SHNUM) contains lease information -# and share data. The share data is accessed by RIBucketWriter.write and -# RIBucketReader.read . The lease information is not accessible through these -# interfaces. - -# The share file has the following layout: -# 0x00: share file version number, four bytes, current version is 1 -# 0x04: share data length, four bytes big-endian = A # See Footnote 1 below. -# 0x08: number of leases, four bytes big-endian -# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy) -# A+0x0c = B: first lease. Lease format is: -# B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner -# B+0x04: renew secret, 32 bytes (SHA256) -# B+0x24: cancel secret, 32 bytes (SHA256) -# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch -# B+0x48: next lease, or end of record - -# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers, -# but it is still filled in by storage servers in case the storage server -# software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the -# share file is moved from one storage server to another. The value stored in -# this field is truncated, so if the actual share data length is >= 2**32, -# then the value stored in this field will be the actual share data length -# modulo 2**32. - -class ShareFile: - LEASE_SIZE = struct.calcsize(">L32s32sL") - sharetype = "immutable" - - def __init__(self, filename, max_size=None, create=False): - """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """ - precondition((max_size is not None) or (not create), max_size, create) - self.home = filename - self._max_size = max_size - if create: - # touch the file, so later callers will see that we're working on - # it. Also construct the metadata. - assert not os.path.exists(self.home) - fileutil.make_dirs(os.path.dirname(self.home)) - f = open(self.home, 'wb') - # The second field -- the four-byte share data length -- is no - # longer used as of Tahoe v1.3.0, but we continue to write it in - # there in case someone downgrades a storage server from >= - # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one - # server to another, etc. We do saturation -- a share data length - # larger than 2**32-1 (what can fit into the field) is marked as - # the largest length that can fit into the field. That way, even - # if this does happen, the old < v1.3.0 server will still allow - # clients to read the first part of the share. 
- f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0)) - f.close() - self._lease_offset = max_size + 0x0c - self._num_leases = 0 - else: - f = open(self.home, 'rb') - filesize = os.path.getsize(self.home) - (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc)) - f.close() - if version != 1: - msg = "sharefile %s had version %d but we wanted 1" % \ - (filename, version) - raise UnknownImmutableContainerVersionError(msg) - self._num_leases = num_leases - self._lease_offset = filesize - (num_leases * self.LEASE_SIZE) - self._data_offset = 0xc - - def unlink(self): - os.unlink(self.home) - - def read_share_data(self, offset, length): - precondition(offset >= 0) - # reads beyond the end of the data are truncated. Reads that start - # beyond the end of the data return an empty string. I wonder why - # Python doesn't do the following computation for me? - seekpos = self._data_offset+offset - fsize = os.path.getsize(self.home) - actuallength = max(0, min(length, fsize-seekpos)) - if actuallength == 0: - return "" - f = open(self.home, 'rb') - f.seek(seekpos) - return f.read(actuallength) - - def write_share_data(self, offset, data): - length = len(data) - precondition(offset >= 0, offset) - if self._max_size is not None and offset+length > self._max_size: - raise DataTooLargeError(self._max_size, offset, length) - f = open(self.home, 'rb+') - real_offset = self._data_offset+offset - f.seek(real_offset) - assert f.tell() == real_offset - f.write(data) - f.close() - - def _write_lease_record(self, f, lease_number, lease_info): - offset = self._lease_offset + lease_number * self.LEASE_SIZE - f.seek(offset) - assert f.tell() == offset - f.write(lease_info.to_immutable_data()) - - def _read_num_leases(self, f): - f.seek(0x08) - (num_leases,) = struct.unpack(">L", f.read(4)) - return num_leases - - def _write_num_leases(self, f, num_leases): - f.seek(0x08) - f.write(struct.pack(">L", num_leases)) - - def _truncate_leases(self, f, num_leases): - f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE) - - def get_leases(self): - """Yields a LeaseInfo instance for all leases.""" - f = open(self.home, 'rb') - (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc)) - f.seek(self._lease_offset) - for i in range(num_leases): - data = f.read(self.LEASE_SIZE) - if data: - yield LeaseInfo().from_immutable_data(data) - - def add_lease(self, lease_info): - f = open(self.home, 'rb+') - num_leases = self._read_num_leases(f) - self._write_lease_record(f, num_leases, lease_info) - self._write_num_leases(f, num_leases+1) - f.close() - - def renew_lease(self, renew_secret, new_expire_time): - for i,lease in enumerate(self.get_leases()): - if constant_time_compare(lease.renew_secret, renew_secret): - # yup. See if we need to update the owner time. - if new_expire_time > lease.expiration_time: - # yes - lease.expiration_time = new_expire_time - f = open(self.home, 'rb+') - self._write_lease_record(f, i, lease) - f.close() - return - raise IndexError("unable to renew non-existent lease") - - def add_or_renew_lease(self, lease_info): - try: - self.renew_lease(lease_info.renew_secret, - lease_info.expiration_time) - except IndexError: - self.add_lease(lease_info) - - - def cancel_lease(self, cancel_secret): - """Remove a lease with the given cancel_secret. If the last lease is - cancelled, the file will be removed. Return the number of bytes that - were freed (by truncating the list of leases, and possibly by - deleting the file. 
Raise IndexError if there was no lease with the - given cancel_secret. - """ - - leases = list(self.get_leases()) - num_leases_removed = 0 - for i,lease in enumerate(leases): - if constant_time_compare(lease.cancel_secret, cancel_secret): - leases[i] = None - num_leases_removed += 1 - if not num_leases_removed: - raise IndexError("unable to find matching lease to cancel") - if num_leases_removed: - # pack and write out the remaining leases. We write these out in - # the same order as they were added, so that if we crash while - # doing this, we won't lose any non-cancelled leases. - leases = [l for l in leases if l] # remove the cancelled leases - f = open(self.home, 'rb+') - for i,lease in enumerate(leases): - self._write_lease_record(f, i, lease) - self._write_num_leases(f, len(leases)) - self._truncate_leases(f, len(leases)) - f.close() - space_freed = self.LEASE_SIZE * num_leases_removed - if not len(leases): - space_freed += os.stat(self.home)[stat.ST_SIZE] - self.unlink() - return space_freed - - class BucketWriter(Referenceable): implements(RIBucketWriter) hunk ./src/allmydata/storage/immutable.py 17 - def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary): + def __init__(self, ss, immutableshare, max_size, lease_info, canary): self.ss = ss hunk ./src/allmydata/storage/immutable.py 19 - self.incominghome = incominghome - self.finalhome = finalhome - self._max_size = max_size # don't allow the client to write more than this + self._max_size = max_size # don't allow the client to write more than this print self.ss._active_writers.keys() self._canary = canary self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) self.closed = False hunk ./src/allmydata/storage/immutable.py 24 self.throw_out_all_data = False - self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) + self._sharefile = immutableshare # also, add our lease to the file now, so that other ones can be # added by simultaneous uploaders self._sharefile.add_lease(lease_info) hunk ./src/allmydata/storage/immutable.py 45 precondition(not self.closed) start = time.time() - fileutil.make_dirs(os.path.dirname(self.finalhome)) - fileutil.rename(self.incominghome, self.finalhome) - try: - # self.incominghome is like storage/shares/incoming/ab/abcde/4 . - # We try to delete the parent (.../ab/abcde) to avoid leaving - # these directories lying around forever, but the delete might - # fail if we're working on another share for the same storage - # index (like ab/abcde/5). The alternative approach would be to - # use a hierarchy of objects (PrefixHolder, BucketHolder, - # ShareWriter), each of which is responsible for a single - # directory on disk, and have them use reference counting of - # their children to know when they should do the rmdir. This - # approach is simpler, but relies on os.rmdir refusing to delete - # a non-empty directory. Do *not* use fileutil.rm_dir() here! - os.rmdir(os.path.dirname(self.incominghome)) - # we also delete the grandparent (prefix) directory, .../ab , - # again to avoid leaving directories lying around. This might - # fail if there is another bucket open that shares a prefix (like - # ab/abfff). - os.rmdir(os.path.dirname(os.path.dirname(self.incominghome))) - # we leave the great-grandparent (incoming/) directory in place. - except EnvironmentError: - # ignore the "can't rmdir because the directory is not empty" - # exceptions, those are normal consequences of the - # above-mentioned conditions. 
- pass + self._sharefile.close() + filelen = self._sharefile.stat() self._sharefile = None hunk ./src/allmydata/storage/immutable.py 48 + self.closed = True self._canary.dontNotifyOnDisconnect(self._disconnect_marker) hunk ./src/allmydata/storage/immutable.py 52 - filelen = os.stat(self.finalhome)[stat.ST_SIZE] self.ss.bucket_writer_closed(self, filelen) self.ss.add_latency("close", time.time() - start) self.ss.count("close") hunk ./src/allmydata/storage/immutable.py 90 class BucketReader(Referenceable): implements(RIBucketReader) - def __init__(self, ss, sharefname, storage_index=None, shnum=None): + def __init__(self, ss, share): self.ss = ss hunk ./src/allmydata/storage/immutable.py 92 - self._share_file = ShareFile(sharefname) - self.storage_index = storage_index - self.shnum = shnum + self._share_file = share + self.storageindex = share.storageindex + self.shnum = share.shnum def __repr__(self): return "<%s %s %s>" % (self.__class__.__name__, hunk ./src/allmydata/storage/immutable.py 98 - base32.b2a_l(self.storage_index[:8], 60), + base32.b2a_l(self.storageindex[:8], 60), self.shnum) def remote_read(self, offset, length): hunk ./src/allmydata/storage/immutable.py 110 def remote_advise_corrupt_share(self, reason): return self.ss.remote_advise_corrupt_share("immutable", - self.storage_index, + self.storageindex, self.shnum, reason) } [creates backends/das/core.py wilcoxjg@gmail.com**20110809202620 Ignore-this: 2ea937f8d02aa85396135903be91ed67 ] { adddir ./src/allmydata/storage/backends/das addfile ./src/allmydata/storage/backends/das/core.py hunk ./src/allmydata/storage/backends/das/core.py 1 +import re, weakref, struct, time, stat +from twisted.application import service +from twisted.python.filepath import UnlistableError +from twisted.python import filepath +from twisted.python.filepath import FilePath +from zope.interface import implements + +import allmydata # for __full_version__ +from allmydata.interfaces import IStorageBackend +from allmydata.storage.backends.base import Backend +from allmydata.storage.common import si_b2a, si_a2b, si_si2dir +from allmydata.util.assertutil import precondition +from allmydata.interfaces import IStatsProducer, IShareStore# XXX, RIStorageServer +from allmydata.util import fileutil, idlib, log, time_format +from allmydata.util.fileutil import fp_make_dirs +from allmydata.storage.lease import LeaseInfo +from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ + create_mutable_sharefile +from allmydata.storage.immutable import BucketWriter, BucketReader +from allmydata.storage.crawler import BucketCountingCrawler +from allmydata.util.hashutil import constant_time_compare +from allmydata.storage.backends.das.expirer import LeaseCheckingCrawler +_pyflakes_hush = [si_b2a, si_a2b, si_si2dir] # re-exported + +# storage/ +# storage/shares/incoming +# incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will +# be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success +# storage/shares/$START/$STORAGEINDEX +# storage/shares/$START/$STORAGEINDEX/$SHARENUM + +# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2 +# base-32 chars). 
+# $SHARENUM matches this regex: +NUM_RE=re.compile("^[0-9]+$") + +class DASCore(Backend): + implements(IStorageBackend) + def __init__(self, storedir, expiration_policy, readonly=False, reserved_space=0): + Backend.__init__(self) + self._setup_storage(storedir, readonly, reserved_space) + self._setup_corruption_advisory() + self._setup_bucket_counter() + self._setup_lease_checkerf(expiration_policy) + + def _setup_storage(self, storedir, readonly, reserved_space): + precondition(isinstance(storedir, FilePath), storedir, FilePath) + self.storedir = storedir + self.readonly = readonly + self.reserved_space = int(reserved_space) + self.sharedir = self.storedir.child("shares") + fileutil.fp_make_dirs(self.sharedir) + self.incomingdir = self.sharedir.child('incoming') + self._clean_incomplete() + if self.reserved_space and (self.get_available_space() is None): + log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored", + umid="0wZ27w", level=log.UNUSUAL) + + + def _clean_incomplete(self): + fileutil.fp_remove(self.incomingdir) + fileutil.fp_make_dirs(self.incomingdir) + + def _setup_corruption_advisory(self): + # we don't actually create the corruption-advisory dir until necessary + self.corruption_advisory_dir = self.storedir.child("corruption-advisories") + + def _setup_bucket_counter(self): + statefname = self.storedir.child("bucket_counter.state") + self.bucket_counter = BucketCountingCrawler(statefname) + self.bucket_counter.setServiceParent(self) + + def _setup_lease_checkerf(self, expiration_policy): + statefile = self.storedir.child("lease_checker.state") + historyfile = self.storedir.child("lease_checker.history") + self.lease_checker = LeaseCheckingCrawler(statefile, historyfile, expiration_policy) + self.lease_checker.setServiceParent(self) + + def get_incoming_shnums(self, storageindex): + """ Return a frozenset of the shnum (as ints) of incoming shares. """ + incomingthissi = si_si2dir(self.incomingdir, storageindex) + try: + childfps = [ fp for fp in incomingthissi.children() if NUM_RE.match(fp.basename()) ] + shnums = [ int(fp.basename()) for fp in childfps] + return frozenset(shnums) + except UnlistableError: + # There is no shares directory at all. + return frozenset() + + def get_shares(self, storageindex): + """ Generate ImmutableShare objects for shares we have for this + storageindex. ("Shares we have" means completed ones, excluding + incoming ones.)""" + finalstoragedir = si_si2dir(self.sharedir, storageindex) + try: + for fp in finalstoragedir.children(): + fpshnumstr = fp.basename() + if NUM_RE.match(fpshnumstr): + finalhome = finalstoragedir.child(fpshnumstr) + yield ImmutableShare(storageindex, fpshnumstr, finalhome) + except UnlistableError: + # There is no shares directory at all. 
+ pass + + def get_available_space(self): + if self.readonly: + return 0 + return fileutil.get_available_space(self.storedir, self.reserved_space) + + def make_bucket_writer(self, storageindex, shnum, max_space_per_bucket, lease_info, canary): + finalhome = si_si2dir(self.sharedir, storageindex).child(str(shnum)) + incominghome = si_si2dir(self.incomingdir, storageindex).child(str(shnum)) + immsh = ImmutableShare(storageindex, shnum, finalhome, incominghome, max_size=max_space_per_bucket, create=True) + bw = BucketWriter(self.ss, immsh, max_space_per_bucket, lease_info, canary) + return bw + + def make_bucket_reader(self, share): + return BucketReader(self.ss, share) + + def set_storage_server(self, ss): + self.ss = ss + + +# each share file (in storage/shares/$SI/$SHNUM) contains lease information +# and share data. The share data is accessed by RIBucketWriter.write and +# RIBucketReader.read . The lease information is not accessible through these +# interfaces. + +# The share file has the following layout: +# 0x00: share file version number, four bytes, current version is 1 +# 0x04: share data length, four bytes big-endian = A # See Footnote 1 below. +# 0x08: number of leases, four bytes big-endian +# 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy) +# A+0x0c = B: first lease. Lease format is: +# B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner +# B+0x04: renew secret, 32 bytes (SHA256) +# B+0x24: cancel secret, 32 bytes (SHA256) +# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch +# B+0x48: next lease, or end of record + +# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers, +# but it is still filled in by storage servers in case the storage server +# software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the +# share file is moved from one storage server to another. The value stored in +# this field is truncated, so if the actual share data length is >= 2**32, +# then the value stored in this field will be the actual share data length +# modulo 2**32. + +class ImmutableShare(object): + LEASE_SIZE = struct.calcsize(">L32s32sL") + sharetype = "immutable" + + def __init__(self, storageindex, shnum, finalhome=None, incominghome=None, max_size=None, create=False): + """ If max_size is not None then I won't allow more than + max_size to be written to me. If create=True then max_size + must not be None. """ + precondition((max_size is not None) or (not create), max_size, create) + self.storageindex = storageindex + self._max_size = max_size + self.incominghome = incominghome + self.finalhome = finalhome + self.shnum = shnum + if create: + # touch the file, so later callers will see that we're working on + # it. Also construct the metadata. + assert not finalhome.exists() + fp_make_dirs(self.incominghome.parent()) + # The second field -- the four-byte share data length -- is no + # longer used as of Tahoe v1.3.0, but we continue to write it in + # there in case someone downgrades a storage server from >= + # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one + # server to another, etc. We do saturation -- a share data length + # larger than 2**32-1 (what can fit into the field) is marked as + # the largest length that can fit into the field. That way, even + # if this does happen, the old < v1.3.0 server will still allow + # clients to read the first part of the share. 
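        # Worked illustration of the header written just below, for a
        # one-byte share (max_size == 1): struct.pack(">LLL", 1, 1, 0)
        # produces the twelve bytes
        #     '\x00\x00\x00\x01' '\x00\x00\x00\x01' '\x00\x00\x00\x00'
        # i.e. version 1, the (vestigial, saturated) data-length field, and
        # zero leases at creation time; leases are appended after the share
        # data as described in the layout comment above.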
+ self.incominghome.setContent(struct.pack(">LLL", 1, min(2**32-1, max_size), 0) ) + self._lease_offset = max_size + 0x0c + self._num_leases = 0 + else: + fh = self.finalhome.open(mode='rb') + try: + (version, unused, num_leases) = struct.unpack(">LLL", fh.read(0xc)) + finally: + fh.close() + filesize = self.finalhome.getsize() + if version != 1: + msg = "sharefile %s had version %d but we wanted 1" % \ + (self.finalhome, version) + raise UnknownImmutableContainerVersionError(msg) + self._num_leases = num_leases + self._lease_offset = filesize - (num_leases * self.LEASE_SIZE) + self._data_offset = 0xc + + def close(self): + fileutil.fp_make_dirs(self.finalhome.parent()) + self.incominghome.moveTo(self.finalhome) + try: + # self.incominghome is like storage/shares/incoming/ab/abcde/4 . + # We try to delete the parent (.../ab/abcde) to avoid leaving + # these directories lying around forever, but the delete might + # fail if we're working on another share for the same storage + # index (like ab/abcde/5). The alternative approach would be to + # use a hierarchy of objects (PrefixHolder, BucketHolder, + # ShareWriter), each of which is responsible for a single + # directory on disk, and have them use reference counting of + # their children to know when they should do the rmdir. This + # approach is simpler, but relies on os.rmdir refusing to delete + # a non-empty directory. Do *not* use fileutil.rm_dir() here! + fileutil.fp_rmdir_if_empty(self.incominghome.parent()) + # we also delete the grandparent (prefix) directory, .../ab , + # again to avoid leaving directories lying around. This might + # fail if there is another bucket open that shares a prefix (like + # ab/abfff). + fileutil.fp_rmdir_if_empty(self.incominghome.parent().parent()) + # we leave the great-grandparent (incoming/) directory in place. + except EnvironmentError: + # ignore the "can't rmdir because the directory is not empty" + # exceptions, those are normal consequences of the + # above-mentioned conditions. + pass + pass + + def stat(self): + return filepath.stat(self.finalhome.path)[stat.ST_SIZE] + + def get_shnum(self): + return self.shnum + + def unlink(self): + self.finalhome.remove() + + def read_share_data(self, offset, length): + precondition(offset >= 0) + # Reads beyond the end of the data are truncated. Reads that start + # beyond the end of the data return an empty string. 
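        # Note on the arithmetic below: offsets are relative to the start of
        # the share data, so the real file position is offset + 0xc (past the
        # 12-byte header), and the max()/min() pair clamps the read to the
        # data that actually exists, giving the truncation behaviour the
        # comment above describes.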
+ seekpos = self._data_offset+offset + fsize = self.finalhome.getsize() + actuallength = max(0, min(length, fsize-seekpos)) + if actuallength == 0: + return "" + fh = self.finalhome.open(mode='rb') + try: + fh.seek(seekpos) + sharedata = fh.read(actuallength) + finally: + fh.close() + return sharedata + + def write_share_data(self, offset, data): + length = len(data) + precondition(offset >= 0, offset) + if self._max_size is not None and offset+length > self._max_size: + raise DataTooLargeError(self._max_size, offset, length) + fh = self.incominghome.open(mode='rb+') + try: + real_offset = self._data_offset+offset + fh.seek(real_offset) + assert fh.tell() == real_offset + fh.write(data) + finally: + fh.close() + + def _write_lease_record(self, f, lease_number, lease_info): + offset = self._lease_offset + lease_number * self.LEASE_SIZE + fh = f.open() + try: + fh.seek(offset) + assert fh.tell() == offset + fh.write(lease_info.to_immutable_data()) + finally: + fh.close() + + def _read_num_leases(self, f): + fh = f.open() #XXX Should be mocking FilePath.open() + try: + fh.seek(0x08) + ro = fh.read(4) + (num_leases,) = struct.unpack(">L", ro) + finally: + fh.close() + return num_leases + + def _write_num_leases(self, f, num_leases): + fh = f.open() + try: + fh.seek(0x08) + fh.write(struct.pack(">L", num_leases)) + finally: + fh.close() + + def _truncate_leases(self, f, num_leases): + f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE) + + def get_leases(self): + """Yields a LeaseInfo instance for all leases.""" + fh = self.finalhome.open(mode='rb') + (version, unused, num_leases) = struct.unpack(">LLL", fh.read(0xc)) + fh.seek(self._lease_offset) + for i in range(num_leases): + data = fh.read(self.LEASE_SIZE) + if data: + yield LeaseInfo().from_immutable_data(data) + + def add_lease(self, lease_info): + num_leases = self._read_num_leases(self.incominghome) + self._write_lease_record(self.incominghome, num_leases, lease_info) + self._write_num_leases(self.incominghome, num_leases+1) + + def renew_lease(self, renew_secret, new_expire_time): + for i,lease in enumerate(self.get_leases()): + if constant_time_compare(lease.renew_secret, renew_secret): + # yup. See if we need to update the owner time. + if new_expire_time > lease.expiration_time: + # yes + lease.expiration_time = new_expire_time + f = open(self.finalhome, 'rb+') + self._write_lease_record(f, i, lease) + f.close() + return + raise IndexError("unable to renew non-existent lease") + + def add_or_renew_lease(self, lease_info): + try: + self.renew_lease(lease_info.renew_secret, + lease_info.expiration_time) + except IndexError: + self.add_lease(lease_info) + + def cancel_lease(self, cancel_secret): + """Remove a lease with the given cancel_secret. If the last lease is + cancelled, the file will be removed. Return the number of bytes that + were freed (by truncating the list of leases, and possibly by + deleting the file. Raise IndexError if there was no lease with the + given cancel_secret. + """ + + leases = list(self.get_leases()) + num_leases_removed = 0 + for i,lease in enumerate(leases): + if constant_time_compare(lease.cancel_secret, cancel_secret): + leases[i] = None + num_leases_removed += 1 + if not num_leases_removed: + raise IndexError("unable to find matching lease to cancel") + if num_leases_removed: + # pack and write out the remaining leases. We write these out in + # the same order as they were added, so that if we crash while + # doing this, we won't lose any non-cancelled leases. 
+ leases = [l for l in leases if l] # remove the cancelled leases + f = open(self.finalhome, 'rb+') + for i,lease in enumerate(leases): + self._write_lease_record(f, i, lease) + self._write_num_leases(f, len(leases)) + self._truncate_leases(f, len(leases)) + f.close() + space_freed = self.LEASE_SIZE * num_leases_removed + if not len(leases): + space_freed += os.stat(self.finalhome)[stat.ST_SIZE] + self.unlink() + return space_freed } [change backends/das/core.py to correct which interfaces are implemented wilcoxjg@gmail.com**20110809203123 Ignore-this: 7f9331a04b55f7feee4335abee011e14 ] hunk ./src/allmydata/storage/backends/das/core.py 13 from allmydata.storage.backends.base import Backend from allmydata.storage.common import si_b2a, si_a2b, si_si2dir from allmydata.util.assertutil import precondition -from allmydata.interfaces import IStatsProducer, IShareStore# XXX, RIStorageServer +from allmydata.interfaces import IStorageBackend from allmydata.util import fileutil, idlib, log, time_format from allmydata.util.fileutil import fp_make_dirs from allmydata.storage.lease import LeaseInfo [util/fileutil.py now expects and manipulates twisted.python.filepath.FilePath objects wilcoxjg@gmail.com**20110809203321 Ignore-this: 12c8aa13424ed51a5df09b92a454627 ] { hunk ./src/allmydata/util/fileutil.py 5 Futz with files like a pro. """ -import sys, exceptions, os, stat, tempfile, time, binascii +import errno, sys, exceptions, os, stat, tempfile, time, binascii + +from allmydata.util.assertutil import precondition from twisted.python import log hunk ./src/allmydata/util/fileutil.py 10 +from twisted.python.filepath import FilePath, UnlistableError from pycryptopp.cipher.aes import AES hunk ./src/allmydata/util/fileutil.py 189 raise tx raise exceptions.IOError, "unknown error prevented creation of directory, or deleted the directory immediately after creation: %s" % dirname # careful not to construct an IOError with a 2-tuple, as that has a special meaning... -def rm_dir(dirname): +def fp_make_dirs(dirfp): + """ + An idempotent version of FilePath.makedirs(). If the dir already + exists, do nothing and return without raising an exception. If this + call creates the dir, return without raising an exception. If there is + an error that prevents creation or if the directory gets deleted after + fp_make_dirs() creates it and before fp_make_dirs() checks that it + exists, raise an exception. + """ + log.msg( "xxx 0 %s" % (dirfp,)) + tx = None + try: + dirfp.makedirs() + except OSError, x: + tx = x + + if not dirfp.isdir(): + if tx: + raise tx + raise exceptions.IOError, "unknown error prevented creation of directory, or deleted the directory immediately after creation: %s" % dirfp # careful not to construct an IOError with a 2-tuple, as that has a special meaning... + +def fp_rmdir_if_empty(dirfp): + """ Remove the directory if it is empty. """ + try: + os.rmdir(dirfp.path) + except OSError, e: + if e.errno != errno.ENOTEMPTY: + raise + else: + dirfp.changed() + +def rmtree(dirname): """ A threadsafe and idempotent version of shutil.rmtree(). If the dir is already gone, do nothing and return without raising an exception. If this hunk ./src/allmydata/util/fileutil.py 239 else: remove(fullname) os.rmdir(dirname) - except Exception, le: - # Ignore "No such file or directory" - if (not isinstance(le, OSError)) or le.args[0] != 2: + except EnvironmentError, le: + # Ignore "No such file or directory", collect any other exception. 
+ if (le.args[0] != 2 and le.args[0] != 3) or (le.args[0] != errno.ENOENT): excs.append(le) hunk ./src/allmydata/util/fileutil.py 243 + except Exception, le: + excs.append(le) # Okay, now we've recursively removed everything, ignoring any "No # such file or directory" errors, and collecting any other errors. hunk ./src/allmydata/util/fileutil.py 256 raise OSError, "Failed to remove dir for unknown reason." raise OSError, excs +def fp_remove(dirfp): + """ + An idempotent version of shutil.rmtree(). If the dir is already gone, + do nothing and return without raising an exception. If this call + removes the dir, return without raising an exception. If there is an + error that prevents removal or if the directory gets created again by + someone else after this deletes it and before this checks that it is + gone, raise an exception. + """ + try: + dirfp.remove() + except UnlistableError, e: + if e.originalException.errno != errno.ENOENT: + raise + except OSError, e: + if e.errno != errno.ENOENT: + raise + +def rm_dir(dirname): + # Renamed to be like shutil.rmtree and unlike rmdir. + return rmtree(dirname) def remove_if_possible(f): try: hunk ./src/allmydata/util/fileutil.py 387 import traceback traceback.print_exc() -def get_disk_stats(whichdir, reserved_space=0): +def get_disk_stats(whichdirfp, reserved_space=0): """Return disk statistics for the storage disk, in the form of a dict with the following fields. total: total bytes on disk hunk ./src/allmydata/util/fileutil.py 408 you can pass how many bytes you would like to leave unused on this filesystem as reserved_space. """ + precondition(isinstance(whichdirfp, FilePath), whichdirfp) if have_GetDiskFreeSpaceExW: # If this is a Windows system and GetDiskFreeSpaceExW is available, use it. hunk ./src/allmydata/util/fileutil.py 419 n_free_for_nonroot = c_ulonglong(0) n_total = c_ulonglong(0) n_free_for_root = c_ulonglong(0) - retval = GetDiskFreeSpaceExW(whichdir, byref(n_free_for_nonroot), + retval = GetDiskFreeSpaceExW(whichdirfp.path, byref(n_free_for_nonroot), byref(n_total), byref(n_free_for_root)) if retval == 0: hunk ./src/allmydata/util/fileutil.py 424 raise OSError("Windows error %d attempting to get disk statistics for %r" - % (GetLastError(), whichdir)) + % (GetLastError(), whichdirfp.path)) free_for_nonroot = n_free_for_nonroot.value total = n_total.value free_for_root = n_free_for_root.value hunk ./src/allmydata/util/fileutil.py 433 # # # - s = os.statvfs(whichdir) + s = os.statvfs(whichdirfp.path) # on my mac laptop: # statvfs(2) is a wrapper around statfs(2). hunk ./src/allmydata/util/fileutil.py 460 'avail': avail, } -def get_available_space(whichdir, reserved_space): +def get_available_space(whichdirfp, reserved_space): """Returns available space for share storage in bytes, or None if no API to get this information is available. hunk ./src/allmydata/util/fileutil.py 472 you can pass how many bytes you would like to leave unused on this filesystem as reserved_space. 
""" + precondition(isinstance(whichdirfp, FilePath), whichdirfp) try: hunk ./src/allmydata/util/fileutil.py 474 - return get_disk_stats(whichdir, reserved_space)['avail'] + return get_disk_stats(whichdirfp, reserved_space)['avail'] except AttributeError: return None hunk ./src/allmydata/util/fileutil.py 477 - except EnvironmentError: - log.msg("OS call to get disk statistics failed") - return 0 } [add expirer.py wilcoxjg@gmail.com**20110809203519 Ignore-this: b09d460593f0e0aa065e867d5159455b ] { addfile ./src/allmydata/storage/backends/das/expirer.py hunk ./src/allmydata/storage/backends/das/expirer.py 1 +import time, os, pickle, struct # os, pickle, and struct will almost certainly be migrated to the backend... +from allmydata.storage.crawler import ShareCrawler +from allmydata.storage.common import UnknownMutableContainerVersionError, \ + UnknownImmutableContainerVersionError +from twisted.python import log as twlog + +class LeaseCheckingCrawler(ShareCrawler): + """I examine the leases on all shares, determining which are still valid + and which have expired. I can remove the expired leases (if so + configured), and the share will be deleted when the last lease is + removed. + + I collect statistics on the leases and make these available to a web + status page, including: + + Space recovered during this cycle-so-far: + actual (only if expiration_enabled=True): + num-buckets, num-shares, sum of share sizes, real disk usage + ('real disk usage' means we use stat(fn).st_blocks*512 and include any + space used by the directory) + what it would have been with the original lease expiration time + what it would have been with our configured expiration time + + Prediction of space that will be recovered during the rest of this cycle + Prediction of space that will be recovered by the entire current cycle. + + Space recovered during the last 10 cycles <-- saved in separate pickle + + Shares/buckets examined: + this cycle-so-far + prediction of rest of cycle + during last 10 cycles <-- separate pickle + start/finish time of last 10 cycles <-- separate pickle + expiration time used for last 10 cycles <-- separate pickle + + Histogram of leases-per-share: + this-cycle-to-date + last 10 cycles <-- separate pickle + Histogram of lease ages, buckets = 1day + cycle-to-date + last 10 cycles <-- separate pickle + + All cycle-to-date values remain valid until the start of the next cycle. 
+ + """ + + slow_start = 360 # wait 6 minutes after startup + minimum_cycle_time = 12*60*60 # not more than twice per day + + def __init__(self, statefile, historyfp, expiration_policy): + self.historyfp = historyfp + self.expiration_enabled = expiration_policy['enabled'] + self.mode = expiration_policy['mode'] + self.override_lease_duration = None + self.cutoff_date = None + if self.mode == "age": + assert isinstance(expiration_policy['override_lease_duration'], (int, type(None))) + self.override_lease_duration = expiration_policy['override_lease_duration']# seconds + elif self.mode == "cutoff-date": + assert isinstance(expiration_policy['cutoff_date'], int) # seconds-since-epoch + assert cutoff_date is not None + self.cutoff_date = expiration_policy['cutoff_date'] + else: + raise ValueError("GC mode '%s' must be 'age' or 'cutoff-date'" % expiration_policy['mode']) + self.sharetypes_to_expire = expiration_policy['sharetypes'] + ShareCrawler.__init__(self, statefile) + + def add_initial_state(self): + # we fill ["cycle-to-date"] here (even though they will be reset in + # self.started_cycle) just in case someone grabs our state before we + # get started: unit tests do this + so_far = self.create_empty_cycle_dict() + self.state.setdefault("cycle-to-date", so_far) + # in case we upgrade the code while a cycle is in progress, update + # the keys individually + for k in so_far: + self.state["cycle-to-date"].setdefault(k, so_far[k]) + + # initialize history + if not self.historyfp.exists(): + history = {} # cyclenum -> dict + self.historyfp.setContent(pickle.dumps(history)) + + def create_empty_cycle_dict(self): + recovered = self.create_empty_recovered_dict() + so_far = {"corrupt-shares": [], + "space-recovered": recovered, + "lease-age-histogram": {}, # (minage,maxage)->count + "leases-per-share-histogram": {}, # leasecount->numshares + } + return so_far + + def create_empty_recovered_dict(self): + recovered = {} + for a in ("actual", "original", "configured", "examined"): + for b in ("buckets", "shares", "sharebytes", "diskbytes"): + recovered[a+"-"+b] = 0 + recovered[a+"-"+b+"-mutable"] = 0 + recovered[a+"-"+b+"-immutable"] = 0 + return recovered + + def started_cycle(self, cycle): + self.state["cycle-to-date"] = self.create_empty_cycle_dict() + + def stat(self, fn): + return os.stat(fn) + + def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): + bucketdir = os.path.join(prefixdir, storage_index_b32) + s = self.stat(bucketdir) + would_keep_shares = [] + wks = None + + for fn in os.listdir(bucketdir): + try: + shnum = int(fn) + except ValueError: + continue # non-numeric means not a sharefile + sharefile = os.path.join(bucketdir, fn) + try: + wks = self.process_share(sharefile) + except (UnknownMutableContainerVersionError, + UnknownImmutableContainerVersionError, + struct.error): + twlog.msg("lease-checker error processing %s" % sharefile) + twlog.err() + which = (storage_index_b32, shnum) + self.state["cycle-to-date"]["corrupt-shares"].append(which) + wks = (1, 1, 1, "unknown") + would_keep_shares.append(wks) + + sharetype = None + if wks: + # use the last share's sharetype as the buckettype + sharetype = wks[3] + rec = self.state["cycle-to-date"]["space-recovered"] + self.increment(rec, "examined-buckets", 1) + if sharetype: + self.increment(rec, "examined-buckets-"+sharetype, 1) + + try: + bucket_diskbytes = s.st_blocks * 512 + except AttributeError: + bucket_diskbytes = 0 # no stat().st_blocks on windows + if sum([wks[0] for wks in would_keep_shares]) == 0: + 
self.increment_bucketspace("original", bucket_diskbytes, sharetype) + if sum([wks[1] for wks in would_keep_shares]) == 0: + self.increment_bucketspace("configured", bucket_diskbytes, sharetype) + if sum([wks[2] for wks in would_keep_shares]) == 0: + self.increment_bucketspace("actual", bucket_diskbytes, sharetype) + + def process_share(self, sharefilename): + # first, find out what kind of a share it is + f = open(sharefilename, "rb") + prefix = f.read(32) + f.close() + if prefix == MutableShareFile.MAGIC: + sf = MutableShareFile(sharefilename) + else: + # otherwise assume it's immutable + sf = FSBShare(sharefilename) + sharetype = sf.sharetype + now = time.time() + s = self.stat(sharefilename) + + num_leases = 0 + num_valid_leases_original = 0 + num_valid_leases_configured = 0 + expired_leases_configured = [] + + for li in sf.get_leases(): + num_leases += 1 + original_expiration_time = li.get_expiration_time() + grant_renew_time = li.get_grant_renew_time_time() + age = li.get_age() + self.add_lease_age_to_histogram(age) + + # expired-or-not according to original expiration time + if original_expiration_time > now: + num_valid_leases_original += 1 + + # expired-or-not according to our configured age limit + expired = False + if self.mode == "age": + age_limit = original_expiration_time + if self.override_lease_duration is not None: + age_limit = self.override_lease_duration + if age > age_limit: + expired = True + else: + assert self.mode == "cutoff-date" + if grant_renew_time < self.cutoff_date: + expired = True + if sharetype not in self.sharetypes_to_expire: + expired = False + + if expired: + expired_leases_configured.append(li) + else: + num_valid_leases_configured += 1 + + so_far = self.state["cycle-to-date"] + self.increment(so_far["leases-per-share-histogram"], num_leases, 1) + self.increment_space("examined", s, sharetype) + + would_keep_share = [1, 1, 1, sharetype] + + if self.expiration_enabled: + for li in expired_leases_configured: + sf.cancel_lease(li.cancel_secret) + + if num_valid_leases_original == 0: + would_keep_share[0] = 0 + self.increment_space("original", s, sharetype) + + if num_valid_leases_configured == 0: + would_keep_share[1] = 0 + self.increment_space("configured", s, sharetype) + if self.expiration_enabled: + would_keep_share[2] = 0 + self.increment_space("actual", s, sharetype) + + return would_keep_share + + def increment_space(self, a, s, sharetype): + sharebytes = s.st_size + try: + # note that stat(2) says that st_blocks is 512 bytes, and that + # st_blksize is "optimal file sys I/O ops blocksize", which is + # independent of the block-size that st_blocks uses. + diskbytes = s.st_blocks * 512 + except AttributeError: + # the docs say that st_blocks is only on linux. I also see it on + # MacOS. But it isn't available on windows. 
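            # The two figures kept below differ on purpose: 'sharebytes' is
            # the logical file length (st_size), while 'diskbytes' estimates
            # real allocation as st_blocks * 512; when st_blocks is
            # unavailable the logical size is used as a stand-in.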
+ diskbytes = sharebytes + so_far_sr = self.state["cycle-to-date"]["space-recovered"] + self.increment(so_far_sr, a+"-shares", 1) + self.increment(so_far_sr, a+"-sharebytes", sharebytes) + self.increment(so_far_sr, a+"-diskbytes", diskbytes) + if sharetype: + self.increment(so_far_sr, a+"-shares-"+sharetype, 1) + self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes) + self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes) + + def increment_bucketspace(self, a, bucket_diskbytes, sharetype): + rec = self.state["cycle-to-date"]["space-recovered"] + self.increment(rec, a+"-diskbytes", bucket_diskbytes) + self.increment(rec, a+"-buckets", 1) + if sharetype: + self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes) + self.increment(rec, a+"-buckets-"+sharetype, 1) + + def increment(self, d, k, delta=1): + if k not in d: + d[k] = 0 + d[k] += delta + + def add_lease_age_to_histogram(self, age): + bucket_interval = 24*60*60 + bucket_number = int(age/bucket_interval) + bucket_start = bucket_number * bucket_interval + bucket_end = bucket_start + bucket_interval + k = (bucket_start, bucket_end) + self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1) + + def convert_lease_age_histogram(self, lah): + # convert { (minage,maxage) : count } into [ (minage,maxage,count) ] + # since the former is not JSON-safe (JSON dictionaries must have + # string keys). + json_safe_lah = [] + for k in sorted(lah): + (minage,maxage) = k + json_safe_lah.append( (minage, maxage, lah[k]) ) + return json_safe_lah + + def finished_cycle(self, cycle): + # add to our history state, prune old history + h = {} + + start = self.state["current-cycle-start-time"] + now = time.time() + h["cycle-start-finish-times"] = (start, now) + h["expiration-enabled"] = self.expiration_enabled + h["configured-expiration-mode"] = (self.mode, + self.override_lease_duration, + self.cutoff_date, + self.sharetypes_to_expire) + + s = self.state["cycle-to-date"] + + # state["lease-age-histogram"] is a dictionary (mapping + # (minage,maxage) tuple to a sharecount), but we report + # self.get_state()["lease-age-histogram"] as a list of + # (min,max,sharecount) tuples, because JSON can handle that better. + # We record the list-of-tuples form into the history for the same + # reason. + lah = self.convert_lease_age_histogram(s["lease-age-histogram"]) + h["lease-age-histogram"] = lah + h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy() + h["corrupt-shares"] = s["corrupt-shares"][:] + # note: if ["shares-recovered"] ever acquires an internal dict, this + # copy() needs to become a deepcopy + h["space-recovered"] = s["space-recovered"].copy() + + history = pickle.load(self.historyfp.getContent()) + history[cycle] = h + while len(history) > 10: + oldcycles = sorted(history.keys()) + del history[oldcycles[0]] + self.historyfp.setContent(pickle.dumps(history)) + + def get_state(self): + """In addition to the crawler state described in + ShareCrawler.get_state(), I return the following keys which are + specific to the lease-checker/expirer. Note that the non-history keys + (with 'cycle' in their names) are only present if a cycle is + currently running. If the crawler is between cycles, it appropriate + to show the latest item in the 'history' key instead. Also note that + each history item has all the data in the 'cycle-to-date' value, plus + cycle-start-finish-times. 
+ + cycle-to-date: + expiration-enabled + configured-expiration-mode + lease-age-histogram (list of (minage,maxage,sharecount) tuples) + leases-per-share-histogram + corrupt-shares (list of (si_b32,shnum) tuples, minimal verification) + space-recovered + + estimated-remaining-cycle: + # Values may be None if not enough data has been gathered to + # produce an estimate. + space-recovered + + estimated-current-cycle: + # cycle-to-date plus estimated-remaining. Values may be None if + # not enough data has been gathered to produce an estimate. + space-recovered + + history: maps cyclenum to a dict with the following keys: + cycle-start-finish-times + expiration-enabled + configured-expiration-mode + lease-age-histogram + leases-per-share-histogram + corrupt-shares + space-recovered + + The 'space-recovered' structure is a dictionary with the following + keys: + # 'examined' is what was looked at + examined-buckets, examined-buckets-mutable, examined-buckets-immutable + examined-shares, -mutable, -immutable + examined-sharebytes, -mutable, -immutable + examined-diskbytes, -mutable, -immutable + + # 'actual' is what was actually deleted + actual-buckets, -mutable, -immutable + actual-shares, -mutable, -immutable + actual-sharebytes, -mutable, -immutable + actual-diskbytes, -mutable, -immutable + + # would have been deleted, if the original lease timer was used + original-buckets, -mutable, -immutable + original-shares, -mutable, -immutable + original-sharebytes, -mutable, -immutable + original-diskbytes, -mutable, -immutable + + # would have been deleted, if our configured max_age was used + configured-buckets, -mutable, -immutable + configured-shares, -mutable, -immutable + configured-sharebytes, -mutable, -immutable + configured-diskbytes, -mutable, -immutable + + """ + progress = self.get_progress() + + state = ShareCrawler.get_state(self) # does a shallow copy + history = pickle.load(self.historyfp.getContent()) + state["history"] = history + + if not progress["cycle-in-progress"]: + del state["cycle-to-date"] + return state + + so_far = state["cycle-to-date"].copy() + state["cycle-to-date"] = so_far + + lah = so_far["lease-age-histogram"] + so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah) + so_far["expiration-enabled"] = self.expiration_enabled + so_far["configured-expiration-mode"] = (self.mode, + self.override_lease_duration, + self.cutoff_date, + self.sharetypes_to_expire) + + so_far_sr = so_far["space-recovered"] + remaining_sr = {} + remaining = {"space-recovered": remaining_sr} + cycle_sr = {} + cycle = {"space-recovered": cycle_sr} + + if progress["cycle-complete-percentage"] > 0.0: + pc = progress["cycle-complete-percentage"] / 100.0 + m = (1-pc)/pc + for a in ("actual", "original", "configured", "examined"): + for b in ("buckets", "shares", "sharebytes", "diskbytes"): + for c in ("", "-mutable", "-immutable"): + k = a+"-"+b+c + remaining_sr[k] = m * so_far_sr[k] + cycle_sr[k] = so_far_sr[k] + remaining_sr[k] + else: + for a in ("actual", "original", "configured", "examined"): + for b in ("buckets", "shares", "sharebytes", "diskbytes"): + for c in ("", "-mutable", "-immutable"): + k = a+"-"+b+c + remaining_sr[k] = None + cycle_sr[k] = None + + state["estimated-remaining-cycle"] = remaining + state["estimated-current-cycle"] = cycle + return state } [Changes I have made that aren't necessary for the test_backends.py suite to pass. 
wilcoxjg@gmail.com**20110809203811 Ignore-this: 117d49047456013f382ffc0559f00c40 ] { hunk ./src/allmydata/storage/crawler.py 1 - import os, time, struct import cPickle as pickle from twisted.internet import reactor hunk ./src/allmydata/storage/crawler.py 6 from twisted.application import service from allmydata.storage.common import si_b2a -from allmydata.util import fileutil class TimeSliceExceeded(Exception): pass hunk ./src/allmydata/storage/crawler.py 11 class ShareCrawler(service.MultiService): - """A ShareCrawler subclass is attached to a StorageServer, and + """A subclass of ShareCrawler is attached to a StorageServer, and periodically walks all of its shares, processing each one in some fashion. This crawl is rate-limited, to reduce the IO burden on the host, since large servers can easily have a terabyte of shares, in several hunk ./src/allmydata/storage/crawler.py 29 We assume that the normal upload/download/get_buckets traffic of a tahoe grid will cause the prefixdir contents to be mostly cached in the kernel, or that the number of buckets in each prefixdir will be small enough to - load quickly. A 1TB allmydata.com server was measured to have 2.56M + load quickly. A 1TB allmydata.com server was measured to have 2.56 * 10^6 buckets, spread into the 1024 prefixdirs, with about 2500 buckets per prefix. On this server, each prefixdir took 130ms-200ms to list the first time, and 17ms to list the second time. hunk ./src/allmydata/storage/crawler.py 66 cpu_slice = 1.0 # use up to 1.0 seconds before yielding minimum_cycle_time = 300 # don't run a cycle faster than this - def __init__(self, server, statefile, allowed_cpu_percentage=None): + def __init__(self, statefp, allowed_cpu_percentage=None): service.MultiService.__init__(self) if allowed_cpu_percentage is not None: self.allowed_cpu_percentage = allowed_cpu_percentage hunk ./src/allmydata/storage/crawler.py 70 - self.server = server - self.sharedir = server.sharedir - self.statefile = statefile + self.statefp = statefp self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2] for i in range(2**10)] self.prefixes.sort() hunk ./src/allmydata/storage/crawler.py 190 # of the last bucket to be processed, or # None if we are sleeping between cycles try: - f = open(self.statefile, "rb") - state = pickle.load(f) - f.close() + state = pickle.loads(self.statefp.getContent()) except EnvironmentError: state = {"version": 1, "last-cycle-finished": None, hunk ./src/allmydata/storage/crawler.py 226 else: last_complete_prefix = self.prefixes[lcpi] self.state["last-complete-prefix"] = last_complete_prefix - tmpfile = self.statefile + ".tmp" - f = open(tmpfile, "wb") - pickle.dump(self.state, f) - f.close() - fileutil.move_into_place(tmpfile, self.statefile) + self.statefp.setContent(pickle.dumps(self.state)) def startService(self): # arrange things to look like we were just sleeping, so hunk ./src/allmydata/storage/crawler.py 438 minimum_cycle_time = 60*60 # we don't need this more than once an hour - def __init__(self, server, statefile, num_sample_prefixes=1): - ShareCrawler.__init__(self, server, statefile) + def __init__(self, statefp, num_sample_prefixes=1): + ShareCrawler.__init__(self, statefp) self.num_sample_prefixes = num_sample_prefixes def add_initial_state(self): hunk ./src/allmydata/storage/crawler.py 478 old_cycle,buckets = self.state["storage-index-samples"][prefix] if old_cycle != cycle: del self.state["storage-index-samples"][prefix] - hunk ./src/allmydata/storage/lease.py 17 def get_expiration_time(self): return 
self.expiration_time + def get_grant_renew_time_time(self): # hack, based upon fixed 31day expiration period return self.expiration_time - 31*24*60*60 hunk ./src/allmydata/storage/lease.py 21 + def get_age(self): return time.time() - self.get_grant_renew_time_time() hunk ./src/allmydata/storage/lease.py 32 self.expiration_time) = struct.unpack(">L32s32sL", data) self.nodeid = None return self + def to_immutable_data(self): return struct.pack(">L32s32sL", self.owner_num, hunk ./src/allmydata/storage/lease.py 45 int(self.expiration_time), self.renew_secret, self.cancel_secret, self.nodeid) + def from_mutable_data(self, data): (self.owner_num, self.expiration_time, } [add __init__.py to backend and core and null wilcoxjg@gmail.com**20110810033751 Ignore-this: 1c72bc54951033ab433c38de58bdc39c ] { addfile ./src/allmydata/storage/backends/__init__.py addfile ./src/allmydata/storage/backends/null/__init__.py } [whitespace-cleanup wilcoxjg@gmail.com**20110810170847 Ignore-this: 7a278e7c87c6fcd2e5ed783667c8b746 ] { hunk ./src/allmydata/interfaces.py 1 - from zope.interface import Interface from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \ ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable hunk ./src/allmydata/storage/backends/das/core.py 47 self._setup_lease_checkerf(expiration_policy) def _setup_storage(self, storedir, readonly, reserved_space): - precondition(isinstance(storedir, FilePath), storedir, FilePath) + precondition(isinstance(storedir, FilePath), storedir, FilePath) self.storedir = storedir self.readonly = readonly self.reserved_space = int(reserved_space) hunk ./src/allmydata/storage/backends/das/core.py 89 except UnlistableError: # There is no shares directory at all. return frozenset() - + def get_shares(self, storageindex): """ Generate ImmutableShare objects for shares we have for this storageindex. ("Shares we have" means completed ones, excluding hunk ./src/allmydata/storage/backends/das/core.py 104 except UnlistableError: # There is no shares directory at all. pass - + def get_available_space(self): if self.readonly: return 0 hunk ./src/allmydata/storage/backends/das/core.py 122 def set_storage_server(self, ss): self.ss = ss - + # each share file (in storage/shares/$SI/$SHNUM) contains lease information # and share data. The share data is accessed by RIBucketWriter.write and hunk ./src/allmydata/storage/backends/das/core.py 223 # above-mentioned conditions. 
pass pass - + def stat(self): return filepath.stat(self.finalhome.path)[stat.ST_SIZE] hunk ./src/allmydata/storage/backends/das/core.py 309 num_leases = self._read_num_leases(self.incominghome) self._write_lease_record(self.incominghome, num_leases, lease_info) self._write_num_leases(self.incominghome, num_leases+1) - + def renew_lease(self, renew_secret, new_expire_time): for i,lease in enumerate(self.get_leases()): if constant_time_compare(lease.renew_secret, renew_secret): hunk ./src/allmydata/storage/common.py 1 - import os.path from allmydata.util import base32 hunk ./src/allmydata/storage/server.py 149 if self.readonly_storage: return 0 - return fileutil.get_available_space(self.storedir, self.reserved_space) + return fileutil.get_available_space(self.sharedir, self.reserved_space) def allocated_size(self): space = 0 hunk ./src/allmydata/storage/server.py 346 self.count("writev") si_s = si_b2a(storageindex) log.msg("storage: slot_writev %s" % si_s) - + (write_enabler, renew_secret, cancel_secret) = secrets # shares exist if there is a file for them bucketdir = si_si2dir(self.sharedir, storageindex) } [das/__init__.py wilcoxjg@gmail.com**20110810173849 Ignore-this: bdb730cba1d53d8827ef5fef65958471 ] addfile ./src/allmydata/storage/backends/das/__init__.py [test_backends.py: cleaned whitespace and removed unused variables wilcoxjg@gmail.com**20110810201041 Ignore-this: d000d4a7d3a0793464306e9d09437be6 ] { hunk ./src/allmydata/test/test_backends.py 13 from allmydata.storage.common import si_si2dir # The following share file content was generated with # storage.immutable.ShareFile from Tahoe-LAFS v1.8.2 -# with share data == 'a'. The total size of this input +# with share data == 'a'. The total size of this input # is 85 bytes. shareversionnumber = '\x00\x00\x00\x01' sharedatalength = '\x00\x00\x00\x01' hunk ./src/allmydata/test/test_backends.py 29 cancelsecret + expirationtime + nextlease share_data = containerdata + client_data testnodeid = 'testnodeidxxxxxxxxxx' -expiration_policy = {'enabled' : False, +expiration_policy = {'enabled' : False, 'mode' : 'age', 'override_lease_duration' : None, 'cutoff_date' : None, hunk ./src/allmydata/test/test_backends.py 37 class MockFileSystem(unittest.TestCase): - """ I simulate a filesystem that the code under test can use. I simulate - just the parts of the filesystem that the current implementation of DAS + """ I simulate a filesystem that the code under test can use. I simulate + just the parts of the filesystem that the current implementation of DAS backend needs. """ def setUp(self): # Make patcher, patch, and make effects for fs using functions. hunk ./src/allmydata/test/test_backends.py 43 msg( "%s.setUp()" % (self,)) - self.mockedfilepaths = {} + self.mockedfilepaths = {} #keys are pathnames, values are MockFilePath objects. This is necessary because #MockFilePath behavior sometimes depends on the filesystem. Where it does, #self.mockedfilepaths has the relevent info. 
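For reference while reading the test fixtures and expirer.py above, a sketch of the two expiration_policy shapes that LeaseCheckingCrawler.__init__ accepts; every value below is illustrative rather than a recommended setting:

    # Expire by age: leases older than the override duration are candidates.
    age_policy = {'enabled': True,
                  'mode': 'age',
                  'override_lease_duration': 60*24*60*60,  # 60 days, in seconds
                  'cutoff_date': None,
                  'sharetypes': ('mutable', 'immutable')}

    # Expire by cutoff date: leases last renewed before the given
    # seconds-since-epoch timestamp are candidates.
    cutoff_policy = {'enabled': True,
                     'mode': 'cutoff-date',
                     'override_lease_duration': None,
                     'cutoff_date': 1312848000,            # illustrative timestamp
                     'sharetypes': ('immutable',)}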
hunk ./src/allmydata/test/test_backends.py 56 self.sharefinalname = self.sharedirfinalname.child('0') self.FilePathFake = mock.patch('allmydata.storage.backends.das.core.FilePath', new = MockFilePath ) - FakePath = self.FilePathFake.__enter__() + self.FilePathFake.__enter__() self.BCountingCrawler = mock.patch('allmydata.storage.backends.das.core.BucketCountingCrawler') FakeBCC = self.BCountingCrawler.__enter__() hunk ./src/allmydata/test/test_backends.py 89 def tearDown(self): msg( "%s.tearDown()" % (self,)) - FakePath = self.FilePathFake.__exit__() + self.FilePathFake.__exit__() self.mockedfilepaths = {} hunk ./src/allmydata/test/test_backends.py 116 self.mockedfilepaths[self.path].fileobject = self.fileobject self.mockedfilepaths[self.path].existance = self.existance self.setparents() - + def create(self): # This method chokes if there's a pre-existing file! if self.mockedfilepaths[self.path].fileobject: hunk ./src/allmydata/test/test_backends.py 122 raise OSError else: - self.fileobject = MockFileObject(contentstring) self.existance = True self.mockedfilepaths[self.path].fileobject = self.fileobject self.mockedfilepaths[self.path].existance = self.existance hunk ./src/allmydata/test/test_backends.py 125 - self.setparents() + self.setparents() def open(self, mode='r'): # XXX Makes no use of mode. hunk ./src/allmydata/test/test_backends.py 151 childrenfromffs = [ffp for ffp in childrenfromffs if not ffp.path.endswith(self.path)] childrenfromffs = [ffp for ffp in childrenfromffs if ffp.exists()] self.spawn = frozenset(childrenfromffs) - return self.spawn + return self.spawn def parent(self): if self.mockedfilepaths.has_key(self.antecedent): hunk ./src/allmydata/test/test_backends.py 163 def parents(self): antecedents = [] def f(fps, antecedents): - newfps = os.path.split(fps)[0] + newfps = os.path.split(fps)[0] if newfps: antecedents.append(newfps) f(newfps, antecedents) hunk ./src/allmydata/test/test_backends.py 256 @mock.patch('os.listdir') @mock.patch('os.path.isdir') def test_write_share(self, mockisdir, mocklistdir, mockopen, mockmkdir): - """ Write a new share. """ + """ Write a new share. This tests that StorageServer's remote_allocate_buckets generates the correct return types when given test-vector arguments. that bs is of the correct type is verified by bs[0] exercising remote_write without error. """ alreadygot, bs = self.ss.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, set((0,)), 1, mock.Mock()) bs[0].remote_write(0, 'a') hunk ./src/allmydata/test/test_backends.py 275 class TestServerAndFSBackend(MockFileSystem, ReallyEqualMixin): - """ This tests both the StorageServer and the DAS backend together. """ + """ This tests both the StorageServer and the DAS backend together. """ def setUp(self): MockFileSystem.setUp(self) try: hunk ./src/allmydata/test/test_backends.py 292 @mock.patch('allmydata.util.fileutil.get_available_space') def test_out_of_space(self, mockget_available_space, mocktime): mocktime.return_value = 0 - + def call_get_available_space(dir, reserve): return 0 hunk ./src/allmydata/test/test_backends.py 310 mocktime.return_value = 0 # Inspect incoming and fail unless it's empty. incomingset = self.ss.backend.get_incoming_shnums('teststorage_index') - + self.failUnlessReallyEqual(incomingset, frozenset()) hunk ./src/allmydata/test/test_backends.py 312 - + # Populate incoming with the sharenum: 0. 
alreadygot, bs = self.ss.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, frozenset((0,)), 1, mock.Mock()) hunk ./src/allmydata/test/test_backends.py 329 # has been called. self.failIf(bsa) - # Test allocated size. + # Test allocated size. spaceint = self.ss.allocated_size() self.failUnlessReallyEqual(spaceint, 1) hunk ./src/allmydata/test/test_backends.py 335 # Write 'a' to shnum 0. Only tested together with close and read. bs[0].remote_write(0, 'a') - + # Preclose: Inspect final, failUnless nothing there. self.failUnlessReallyEqual(len(list(self.backend.get_shares('teststorage_index'))), 0) bs[0].remote_close() hunk ./src/allmydata/test/test_backends.py 349 # Exercise the case that the share we're asking to allocate is # already (completely) uploaded. self.ss.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, set((0,)), 1, mock.Mock()) - + def test_read_old_share(self): """ This tests whether the code correctly finds and reads hunk ./src/allmydata/test/test_backends.py 360 StorageServer object. """ # Contruct a file with the appropriate contents in the mockfilesystem. datalen = len(share_data) - finalhome = si_si2dir(self.basedir, 'teststorage_index').child(str(0)) + finalhome = si_si2dir(self.basedir, 'teststorage_index').child(str(0)) finalhome.setContent(share_data) # Now begin the test. } [test_backends.py, backends/das -> backends/disk: renaming backend das to disk wilcoxjg@gmail.com**20110829184834 Ignore-this: c65f84cceb14e6001c6f6b1ddc9b508d ] { move ./src/allmydata/storage/backends/das ./src/allmydata/storage/backends/disk hunk ./src/allmydata/storage/backends/disk/core.py 22 from allmydata.storage.immutable import BucketWriter, BucketReader from allmydata.storage.crawler import BucketCountingCrawler from allmydata.util.hashutil import constant_time_compare -from allmydata.storage.backends.das.expirer import LeaseCheckingCrawler +from allmydata.storage.backends.disk.expirer import LeaseCheckingCrawler _pyflakes_hush = [si_b2a, si_a2b, si_si2dir] # re-exported # storage/ hunk ./src/allmydata/storage/backends/disk/core.py 37 # $SHARENUM matches this regex: NUM_RE=re.compile("^[0-9]+$") -class DASCore(Backend): +class DiskCore(Backend): implements(IStorageBackend) def __init__(self, storedir, expiration_policy, readonly=False, reserved_space=0): Backend.__init__(self) hunk ./src/allmydata/test/test_backends.py 8 import mock # This is the code that we're going to be testing. from allmydata.storage.server import StorageServer -from allmydata.storage.backends.das.core import DASCore +from allmydata.storage.backends.disk.core import DiskCore from allmydata.storage.backends.null.core import NullCore from allmydata.storage.common import si_si2dir # The following share file content was generated with hunk ./src/allmydata/test/test_backends.py 38 class MockFileSystem(unittest.TestCase): """ I simulate a filesystem that the code under test can use. I simulate - just the parts of the filesystem that the current implementation of DAS + just the parts of the filesystem that the current implementation of Disk backend needs. """ def setUp(self): # Make patcher, patch, and make effects for fs using functions. 
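The MockFileSystem fixture above follows a start-the-patch-in-setUp, undo-it-in-tearDown pattern. A minimal, self-contained sketch of that pattern, patching os.path.exists purely for illustration rather than the backend's FilePath:

    import mock
    from twisted.trial import unittest

    class ExamplePatching(unittest.TestCase):
        def setUp(self):
            # Swap in a fake os.path.exists for the duration of each test.
            self.patcher = mock.patch('os.path.exists', new=lambda p: True)
            self.patcher.start()      # the fixture above uses __enter__()

        def tearDown(self):
            self.patcher.stop()       # the fixture above uses __exit__()

        def test_sees_fake(self):
            import os.path
            self.failUnless(os.path.exists('/nonexistent/for/illustration'))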
hunk ./src/allmydata/test/test_backends.py 55 self.shareincomingname = self.sharedirincomingname.child('0') self.sharefinalname = self.sharedirfinalname.child('0') - self.FilePathFake = mock.patch('allmydata.storage.backends.das.core.FilePath', new = MockFilePath ) + self.FilePathFake = mock.patch('allmydata.storage.backends.disk.core.FilePath', new = MockFilePath ) self.FilePathFake.__enter__() hunk ./src/allmydata/test/test_backends.py 58 - self.BCountingCrawler = mock.patch('allmydata.storage.backends.das.core.BucketCountingCrawler') + self.BCountingCrawler = mock.patch('allmydata.storage.backends.disk.core.BucketCountingCrawler') FakeBCC = self.BCountingCrawler.__enter__() FakeBCC.side_effect = self.call_FakeBCC hunk ./src/allmydata/test/test_backends.py 62 - self.LeaseCheckingCrawler = mock.patch('allmydata.storage.backends.das.core.LeaseCheckingCrawler') + self.LeaseCheckingCrawler = mock.patch('allmydata.storage.backends.disk.core.LeaseCheckingCrawler') FakeLCC = self.LeaseCheckingCrawler.__enter__() FakeLCC.side_effect = self.call_FakeLCC hunk ./src/allmydata/test/test_backends.py 70 GetSpace = self.get_available_space.__enter__() GetSpace.side_effect = self.call_get_available_space - self.statforsize = mock.patch('allmydata.storage.backends.das.core.filepath.stat') + self.statforsize = mock.patch('allmydata.storage.backends.disk.core.filepath.stat') getsize = self.statforsize.__enter__() getsize.side_effect = self.call_statforsize hunk ./src/allmydata/test/test_backends.py 271 """ This tests whether a server instance can be constructed with a filesystem backend. To pass the test, it mustn't use the filesystem outside of its configured storedir. """ - StorageServer(testnodeid, backend=DASCore(self.storedir, expiration_policy)) + StorageServer(testnodeid, backend=DiskCore(self.storedir, expiration_policy)) class TestServerAndFSBackend(MockFileSystem, ReallyEqualMixin): hunk ./src/allmydata/test/test_backends.py 275 - """ This tests both the StorageServer and the DAS backend together. """ + """ This tests both the StorageServer and the Disk backend together. 
""" def setUp(self): MockFileSystem.setUp(self) try: hunk ./src/allmydata/test/test_backends.py 279 - self.backend = DASCore(self.storedir, expiration_policy) + self.backend = DiskCore(self.storedir, expiration_policy) self.ss = StorageServer(testnodeid, self.backend) hunk ./src/allmydata/test/test_backends.py 282 - self.backendwithreserve = DASCore(self.storedir, expiration_policy, reserved_space = 1) + self.backendwithreserve = DiskCore(self.storedir, expiration_policy, reserved_space = 1) self.sswithreserve = StorageServer(testnodeid, self.backendwithreserve) except: MockFileSystem.tearDown(self) } [disk/core.py: slips past pyflakes without causing errors wilcoxjg@gmail.com**20110829213631 Ignore-this: a3758ee3bd5da2d4d76fd3cd0de64476 ] { hunk ./src/allmydata/storage/backends/disk/core.py 1 -import re, weakref, struct, time, stat -from twisted.application import service +import re, struct, stat, os from twisted.python.filepath import UnlistableError from twisted.python import filepath from twisted.python.filepath import FilePath hunk ./src/allmydata/storage/backends/disk/core.py 7 from zope.interface import implements -import allmydata # for __full_version__ from allmydata.interfaces import IStorageBackend from allmydata.storage.backends.base import Backend from allmydata.storage.common import si_b2a, si_a2b, si_si2dir hunk ./src/allmydata/storage/backends/disk/core.py 11 from allmydata.util.assertutil import precondition -from allmydata.interfaces import IStorageBackend -from allmydata.util import fileutil, idlib, log, time_format +from allmydata.util import fileutil, log from allmydata.util.fileutil import fp_make_dirs from allmydata.storage.lease import LeaseInfo hunk ./src/allmydata/storage/backends/disk/core.py 14 -from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ - create_mutable_sharefile from allmydata.storage.immutable import BucketWriter, BucketReader from allmydata.storage.crawler import BucketCountingCrawler from allmydata.util.hashutil import constant_time_compare hunk ./src/allmydata/storage/backends/disk/core.py 18 from allmydata.storage.backends.disk.expirer import LeaseCheckingCrawler +from allmydata.storage.common import UnknownImmutableContainerVersionError, DataTooLargeError _pyflakes_hush = [si_b2a, si_a2b, si_si2dir] # re-exported # storage/ } [null/core.py, storage/common.py, storage/immutable.py: pyflaked clean wilcoxjg@gmail.com**20110829214816 Ignore-this: 526f1df98928f52a6df718d7fb510911 ] { hunk ./src/allmydata/storage/backends/null/core.py 1 +import os, struct, stat +from allmydata.util.assertutil import precondition +from allmydata.storage.lease import LeaseInfo +from allmydata.util.hashutil import constant_time_compare from allmydata.storage.backends.base import Backend hunk ./src/allmydata/storage/backends/null/core.py 6 -from allmydata.storage.immutable import BucketWriter, BucketReader +from allmydata.storage.immutable import BucketWriter from zope.interface import implements from allmydata.interfaces import IStorageBackend, IStorageBackendShare hunk ./src/allmydata/storage/common.py 1 -import os.path from allmydata.util import base32 class DataTooLargeError(Exception): hunk ./src/allmydata/storage/immutable.py 9 from allmydata.interfaces import RIBucketWriter, RIBucketReader from allmydata.util import base32, log from allmydata.util.assertutil import precondition -from allmydata.util.hashutil import constant_time_compare -from allmydata.storage.lease import LeaseInfo -from allmydata.storage.common import 
UnknownImmutableContainerVersionError, \ - DataTooLargeError class BucketWriter(Referenceable): implements(RIBucketWriter) } Context: [test_mutable.Update: only upload the files needed for each test. refs #1500 Brian Warner **20110829072717 Ignore-this: 4d2ab4c7523af9054af7ecca9c3d9dc7 This first step shaves 15% off the runtime: from 139s to 119s on my laptop. It also fixes a couple of places where a Deferred was being dropped, which would cause two tests to run in parallel and also confuse error reporting. ] [Let Uploader retain History instead of passing it into upload(). Fixes #1079. Brian Warner **20110829063246 Ignore-this: 3902c58ec12bd4b2d876806248e19f17 This consistently records all immutable uploads in the Recent Uploads And Downloads page, regardless of code path. Previously, certain webapi upload operations (like PUT /uri/$DIRCAP/newchildname) failed to pass the History object and were left out. ] [Fix mutable publish/retrieve timing status displays. Fixes #1505. Brian Warner **20110828232221 Ignore-this: 4080ce065cf481b2180fd711c9772dd6 publish: * encrypt and encode times are cumulative, not just current-segment retrieve: * same for decrypt and decode times * update "current status" to include segment number * set status to Finished/Failed when download is complete * set progress to 1.0 when complete More improvements to consider: * progress is currently 0% or 100%: should calculate how many segments are involved (remembering retrieve can be less than the whole file) and set it to a fraction * "fetch" time is fuzzy: what we want is to know how much of the delay is not our own fault, but since we do decode/decrypt work while waiting for more shares, it's not straightforward ] [Teach 'tahoe debug catalog-shares about MDMF. Closes #1507. Brian Warner **20110828080931 Ignore-this: 56ef2951db1a648353d7daac6a04c7d1 ] [debug.py: remove some dead comments Brian Warner **20110828074556 Ignore-this: 40e74040dd4d14fd2f4e4baaae506b31 ] [hush pyflakes Brian Warner **20110828074254 Ignore-this: bef9d537a969fa82fe4decc4ba2acb09 ] [MutableFileNode.set_downloader_hints: never depend upon order of dict.values() Brian Warner **20110828074103 Ignore-this: caaf1aa518dbdde4d797b7f335230faa The old code was calculating the "extension parameters" (a list) from the downloader hints (a dictionary) with hints.values(), which is not stable, and would result in corrupted filecaps (with the 'k' and 'segsize' hints occasionally swapped). The new code always uses [k,segsize]. ] [layout.py: fix MDMF share layout documentation Brian Warner **20110828073921 Ignore-this: 3f13366fed75b5e31b51ae895450a225 ] [teach 'tahoe debug dump-share' about MDMF and offsets. refs #1507 Brian Warner **20110828073834 Ignore-this: 3a9d2ef9c47a72bf1506ba41199a1dea ] [test_mutable.Version.test_debug: use splitlines() to fix buildslaves Brian Warner **20110828064728 Ignore-this: c7f6245426fc80b9d1ae901d5218246a Any slave running in a directory with spaces in the name was miscounting shares, causing the test to fail. ] [test_mutable.Version: exercise 'tahoe debug find-shares' on MDMF. refs #1507 Brian Warner **20110828005542 Ignore-this: cb20bea1c28bfa50a72317d70e109672 Also changes NoNetworkGrid to put shares in storage/shares/ . 
] [test_mutable.py: oops, missed a .todo Brian Warner **20110828002118 Ignore-this: fda09ae86481352b7a627c278d2a3940 ] [test_mutable: merge davidsarah's patch with my Version refactorings warner@lothar.com**20110827235707 Ignore-this: b5aaf481c90d99e33827273b5d118fd0 ] [Make the immutable/read-only constraint checking for MDMF URIs identical to that for SSK URIs. refs #393 david-sarah@jacaranda.org**20110823012720 Ignore-this: e1f59d7ff2007c81dbef2aeb14abd721 ] [Additional tests for MDMF URIs and for zero-length files. refs #393 david-sarah@jacaranda.org**20110823011532 Ignore-this: a7cc0c09d1d2d72413f9cd227c47a9d5 ] [Additional tests for zero-length partial reads and updates to mutable versions. refs #393 david-sarah@jacaranda.org**20110822014111 Ignore-this: 5fc6f4d06e11910124e4a277ec8a43ea ] [test_mutable.Version: factor out some expensive uploads, save 25% runtime Brian Warner **20110827232737 Ignore-this: ea37383eb85ea0894b254fe4dfb45544 ] [SDMF: update filenode with correct k/N after Retrieve. Fixes #1510. Brian Warner **20110827225031 Ignore-this: b50ae6e1045818c400079f118b4ef48 Without this, we get a regression when modifying a mutable file that was created with more shares (larger N) than our current tahoe.cfg . The modification attempt creates new versions of the (0,1,..,newN-1) shares, but leaves the old versions of the (newN,..,oldN-1) shares alone (and throws a assertion error in SDMFSlotWriteProxy.finish_publishing in the process). The mixed versions that result (some shares with e.g. N=10, some with N=20, such that both versions are recoverable) cause problems for the Publish code, even before MDMF landed. Might be related to refs #1390 and refs #1042. ] [layout.py: annotate assertion to figure out 'tahoe backup' failure Brian Warner **20110827195253 Ignore-this: 9b92b954e3ed0d0f80154fff1ff674e5 ] [Add 'tahoe debug dump-cap' support for MDMF, DIR2-CHK, DIR2-MDMF. refs #1507. Brian Warner **20110827195048 Ignore-this: 61c6af5e33fc88e0251e697a50addb2c This also adds tests for all those cases, and fixes an omission in uri.py that broke parsing of DIR2-MDMF-Verifier and DIR2-CHK-Verifier. ] [MDMF: more writable/writeable consistentifications warner@lothar.com**20110827190602 Ignore-this: 22492a9e20c1819ddb12091062888b55 ] [MDMF: s/Writable/Writeable/g, for consistency with existing SDMF code warner@lothar.com**20110827183357 Ignore-this: 9dd312acedbdb2fc2f7bef0d0fb17c0b ] [setup.cfg: remove no-longer-supported test_mac_diskimage alias. refs #1479 david-sarah@jacaranda.org**20110826230345 Ignore-this: 40e908b8937322a290fb8012bfcad02a ] [test_mutable.Update: increase timeout from 120s to 400s, slaves are failing Brian Warner **20110825230140 Ignore-this: 101b1924a30cdbda9b2e419e95ca15ec ] [tests: fix check_memory test zooko@zooko.com**20110825201116 Ignore-this: 4d66299fa8cb61d2ca04b3f45344d835 fixes #1503 ] [TAG allmydata-tahoe-1.9.0a1 warner@lothar.com**20110825161122 Ignore-this: 3cbf49f00dbda58189f893c427f65605 ] Patch bundle hash: 296daba999522f770510cf12485f85b0e4eccf46