webish: implement replace= for POST commands

This commit is contained in:
Brian Warner 2007-08-15 15:21:38 -07:00
parent 1752c9e29e
commit c1da0c11bc
2 changed files with 203 additions and 7 deletions

View File

@ -707,6 +707,12 @@ class Web(WebMixin, unittest.TestCase):
"409 Conflict", "409 Conflict",
"There was already a child by that name, and you asked me " "There was already a child by that name, and you asked me "
"to not replace it") "to not replace it")
def _check(res):
self.failUnless("sub" in self._foo_node.children)
newdir_uri = self._foo_node.children["sub"]
newdir_node = self.nodes[newdir_uri]
self.failUnlessEqual(newdir_node.children.keys(), ["baz.txt"])
d.addCallback(_check)
return d return d
def test_PUT_NEWDIRURL_mkdirs(self): def test_PUT_NEWDIRURL_mkdirs(self):
@ -924,6 +930,41 @@ class Web(WebMixin, unittest.TestCase):
d.addCallback(_check) d.addCallback(_check)
return d return d
def test_POST_upload_replace(self):
d = self.POST("/vdrive/global/foo", t="upload",
file=("bar.txt", self.NEWFILE_CONTENTS))
def _check(res):
self.failUnless("bar.txt" in self._foo_node.children)
new_uri = self._foo_node.children["bar.txt"]
new_contents = self.files[new_uri]
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
self.failUnlessEqual(res.strip(), new_uri)
d.addCallback(_check)
return d
def test_POST_upload_no_replace_queryarg(self):
d = self.POST("/vdrive/global/foo?replace=false", t="upload",
file=("bar.txt", self.NEWFILE_CONTENTS))
d.addBoth(self.shouldFail, error.Error,
"POST_upload_no_replace_queryarg",
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
d.addCallback(lambda res: self.GET("/vdrive/global/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
def test_POST_upload_no_replace_field(self):
d = self.POST("/vdrive/global/foo", t="upload", replace="false",
file=("bar.txt", self.NEWFILE_CONTENTS))
d.addBoth(self.shouldFail, error.Error, "POST_upload_no_replace_field",
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
d.addCallback(lambda res: self.GET("/vdrive/global/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
def test_POST_upload_whendone(self): def test_POST_upload_whendone(self):
d = self.POST("/vdrive/global/foo", t="upload", when_done="/THERE", d = self.POST("/vdrive/global/foo", t="upload", when_done="/THERE",
file=("new.txt", self.NEWFILE_CONTENTS)) file=("new.txt", self.NEWFILE_CONTENTS))
@ -975,6 +1016,46 @@ class Web(WebMixin, unittest.TestCase):
d.addCallback(_check) d.addCallback(_check)
return d return d
def test_POST_mkdir_replace(self): # return value?
d = self.POST("/vdrive/global/foo", t="mkdir", name="sub")
def _check(res):
self.failUnless("sub" in self._foo_node.children)
newdir_uri = self._foo_node.children["sub"]
newdir_node = self.nodes[newdir_uri]
self.failIf(newdir_node.children)
d.addCallback(_check)
return d
def test_POST_mkdir_no_replace_queryarg(self): # return value?
d = self.POST("/vdrive/global/foo?replace=false", t="mkdir", name="sub")
d.addBoth(self.shouldFail, error.Error,
"POST_mkdir_no_replace_queryarg",
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
def _check(res):
self.failUnless("sub" in self._foo_node.children)
newdir_uri = self._foo_node.children["sub"]
newdir_node = self.nodes[newdir_uri]
self.failUnlessEqual(newdir_node.children.keys(), ["baz.txt"])
d.addCallback(_check)
return d
def test_POST_mkdir_no_replace_field(self): # return value?
d = self.POST("/vdrive/global/foo", t="mkdir", name="sub",
replace="false")
d.addBoth(self.shouldFail, error.Error, "POST_mkdir_no_replace_field",
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
def _check(res):
self.failUnless("sub" in self._foo_node.children)
newdir_uri = self._foo_node.children["sub"]
newdir_node = self.nodes[newdir_uri]
self.failUnlessEqual(newdir_node.children.keys(), ["baz.txt"])
d.addCallback(_check)
return d
def test_POST_mkdir_whendone_field(self): def test_POST_mkdir_whendone_field(self):
d = self.POST("/vdrive/global/foo", d = self.POST("/vdrive/global/foo",
t="mkdir", name="newdir", when_done="/THERE") t="mkdir", name="newdir", when_done="/THERE")
@ -1012,6 +1093,47 @@ class Web(WebMixin, unittest.TestCase):
d.addCallback(_check) d.addCallback(_check)
return d return d
def test_POST_put_uri_replace(self):
newuri = self.makefile(8)
contents = self.files[newuri]
d = self.POST("/vdrive/global/foo", t="uri", name="bar.txt", uri=newuri)
def _check(res):
self.failUnless("bar.txt" in self._foo_node.children)
new_uri = self._foo_node.children["bar.txt"]
new_contents = self.files[new_uri]
self.failUnlessEqual(new_contents, contents)
self.failUnlessEqual(res.strip(), new_uri)
d.addCallback(_check)
return d
def test_POST_put_uri_no_replace_queryarg(self):
newuri = self.makefile(8)
contents = self.files[newuri]
d = self.POST("/vdrive/global/foo?replace=false", t="uri",
name="bar.txt", uri=newuri)
d.addBoth(self.shouldFail, error.Error,
"POST_put_uri_no_replace_queryarg",
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
d.addCallback(lambda res: self.GET("/vdrive/global/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
def test_POST_put_uri_no_replace_field(self):
newuri = self.makefile(8)
contents = self.files[newuri]
d = self.POST("/vdrive/global/foo", t="uri", replace="false",
name="bar.txt", uri=newuri)
d.addBoth(self.shouldFail, error.Error,
"POST_put_uri_no_replace_field",
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
d.addCallback(lambda res: self.GET("/vdrive/global/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
return d
def test_POST_delete(self): def test_POST_delete(self):
d = self.POST("/vdrive/global/foo", t="delete", name="bar.txt") d = self.POST("/vdrive/global/foo", t="delete", name="bar.txt")
def _check(res): def _check(res):
@ -1032,6 +1154,51 @@ class Web(WebMixin, unittest.TestCase):
d.addCallback(self.failUnlessIsBarJSON) d.addCallback(self.failUnlessIsBarJSON)
return d return d
def test_POST_rename_file_replace(self):
# rename a file and replace a directory with it
d = self.POST("/vdrive/global/foo", t="rename",
from_name="bar.txt", to_name='empty')
def _check(res):
self.failIf("bar.txt" in self._foo_node.children)
self.failUnless("empty" in self._foo_node.children)
d.addCallback(_check)
d.addCallback(lambda res: self.GET("/vdrive/global/foo/empty"))
d.addCallback(self.failUnlessIsBarDotTxt)
d.addCallback(lambda res: self.GET("/vdrive/global/foo/empty?t=json"))
d.addCallback(self.failUnlessIsBarJSON)
return d
def test_POST_rename_file_no_replace_queryarg(self):
# rename a file and replace a directory with it
d = self.POST("/vdrive/global/foo?replace=false", t="rename",
from_name="bar.txt", to_name='empty')
d.addBoth(self.shouldFail, error.Error,
"POST_rename_file_no_replace_queryarg",
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
d.addCallback(lambda res: self.GET("/vdrive/global/foo/empty?t=json"))
d.addCallback(self.failUnlessIsEmptyJSON)
return d
def test_POST_rename_file_no_replace_field(self):
# rename a file and replace a directory with it
d = self.POST("/vdrive/global/foo", t="rename", replace="false",
from_name="bar.txt", to_name='empty')
d.addBoth(self.shouldFail, error.Error,
"POST_rename_file_no_replace_field",
"409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it")
d.addCallback(lambda res: self.GET("/vdrive/global/foo/empty?t=json"))
d.addCallback(self.failUnlessIsEmptyJSON)
return d
def failUnlessIsEmptyJSON(self, res):
data = self.worlds_cheapest_json_decoder(res)
self.failUnlessEqual(data[0], "dirnode")
self.failUnlessEqual(len(data[1]["children"]), 0)
def test_POST_rename_file_slash_fail(self): def test_POST_rename_file_slash_fail(self):
d = self.POST("/vdrive/global/foo", t="rename", d = self.POST("/vdrive/global/foo", t="rename",
from_name="bar.txt", to_name='kirk/spock.txt') from_name="bar.txt", to_name='kirk/spock.txt')

View File

@ -555,6 +555,19 @@ class POSTHandler(rend.Page):
self._node = node self._node = node
self._replace = replace self._replace = replace
def _check_replacement(self, name):
if self._replace:
return defer.succeed(None)
d = self._node.has_child(name)
def _got(present):
if present:
raise NoReplacementError("There was already a child by that "
"name, and you asked me to not "
"replace it.")
return None
d.addCallback(_got)
return d
def renderHTTP(self, ctx): def renderHTTP(self, ctx):
req = inevow.IRequest(ctx) req = inevow.IRequest(ctx)
@ -579,10 +592,15 @@ class POSTHandler(rend.Page):
if "when_done" in req.fields: if "when_done" in req.fields:
when_done = req.fields["when_done"].value when_done = req.fields["when_done"].value
if "replace" in req.fields:
if req.fields["replace"].value.lower() in ("false", "0"):
self._replace = False
if t == "mkdir": if t == "mkdir":
if not name: if not name:
raise RuntimeError("mkdir requires a name") raise RuntimeError("mkdir requires a name")
d = self._node.create_empty_directory(name) d = self._check_replacement(name)
d.addCallback(lambda res: self._node.create_empty_directory(name))
def _done(res): def _done(res):
return "directory created" return "directory created"
d.addCallback(_done) d.addCallback(_done)
@ -593,7 +611,8 @@ class POSTHandler(rend.Page):
newuri = req.args["uri"][0] newuri = req.args["uri"][0]
else: else:
newuri = req.fields["uri"].value newuri = req.fields["uri"].value
d = self._node.set_uri(name, newuri) d = self._check_replacement(name)
d.addCallback(lambda res: self._node.set_uri(name, newuri))
def _done(res): def _done(res):
return newuri return newuri
d.addCallback(_done) d.addCallback(_done)
@ -616,7 +635,8 @@ class POSTHandler(rend.Page):
req.setResponseCode(http.BAD_REQUEST) req.setResponseCode(http.BAD_REQUEST)
req.setHeader("content-type", "text/plain") req.setHeader("content-type", "text/plain")
return "%s= may not contain a slash" % (k,) return "%s= may not contain a slash" % (k,)
d = self._node.get(from_name) d = self._check_replacement(to_name)
d.addCallback(lambda res: self._node.get(from_name))
def add_dest(child): def add_dest(child):
uri = child.get_uri() uri = child.get_uri()
# now actually do the rename # now actually do the rename
@ -632,7 +652,8 @@ class POSTHandler(rend.Page):
contents = req.fields["file"] contents = req.fields["file"]
name = name or contents.filename name = name or contents.filename
uploadable = upload.FileHandle(contents.file) uploadable = upload.FileHandle(contents.file)
d = self._node.add_file(name, uploadable) d = self._check_replacement(name)
d.addCallback(lambda res: self._node.add_file(name, uploadable))
def _done(newnode): def _done(newnode):
return newnode.get_uri() return newnode.get_uri()
d.addCallback(_done) d.addCallback(_done)
@ -641,6 +662,17 @@ class POSTHandler(rend.Page):
return "BAD t=%s" % t return "BAD t=%s" % t
if when_done: if when_done:
d.addCallback(lambda res: url.URL.fromString(when_done)) d.addCallback(lambda res: url.URL.fromString(when_done))
def _check_replacement(f):
# TODO: make this more human-friendly: maybe send them to the
# when_done page but with an extra query-arg that will display
# the error message in a big box at the top of the page. The
# directory page that when_done= usually points to accepts a
# result= argument.. use that.
f.trap(NoReplacementError)
req.setResponseCode(http.CONFLICT)
req.setHeader("content-type", "text/plain")
return str(f.value)
d.addErrback(_check_replacement)
return d return d
class DELETEHandler(rend.Page): class DELETEHandler(rend.Page):
@ -877,9 +909,6 @@ class VDrive(rend.Page):
# TODO: think about clobbering/revealing config files and node secrets # TODO: think about clobbering/revealing config files and node secrets
replace = True replace = True
# if "replace" in req.fields:
# if req.fields["replace"].value.lower() in ("false", "0"):
# replace = False
if "replace" in req.args: if "replace" in req.args:
if req.args["replace"][0].lower() in ("false", "0"): if req.args["replace"][0].lower() in ("false", "0"):
replace = False replace = False