Use deferredutil.HookMixin to simplify callback for tests.
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
parent
decc5ff412
commit
fe0d5f4997
|
@ -13,8 +13,8 @@ from allmydata.util import fileutil
|
||||||
from allmydata.interfaces import IDirectoryNode
|
from allmydata.interfaces import IDirectoryNode
|
||||||
from allmydata.util import log
|
from allmydata.util import log
|
||||||
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
|
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
|
||||||
|
|
||||||
from allmydata.util.assertutil import precondition
|
from allmydata.util.assertutil import precondition
|
||||||
|
from allmydata.util.deferredutil import HookMixin
|
||||||
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
|
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
|
||||||
unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
|
unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
|
||||||
from allmydata.immutable.upload import FileName, Data
|
from allmydata.immutable.upload import FileName, Data
|
||||||
|
@ -90,13 +90,14 @@ class MagicFolder(service.MultiService):
|
||||||
return service.MultiService.disownServiceParent(self)
|
return service.MultiService.disownServiceParent(self)
|
||||||
|
|
||||||
|
|
||||||
class QueueMixin(object):
|
class QueueMixin(HookMixin):
|
||||||
def __init__(self, client, local_path_u, db, name):
|
def __init__(self, client, local_path_u, db, name):
|
||||||
self._client = client
|
self._client = client
|
||||||
self._local_path_u = local_path_u
|
self._local_path_u = local_path_u
|
||||||
self._local_path = to_filepath(local_path_u)
|
self._local_path = to_filepath(local_path_u)
|
||||||
self._db = db
|
self._db = db
|
||||||
self._name = name
|
self._name = name
|
||||||
|
self._hooks = {'processed': None}
|
||||||
|
|
||||||
if not self._local_path.exists():
|
if not self._local_path.exists():
|
||||||
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
|
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
|
||||||
|
@ -143,24 +144,10 @@ class QueueMixin(object):
|
||||||
self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
|
self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
|
||||||
else:
|
else:
|
||||||
self._lazy_tail.addCallback(lambda ign: self._process(item))
|
self._lazy_tail.addCallback(lambda ign: self._process(item))
|
||||||
#self._lazy_tail.addErrback(lambda f: self._log("error: %s" % (f,)))
|
self._lazy_tail.addBoth(self._call_hook, 'processed')
|
||||||
|
self._lazy_tail.addErrback(log.err)
|
||||||
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))
|
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))
|
||||||
|
|
||||||
def _do_callback(self, res):
|
|
||||||
if self._ignore_count == 0:
|
|
||||||
self._callback(res)
|
|
||||||
else:
|
|
||||||
self._ignore_count -= 1
|
|
||||||
return None # intentionally suppress failures, which have already been logged
|
|
||||||
|
|
||||||
def set_callback(self, callback, ignore_count=0):
|
|
||||||
"""
|
|
||||||
set_callback sets a function that will be called after a filesystem change
|
|
||||||
(either local or remote) has been processed, successfully or unsuccessfully.
|
|
||||||
"""
|
|
||||||
self._callback = callback
|
|
||||||
self._ignore_count = ignore_count
|
|
||||||
|
|
||||||
|
|
||||||
class Uploader(QueueMixin):
|
class Uploader(QueueMixin):
|
||||||
def __init__(self, client, local_path_u, db, upload_dircap, inotify, pending_delay):
|
def __init__(self, client, local_path_u, db, upload_dircap, inotify, pending_delay):
|
||||||
|
@ -383,7 +370,6 @@ class Uploader(QueueMixin):
|
||||||
self._log("%r while processing %r" % (f, path_u))
|
self._log("%r while processing %r" % (f, path_u))
|
||||||
return f
|
return f
|
||||||
d.addCallbacks(_succeeded, _failed)
|
d.addCallbacks(_succeeded, _failed)
|
||||||
d.addBoth(self._do_callback)
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _get_metadata(self, encoded_name_u):
|
def _get_metadata(self, encoded_name_u):
|
||||||
|
@ -585,13 +571,13 @@ class Downloader(QueueMixin):
|
||||||
self._log("download failed: %s" % (str(f),))
|
self._log("download failed: %s" % (str(f),))
|
||||||
self._count('objects_download_failed')
|
self._count('objects_download_failed')
|
||||||
return f
|
return f
|
||||||
def remove_from_pending(ign):
|
d.addCallbacks(succeeded, failed)
|
||||||
|
def remove_from_pending(res):
|
||||||
print "REMOVE FROM PENDING _pending = %r, name = %r" % (self._pending, name)
|
print "REMOVE FROM PENDING _pending = %r, name = %r" % (self._pending, name)
|
||||||
self._pending.remove(name)
|
self._pending.remove(name)
|
||||||
print "REMOVE FROM PENDING _after: _pending = %r" % (self._pending,)
|
print "REMOVE FROM PENDING _after: _pending = %r" % (self._pending,)
|
||||||
d.addCallbacks(succeeded, failed)
|
return res
|
||||||
d.addBoth(self._do_callback)
|
d.addBoth(remove_from_pending)
|
||||||
d.addCallback(remove_from_pending)
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _write_downloaded_file(self, name, file_contents):
|
def _write_downloaded_file(self, name, file_contents):
|
||||||
|
|
|
@ -125,8 +125,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||||
|
|
||||||
def _check_move_empty_tree(res):
|
def _check_move_empty_tree(res):
|
||||||
self.mkdir_nonascii(empty_tree_dir)
|
self.mkdir_nonascii(empty_tree_dir)
|
||||||
d2 = defer.Deferred()
|
d2 = self.magicfolder.uploader.set_hook('processed')
|
||||||
self.magicfolder.uploader.set_callback(d2.callback)
|
|
||||||
os.rename(empty_tree_dir, new_empty_tree_dir)
|
os.rename(empty_tree_dir, new_empty_tree_dir)
|
||||||
self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO)
|
self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO)
|
||||||
return d2
|
return d2
|
||||||
|
@ -139,8 +138,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||||
def _check_move_small_tree(res):
|
def _check_move_small_tree(res):
|
||||||
self.mkdir_nonascii(small_tree_dir)
|
self.mkdir_nonascii(small_tree_dir)
|
||||||
fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
|
fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
|
||||||
d2 = defer.Deferred()
|
d2 = self.magicfolder.uploader.set_hook('processed', ignore_count=1)
|
||||||
self.magicfolder.uploader.set_callback(d2.callback, ignore_count=1)
|
|
||||||
os.rename(small_tree_dir, new_small_tree_dir)
|
os.rename(small_tree_dir, new_small_tree_dir)
|
||||||
self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
|
self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
|
||||||
return d2
|
return d2
|
||||||
|
@ -151,8 +149,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
|
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
|
||||||
|
|
||||||
def _check_moved_tree_is_watched(res):
|
def _check_moved_tree_is_watched(res):
|
||||||
d2 = defer.Deferred()
|
d2 = self.magicfolder.uploader.set_hook('processed')
|
||||||
self.magicfolder.uploader.set_callback(d2.callback)
|
|
||||||
fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
|
fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
|
||||||
self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
|
self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
|
||||||
return d2
|
return d2
|
||||||
|
@ -201,8 +198,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||||
d.addCallback(self._create_magicfolder)
|
d.addCallback(self._create_magicfolder)
|
||||||
|
|
||||||
def create_test_file(result):
|
def create_test_file(result):
|
||||||
d2 = defer.Deferred()
|
d2 = self.magicfolder.uploader.set_hook('processed')
|
||||||
self.magicfolder.uploader.set_callback(d2.callback)
|
|
||||||
test_file = abspath_expanduser_unicode(u"what", base=self.local_dir)
|
test_file = abspath_expanduser_unicode(u"what", base=self.local_dir)
|
||||||
fileutil.write(test_file, "meow")
|
fileutil.write(test_file, "meow")
|
||||||
self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE)
|
self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE)
|
||||||
|
@ -274,10 +270,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||||
previously_uploaded = self._get_count('uploader.objects_succeeded')
|
previously_uploaded = self._get_count('uploader.objects_succeeded')
|
||||||
previously_disappeared = self._get_count('uploader.objects_disappeared')
|
previously_disappeared = self._get_count('uploader.objects_disappeared')
|
||||||
|
|
||||||
# Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
|
d = self.magicfolder.uploader.set_hook('processed')
|
||||||
# (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
|
|
||||||
d = defer.Deferred()
|
|
||||||
self.magicfolder.uploader.set_callback(d.callback)
|
|
||||||
|
|
||||||
path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
|
path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
|
||||||
path = to_filepath(path_u)
|
path = to_filepath(path_u)
|
||||||
|
@ -347,8 +340,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||||
|
|
||||||
def Alice_wait_for_upload(result):
|
def Alice_wait_for_upload(result):
|
||||||
print "Alice waits for an upload\n"
|
print "Alice waits for an upload\n"
|
||||||
d2 = defer.Deferred()
|
d2 = self.alice_magicfolder.uploader.set_hook('processed')
|
||||||
self.alice_magicfolder.uploader.set_callback(d2.callback)
|
|
||||||
return d2
|
return d2
|
||||||
d.addCallback(Alice_wait_for_upload)
|
d.addCallback(Alice_wait_for_upload)
|
||||||
d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0))
|
d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0))
|
||||||
|
@ -361,8 +353,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||||
|
|
||||||
def Bob_wait_for_download(result):
|
def Bob_wait_for_download(result):
|
||||||
print "Bob waits for a download\n"
|
print "Bob waits for a download\n"
|
||||||
d2 = defer.Deferred()
|
d2 = self.bob_magicfolder.downloader.set_hook('processed')
|
||||||
self.bob_magicfolder.downloader.set_callback(d2.callback)
|
|
||||||
return d2
|
return d2
|
||||||
d.addCallback(Bob_wait_for_download)
|
d.addCallback(Bob_wait_for_download)
|
||||||
d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 0))
|
d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 0))
|
||||||
|
@ -385,8 +376,8 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||||
|
|
||||||
d.addCallback(Bob_wait_for_download)
|
d.addCallback(Bob_wait_for_download)
|
||||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 2))
|
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 2))
|
||||||
#d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1))
|
d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1))
|
||||||
#d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 1))
|
d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 1))
|
||||||
|
|
||||||
|
|
||||||
def Alice_rewrite_file(result):
|
def Alice_rewrite_file(result):
|
||||||
|
|
|
@ -5,6 +5,7 @@ from foolscap.api import eventually, fireEventually
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
|
|
||||||
from allmydata.util import log
|
from allmydata.util import log
|
||||||
|
from allmydata.util.assertutil import _assert
|
||||||
from allmydata.util.pollmixin import PollMixin
|
from allmydata.util.pollmixin import PollMixin
|
||||||
|
|
||||||
|
|
||||||
|
@ -77,11 +78,13 @@ class HookMixin:
|
||||||
I am a helper mixin that maintains a collection of named hooks, primarily
|
I am a helper mixin that maintains a collection of named hooks, primarily
|
||||||
for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
|
for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
|
||||||
and can then be fired exactly once at the appropriate time by '_call_hook'.
|
and can then be fired exactly once at the appropriate time by '_call_hook'.
|
||||||
|
If 'ignore_count' is given, that number of calls to '_call_hook' will be
|
||||||
|
ignored before firing the hook.
|
||||||
|
|
||||||
I assume a '_hooks' attribute that should set by the class constructor to
|
I assume a '_hooks' attribute that should set by the class constructor to
|
||||||
a dict mapping each valid hook name to None.
|
a dict mapping each valid hook name to None.
|
||||||
"""
|
"""
|
||||||
def set_hook(self, name, d=None):
|
def set_hook(self, name, d=None, ignore_count=0):
|
||||||
"""
|
"""
|
||||||
Called by the hook observer (e.g. by a test).
|
Called by the hook observer (e.g. by a test).
|
||||||
If d is not given, an unfired Deferred is created and returned.
|
If d is not given, an unfired Deferred is created and returned.
|
||||||
|
@ -89,16 +92,20 @@ class HookMixin:
|
||||||
"""
|
"""
|
||||||
if d is None:
|
if d is None:
|
||||||
d = defer.Deferred()
|
d = defer.Deferred()
|
||||||
assert self._hooks[name] is None, self._hooks[name]
|
_assert(ignore_count >= 0, ignore_count=ignore_count)
|
||||||
assert isinstance(d, defer.Deferred), d
|
_assert(name in self._hooks, name=name)
|
||||||
self._hooks[name] = d
|
_assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
|
||||||
|
_assert(isinstance(d, defer.Deferred), d=d)
|
||||||
|
|
||||||
|
self._hooks[name] = (d, ignore_count)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _call_hook(self, res, name):
|
def _call_hook(self, res, name):
|
||||||
"""
|
"""
|
||||||
Called to trigger the hook, with argument 'res'. This is a no-op if the
|
Called to trigger the hook, with argument 'res'. This is a no-op if
|
||||||
hook is unset. Otherwise, the hook will be unset, and then its Deferred
|
the hook is unset. If the hook's ignore_count is positive, it will be
|
||||||
will be fired synchronously.
|
decremented; if it was already zero, the hook will be unset, and then
|
||||||
|
its Deferred will be fired synchronously.
|
||||||
|
|
||||||
The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
|
The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
|
||||||
This ensures that if 'res' is a failure, the hook will be errbacked,
|
This ensures that if 'res' is a failure, the hook will be errbacked,
|
||||||
|
@ -106,11 +113,15 @@ class HookMixin:
|
||||||
'res' is returned so that the current result or failure will be passed
|
'res' is returned so that the current result or failure will be passed
|
||||||
through.
|
through.
|
||||||
"""
|
"""
|
||||||
d = self._hooks[name]
|
hook = self._hooks[name]
|
||||||
if d is None:
|
if hook is None:
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
self._hooks[name] = None
|
(d, ignore_count) = hook
|
||||||
_with_log(d.callback, res)
|
if ignore_count > 0:
|
||||||
|
self._hooks[name] = (d, ignore_count - 1)
|
||||||
|
else:
|
||||||
|
self._hooks[name] = None
|
||||||
|
_with_log(d.callback, res)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue