workqueue: more improvements, more tests

This commit is contained in:
Brian Warner 2007-01-08 22:29:42 -07:00
parent b9edb02820
commit b641f6cbc7
2 changed files with 94 additions and 3 deletions

View File

@ -1,6 +1,7 @@
import os import os
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer
from allmydata import workqueue from allmydata import workqueue
from allmydata.util import idlib from allmydata.util import idlib
@ -11,11 +12,13 @@ class FakeWorkQueue(workqueue.WorkQueue):
self.dispatched_steps = [] self.dispatched_steps = []
def dispatch_step(self, steptype, lines): def dispatch_step(self, steptype, lines):
self.dispatched_steps.append(steptype, lines) self.dispatched_steps.append((steptype, lines))
return defer.succeed(None)
class Items(unittest.TestCase): class Items(unittest.TestCase):
def wq(self, testname): def wq(self, testname):
return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname) return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname)
def testTempfile(self): def testTempfile(self):
wq = self.wq("testTempfile") wq = self.wq("testTempfile")
(f, filename) = wq.create_tempfile(".chkdir") (f, filename) = wq.create_tempfile(".chkdir")
@ -51,3 +54,67 @@ class Items(unittest.TestCase):
self.failUnlessEqual(steps[4], ("unlink_uri", self.failUnlessEqual(steps[4], ("unlink_uri",
["olduri"])) ["olduri"]))
def testCHK2(self):
wq = self.wq("testCHK2")
wq.add_upload_chk("source_filename", "box1")
wq.add_retain_uri_from_box("box1")
wq.add_addpath("box1", ["home", "warner", "foo.txt"])
wq.add_delete_box("box1")
wq.add_unlink_uri("olduri")
# then this batch happens a bit later
(f, tmpfilename) = wq.create_tempfile(".chkdir")
f.write("some data")
f.close()
wq.add_upload_chk(os.path.join(wq.filesdir, tmpfilename), "box2")
wq.add_delete_tempfile(tmpfilename)
wq.add_retain_uri_from_box("box2")
wq.add_delete_box("box2")
wq.add_unlink_uri("oldchk")
self.failUnlessEqual(wq.count_pending_steps(), 10)
steps = wq.get_all_steps()
self.failUnlessEqual(steps[0], ("upload_chk",
["source_filename", "box1"]))
self.failUnlessEqual(steps[1], ("retain_uri_from_box",
["box1"]))
self.failUnlessEqual(steps[2], ("addpath",
["box1", "home", "warner", "foo.txt"]))
self.failUnlessEqual(steps[3], ("delete_box",
["box1"]))
self.failUnlessEqual(steps[4],
("upload_chk",
[os.path.join(wq.filesdir, tmpfilename),
"box2"]))
self.failUnlessEqual(steps[5],
("delete_tempfile", [tmpfilename]))
self.failUnlessEqual(steps[6],
("retain_uri_from_box", ["box2"]))
self.failUnlessEqual(steps[7], ("delete_box", ["box2"]))
self.failUnlessEqual(steps[8], ("unlink_uri",
["olduri"]))
self.failUnlessEqual(steps[9], ("unlink_uri", ["oldchk"]))
def testRun(self):
wq = self.wq("testRun")
wq.add_upload_chk("source_filename", "box1")
wq.add_retain_uri_from_box("box1")
wq.add_addpath("box1", ["home", "warner", "foo.txt"])
wq.add_delete_box("box1")
wq.add_unlink_uri("olduri")
# this tempfile should be deleted after the last step completes
(f, tmpfilename) = wq.create_tempfile(".dummy")
tmpfilename = os.path.join(wq.filesdir, tmpfilename)
f.write("stuff")
f.close()
self.failUnless(os.path.exists(tmpfilename))
d = wq.run_all_steps()
def _check(res):
self.failUnlessEqual(len(wq.dispatched_steps), 5)
self.failUnlessEqual(wq.dispatched_steps[0][0], "upload_chk")
self.failIf(os.path.exists(tmpfilename))
d.addCallback(_check)
return d

View File

@ -258,8 +258,22 @@ class WorkQueue(object):
d = self.dispatch_step(steptype, lines) d = self.dispatch_step(steptype, lines)
d.addCallback(self._delete_step, stepname) d.addCallback(self._delete_step, stepname)
return d return d
# no steps pending, it is safe to clean out leftover files
self._clean_leftover_files()
return None return None
def _clean_leftover_files(self):
# there are no steps pending, therefore any leftover files in our
# filesdir are orphaned and can be deleted. This catches things like
# a tempfile being created but the application gets interrupted
# before the upload step which references it gets created, or if an
# upload step gets written but the remaining sequence (addpath,
# delete_box) does not.
for n in os.listdir(self.filesdir):
os.unlink(os.path.join(self.filesdir, n))
for n in os.listdir(self.boxesdir):
os.unlink(os.path.join(self.boxesdir, n))
def get_next_step(self): def get_next_step(self):
stepnames = [n for n in os.listdir(self.basedir) stepnames = [n for n in os.listdir(self.basedir)
if n.startswith("step-")] if n.startswith("step-")]
@ -296,11 +310,21 @@ class WorkQueue(object):
if n.startswith("step-")]) if n.startswith("step-")])
def get_all_steps(self): def get_all_steps(self):
# returns a list of (steptype, lines) for all steps # returns a list of (steptype, lines) for all steps
steps = [] stepnames = []
for stepname in os.listdir(self.basedir): for stepname in os.listdir(self.basedir):
if stepname.startswith("step-"): if stepname.startswith("step-"):
stepnames.append(stepname)
stepnames.sort()
steps = []
for stepname in stepnames:
steps.append(self._get_step(stepname)[1:]) steps.append(self._get_step(stepname)[1:])
return steps return steps
def run_all_steps(self, ignored=None):
d = self.run_next_step()
if d:
d.addCallback(self.run_all_steps)
return d
return defer.succeed(None)
def open_tempfile(self, filename): def open_tempfile(self, filename):