Convert _process_deque

This commit is contained in:
Jean-Paul Calderone 2019-02-25 14:43:52 -05:00
parent 2b9e6784ab
commit 83c4056a5d
1 changed file with 34 additions and 20 deletions

View File

@@ -460,6 +460,19 @@ PERFORM_SCAN = ActionType(
u"Remote storage is being scanned for changes which need to be synchronized.", u"Remote storage is being scanned for changes which need to be synchronized.",
) )
_COUNT = Field.for_types(
u"count",
[int, long],
u"The number of items in the processing queue.",
)
PROCESS_QUEUE = ActionType(
u"magic-folder:process-queue",
[_COUNT],
[],
u"A Magic-Folder is working through an item queue.",
)
SCAN_REMOTE_COLLECTIVE = ActionType( SCAN_REMOTE_COLLECTIVE = ActionType(
u"magic-folder:scan-remote-collective", u"magic-folder:scan-remote-collective",
[], [],
@@ -713,7 +726,7 @@ class QueueMixin(HookMixin):
def _perform_scan(self): def _perform_scan(self):
return return
@defer.inlineCallbacks @eliotutil.inline_callbacks
def _process_deque(self): def _process_deque(self):
# process everything currently in the queue. we're turning it # process everything currently in the queue. we're turning it
# into a list so that if any new items get added while we're # into a list so that if any new items get added while we're
@@ -729,24 +742,21 @@ class QueueMixin(HookMixin):
# completed) # completed)
self._in_progress.extend(to_process) self._in_progress.extend(to_process)
if to_process: with PROCESS_QUEUE(count=len(to_process)):
self._log("%d items to process" % len(to_process), ) for item in to_process:
for item in to_process: self._process_history.appendleft(item)
self._process_history.appendleft(item) self._in_progress.remove(item)
self._in_progress.remove(item) try:
try: proc = yield self._process(item)
self._log(" processing '%r'" % (item,)) if not proc:
proc = yield self._process(item) self._process_history.remove(item)
self._log(" done: %r" % proc) self._call_hook(item, 'item_processed')
if not proc: except:
self._process_history.remove(item) write_traceback()
self._call_hook(item, 'item_processed') item.set_status('failed', self._clock.seconds())
except Exception as e: proc = Failure()
log.err("processing '%r' failed: %s" % (item, e))
item.set_status('failed', self._clock.seconds())
proc = Failure()
self._call_hook(proc, 'processed') self._call_hook(proc, 'processed')
def _get_relpath(self, filepath): def _get_relpath(self, filepath):
self._log("_get_relpath(%r)" % (filepath,)) self._log("_get_relpath(%r)" % (filepath,))
@@ -1009,8 +1019,8 @@ class Uploader(QueueMixin):
def _process(self, item): def _process(self, item):
""" """
process a single QueuedItem. If this returns False, the item is Possibly upload a single QueuedItem. If this returns False, the item is
removed from _process_history removed from _process_history.
""" """
with PROCESS_ITEM(item=item).context(): with PROCESS_ITEM(item=item).context():
d = DeferredContext(defer.succeed(False)) d = DeferredContext(defer.succeed(False))
@@ -1542,6 +1552,10 @@ class Downloader(QueueMixin, WriteFileMixin):
) )
def _process(self, item): def _process(self, item):
"""
Possibly download a single QueuedItem. If this returns False, the item is
removed from _process_history.
"""
# Downloader # Downloader
now = self._clock.seconds() now = self._clock.seconds()