move IWorkQueue into allmydata.interfaces, give VirtualDrive an uploader

This commit is contained in:
Brian Warner 2007-01-21 15:15:31 -07:00
parent 430b3a03fc
commit 81d093b649
4 changed files with 133 additions and 115 deletions

View File

@ -8,8 +8,8 @@ from allmydata.filetree.interfaces import (
NoSuchDirectoryError, NoSuchChildError, PathAlreadyExistsError,
PathDoesNotExistError,
)
from allmydata.upload import IUploadable
from allmydata.interfaces import IDownloader
from allmydata.interfaces import (IDownloader, IUploadable, IUploader,
IWorkQueue)
from allmydata.filetree.nodemaker import NodeMaker
@ -74,10 +74,14 @@ class SubTreeMaker(object):
class VirtualDrive(object):
implements(IVirtualDrive)
def __init__(self, workqueue, downloader, root_node):
def __init__(self, workqueue, downloader, uploader, root_node):
assert IWorkQueue(workqueue)
assert IDownloader(downloader)
assert IUploader(uploader)
assert INode(root_node)
self.workqueue = workqueue
workqueue.set_vdrive(self)
workqueue.set_uploader(uploader)
# TODO: queen?
self.queen = None
self.root_node = root_node

View File

@ -223,3 +223,115 @@ class IUploader(Interface):
def upload_ssk(write_capability, new_version, uploadable):
pass # TODO
class IWorkQueue(Interface):
"""Each filetable root is associated a work queue, which is persisted on
disk and contains idempotent actions that need to be performed. After
each action is completed, it is removed from the queue.
The queue is broken up into several sections. First are the 'upload'
steps. After this are the 'add_subpath' commands. The last section has
the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe
interspersed with 'upload', maybe after 'add_subpath' and before
'unlink'.
The general idea is that the processing of the work queue could be
interrupted at any time, in the middle of a step, and the next time the
application is started, the step can be re-started without problems. The
placement of the 'retain' commands depends upon how long we might expect
the app to be offline.
tempfiles: the workqueue has a special directory where temporary files
are stored. create_tempfile() generates these files, while steps like
add_upload_chk() use them. The add_delete_tempfile() will delete the
tempfile. All tempfiles are deleted when the workqueue becomes empty,
since at that point none of them can still be referenced.
boxes: there is another special directory where named slots (called
'boxes') hold serialized INode specifications (the strings which are
returned by INode.serialize_node()). Boxes are created by calling
create_boxname(). Boxes are filled either at the time of creation or by
steps like add_upload_chk(). Boxes are used by steps like add_addpath()
and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
well as when the workqueue becomes empty.
"""
def create_tempfile(suffix=""):
"""Return (f, filename), where 'f' is an open filehandle, and
'filename' is a string that can be passed to other workqueue steps to
refer to that same file later. NOTE: 'filename' is not an absolute
path, rather it will be interpreted relative to some directory known
only by the workqueue."""
def create_boxname(contents=None):
"""Return a unique box name (as a string). If 'contents' are
provided, it must be an instance that provides INode, and the
serialized form of the node will be written into the box. Otherwise
the boxname can be used by steps like add_upload_chk to hold the
generated uri."""
def add_upload_chk(source_filename, stash_uri_in_boxname):
"""This step uploads a file to the mesh and obtains a content-based
URI which can be used to later retrieve the same contents ('CHK'
mode). This URI includes unlink rights. It does not mark the file for
retention.
When the upload is complete, the resulting URI is stashed in a 'box'
with the specified name. This is basically a local variable. A later
'add_subpath' step will reference this boxname and retrieve the URI.
"""
def add_upload_ssk(write_capability, previous_version, source_filename):
"""This step uploads a file to the mesh in a way that replaces the
previous version and does not require a change to the ID referenced
by the parent.
"""
def add_queen_update_handle(handle, source_filename):
"""Arrange for a central queen to be notified that the given handle
has been updated with the contents of the given tempfile. This will
send a set_handle() message to the queen."""
def add_retain_ssk(read_capability):
"""Arrange for the given SSK to be kept alive."""
def add_unlink_ssk(write_capability):
"""Stop keeping the given SSK alive."""
def add_retain_uri_from_box(boxname):
"""When executed, this step retrieves the URI from the given box and
marks it for retention: this adds it to a list of all URIs that this
system cares about, which will initiate filechecking/repair for the
file."""
def add_addpath(boxname, path):
"""When executed, this step will retrieve the serialized INode from
the given box and call vdrive.add(path, node) .
"""
def add_unlink_uri(uri):
"""When executed, this step will unlink the data referenced by the
given URI: the unlink rights are used to tell any shareholders to
unlink the file (possibly deleting it), and the URI is removed from
the list that this system cares about, cancelling filechecking/repair
for the file.
All 'unlink' steps are pushed to the end of the queue.
"""
def add_delete_tempfile(filename):
"""This step will delete a tempfile created by create_tempfile."""
def add_delete_box(boxname):
"""When executed, this step deletes the given box."""
# methods for use in unit tests
def flush():
"""Execute all steps in the WorkQueue right away. Return a Deferred
that fires (with self) when the queue is empty.
"""
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""

View File

@ -11,6 +11,9 @@ from allmydata.interfaces import IDownloader, IUploader
from allmydata import workqueue
from cStringIO import StringIO
class FakeMesh(object):
implements(IDownloader, IUploader)
"""
class FakeOpener(object):
implements(IOpener)
@ -332,15 +335,19 @@ class InPairs(unittest.TestCase):
class StubDownloader(object):
implements(IDownloader)
class StubUploader(object):
implements(IUploader)
class Stuff(unittest.TestCase):
def makeVirtualDrive(self, basedir, root_node=None):
wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue"))
dl = StubDownloader()
ul = StubUploader()
if not root_node:
root_node = directory.LocalFileSubTreeNode()
root_node.new("rootdirtree.save")
v = vdrive.VirtualDrive(wq, dl, root_node)
v = vdrive.VirtualDrive(wq, dl, ul, root_node)
return v
def failUnlessListsAreEqual(self, list1, list2):

View File

@ -7,117 +7,7 @@ from allmydata.util.idlib import b2a
from allmydata.Crypto.Cipher import AES
from allmydata.filetree.nodemaker import NodeMaker
from allmydata.filetree.interfaces import INode
class IWorkQueue(Interface):
"""Each filetable root is associated a work queue, which is persisted on
disk and contains idempotent actions that need to be performed. After
each action is completed, it is removed from the queue.
The queue is broken up into several sections. First are the 'upload'
steps. After this are the 'add_subpath' commands. The last section has
the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe
interspersed with 'upload', maybe after 'add_subpath' and before
'unlink'.
The general idea is that the processing of the work queue could be
interrupted at any time, in the middle of a step, and the next time the
application is started, the step can be re-started without problems. The
placement of the 'retain' commands depends upon how long we might expect
the app to be offline.
tempfiles: the workqueue has a special directory where temporary files
are stored. create_tempfile() generates these files, while steps like
add_upload_chk() use them. The add_delete_tempfile() will delete the
tempfile. All tempfiles are deleted when the workqueue becomes empty,
since at that point none of them can still be referenced.
boxes: there is another special directory where named slots (called
'boxes') hold serialized INode specifications (the strings which are
returned by INode.serialize_node()). Boxes are created by calling
create_boxname(). Boxes are filled either at the time of creation or by
steps like add_upload_chk(). Boxes are used by steps like add_addpath()
and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
well as when the workqueue becomes empty.
"""
def create_tempfile(suffix=""):
"""Return (f, filename), where 'f' is an open filehandle, and
'filename' is a string that can be passed to other workqueue steps to
refer to that same file later. NOTE: 'filename' is not an absolute
path, rather it will be interpreted relative to some directory known
only by the workqueue."""
def create_boxname(contents=None):
"""Return a unique box name (as a string). If 'contents' are
provided, it must be an instance that provides INode, and the
serialized form of the node will be written into the box. Otherwise
the boxname can be used by steps like add_upload_chk to hold the
generated uri."""
def add_upload_chk(source_filename, stash_uri_in_boxname):
"""This step uploads a file to the mesh and obtains a content-based
URI which can be used to later retrieve the same contents ('CHK'
mode). This URI includes unlink rights. It does not mark the file for
retention.
When the upload is complete, the resulting URI is stashed in a 'box'
with the specified name. This is basically a local variable. A later
'add_subpath' step will reference this boxname and retrieve the URI.
"""
def add_upload_ssk(write_capability, previous_version, source_filename):
"""This step uploads a file to the mesh in a way that replaces the
previous version and does not require a change to the ID referenced
by the parent.
"""
def add_queen_update_handle(handle, source_filename):
"""Arrange for a central queen to be notified that the given handle
has been updated with the contents of the given tempfile. This will
send a set_handle() message to the queen."""
def add_retain_ssk(read_capability):
"""Arrange for the given SSK to be kept alive."""
def add_unlink_ssk(write_capability):
"""Stop keeping the given SSK alive."""
def add_retain_uri_from_box(boxname):
"""When executed, this step retrieves the URI from the given box and
marks it for retention: this adds it to a list of all URIs that this
system cares about, which will initiate filechecking/repair for the
file."""
def add_addpath(boxname, path):
"""When executed, this step will retrieve the serialized INode from
the given box and call vdrive.add(path, node) .
"""
def add_unlink_uri(uri):
"""When executed, this step will unlink the data referenced by the
given URI: the unlink rights are used to tell any shareholders to
unlink the file (possibly deleting it), and the URI is removed from
the list that this system cares about, cancelling filechecking/repair
for the file.
All 'unlink' steps are pushed to the end of the queue.
"""
def add_delete_tempfile(filename):
"""This step will delete a tempfile created by create_tempfile."""
def add_delete_box(boxname):
"""When executed, this step deletes the given box."""
# methods for use in unit tests
def flush():
"""Execute all steps in the WorkQueue right away. Return a Deferred
that fires (with self) when the queue is empty.
"""
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""
from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader
class Step(object):
@ -161,6 +51,8 @@ class WorkQueue(object):
assert basedir.endswith("workqueue")
self.basedir = basedir
self._node_maker = NodeMaker()
self._uploader = None # filled in later
self._downloader = None # filled in later
self.seqnum = 0
self.tmpdir = os.path.join(basedir, "tmp")
#self.trashdir = os.path.join(basedir, "trash")
@ -196,6 +88,9 @@ class WorkQueue(object):
def set_vdrive(self, vdrive):
self.vdrive = vdrive
def set_uploader(self, uploader):
assert IUploader(uploader)
self._uploader = uploader
def create_tempfile(self, suffix=""):
randomname = b2a(os.urandom(10))