Correct behavior on timed out immutable uploads.

commit f47741afb1 (parent 92b952a5fe)
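
In outline: upload tracking now cleans itself up through the BucketWriter close handler rather than through an explicit removal call in the abort route, so an upload that finishes, is aborted, or times out is dropped from the in-progress maps by the same path. Below is a minimal standalone sketch of that flow; the Bucket and UploadTracker classes are assumed simplifications for illustration, not the real Tahoe-LAFS BucketWriter or UploadsInProgress, though the method names mirror the diff that follows.

# Minimal sketch of close-handler-driven cleanup (assumed simplification,
# not the real BucketWriter/StorageServer classes).
from typing import Callable, Dict, Tuple


class Bucket:
    """Stand-in for a BucketWriter: closing or aborting fires a handler."""

    def __init__(self, on_close: Callable[["Bucket"], None]):
        self._on_close = on_close

    def close(self):  # successful upload
        self._on_close(self)

    def abort(self):  # cancelled or timed-out upload
        self._on_close(self)


class UploadTracker:
    """Forward map (storage index, share) -> bucket, plus a reverse map."""

    def __init__(self):
        self._shares: Dict[Tuple[bytes, int], Bucket] = {}
        self._bucketwriters: Dict[Bucket, Tuple[bytes, int]] = {}

    def add_write_bucket(self, storage_index: bytes, share_number: int, bucket: Bucket):
        self._shares[(storage_index, share_number)] = bucket
        self._bucketwriters[bucket] = (storage_index, share_number)

    def remove_write_bucket(self, bucket: Bucket):
        # The close handler only receives the bucket; the reverse map
        # recovers which (storage index, share number) it belonged to.
        key = self._bucketwriters.pop(bucket)
        del self._shares[key]


tracker = UploadTracker()
bucket = Bucket(on_close=tracker.remove_write_bucket)
tracker.add_write_bucket(b"si", 1, bucket)
bucket.abort()  # abort or timeout -> tracking is cleared automatically
assert tracker._bucketwriters == {} and tracker._shares == {}
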
@@ -2,19 +2,7 @@
 HTTP server for storage.
 """
 
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-from future.utils import PY2
-
-if PY2:
-    # fmt: off
-    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
-    # fmt: on
-else:
-    from typing import Dict, List, Set
+from typing import Dict, List, Set, Tuple
 
 from functools import wraps
 from base64 import b64decode
@@ -148,6 +136,9 @@ class UploadsInProgress(object):
     # Map storage index to corresponding uploads-in-progress
     _uploads = attr.ib(type=Dict[bytes, StorageIndexUploads], factory=dict)
 
+    # Map BucketWriter to (storage index, share number)
+    _bucketwriters = attr.ib(type=Dict[BucketWriter, Tuple[bytes, int]], factory=dict)
+
     def add_write_bucket(
         self,
         storage_index: bytes,
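
The new reverse map is declared with the same attrs pattern as the existing _uploads attribute: factory=dict gives each instance its own empty dict, and the type= argument carries the typing information that the consolidated import in the first hunk now provides. A small self-contained example of that pattern follows; Tracker is a made-up name for illustration, not the real class.

# Assumed minimal illustration of the attrs declaration style used above.
from typing import Dict, Tuple

import attr


@attr.s
class Tracker(object):
    # factory=dict ensures a fresh dict per instance (never a shared default).
    _uploads = attr.ib(type=Dict[bytes, object], factory=dict)
    _bucketwriters = attr.ib(type=Dict[object, Tuple[bytes, int]], factory=dict)


t = Tracker()
assert t._uploads == {} and t._bucketwriters == {}
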
@@ -155,13 +146,11 @@ class UploadsInProgress(object):
         upload_secret: bytes,
         bucket: BucketWriter,
     ):
-        """Add a new ``BucketWriter`` to be tracked.
-
-        TODO 3877 how does a timed-out BucketWriter get removed?!
-        """
+        """Add a new ``BucketWriter`` to be tracked."""
         si_uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
         si_uploads.shares[share_number] = bucket
         si_uploads.upload_secrets[share_number] = upload_secret
+        self._bucketwriters[bucket] = (storage_index, share_number)
 
     def get_write_bucket(
         self, storage_index: bytes, share_number: int, upload_secret: bytes
@@ -173,8 +162,9 @@ class UploadsInProgress(object):
         except (KeyError, IndexError):
             raise _HTTPError(http.NOT_FOUND)
 
-    def remove_write_bucket(self, storage_index: bytes, share_number: int):
+    def remove_write_bucket(self, bucket: BucketWriter):
         """Stop tracking the given ``BucketWriter``."""
+        storage_index, share_number = self._bucketwriters.pop(bucket)
         uploads_index = self._uploads[storage_index]
         uploads_index.shares.pop(share_number)
         uploads_index.upload_secrets.pop(share_number)
@@ -246,6 +236,12 @@ class HTTPServer(object):
         # Maps storage index to StorageIndexUploads:
         self._uploads = UploadsInProgress()
 
+        # When an upload finishes successfully, gets aborted, or times out,
+        # make sure it gets removed from our tracking datastructure:
+        self._storage_server.register_bucket_writer_close_handler(
+            self._uploads.remove_write_bucket
+        )
+
     def get_resource(self):
         """Return twisted.web ``Resource`` for this object."""
         return self._app.resource()
@@ -322,11 +318,9 @@ class HTTPServer(object):
 
         # TODO 3877 test for checking upload secret
 
-        # Abort the upload:
+        # Abort the upload; this should close it which will eventually result
+        # in self._uploads.remove_write_bucket() being called.
         bucket.abort()
-        # Stop tracking the bucket, so we can create a new one later if a
-        # client requests it:
-        self._uploads.remove_write_bucket(storage_index, share_number)
 
         return b""
 
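
Note why the explicit removal disappears from the abort route: cleanup is now driven entirely by the close handler registered in HTTPServer.__init__, and the old call passed (storage_index, share_number), which no longer matches the new remove_write_bucket(bucket) signature. Keeping both paths would also attempt to drop the same tracking entry twice. A small hedged illustration of that double-removal hazard, using plain dicts rather than the real classes:

# Assumed illustration: once abort() fires the registered close handler,
# a second explicit removal would pop the same entry again.
tracking = {"bucket-1": (b"si", 1)}


def remove_write_bucket(bucket_id):
    tracking.pop(bucket_id)  # raises KeyError if already removed


remove_write_bucket("bucket-1")  # triggered by the close handler on abort()
try:
    remove_write_bucket("bucket-1")  # the old explicit call in the abort route
except KeyError:
    print("already removed by the close handler")
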
@@ -810,3 +810,50 @@ class ImmutableHTTPAPITests(SyncTestCase):
         check_range("bytes=0-10", "bytes 0-10/*")
         # Can't go beyond the end of the immutable!
         check_range("bytes=10-100", "bytes 10-25/*")
+
+    def test_timed_out_upload_allows_reupload(self):
+        """
+        If an in-progress upload times out, it is cancelled, allowing a new
+        upload to occur.
+        """
+        # Start an upload:
+        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
+        result_of(
+            self.imm_client.write_share_chunk(
+                storage_index,
+                1,
+                upload_secret,
+                0,
+                b"123",
+            )
+        )
+
+        # Now, time passes, the in-progress upload should disappear...
+        self.http.clock.advance(30 * 60 + 1)
+
+        # Now we can create a new share with the same storage index without
+        # complaint:
+        upload_secret = urandom(32)
+        lease_secret = urandom(32)
+        created = result_of(
+            self.imm_client.create(
+                storage_index,
+                {1},
+                100,
+                upload_secret,
+                lease_secret,
+                lease_secret,
+            )
+        )
+        self.assertEqual(created.allocated, {1})
+
+        # And write to it, too:
+        result_of(
+            self.imm_client.write_share_chunk(
+                storage_index,
+                1,
+                upload_secret,
+                0,
+                b"ABC",
+            )
+        )
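
The test never waits in real time: self.http.clock is advanced past the timeout, after which the server is expected to have aborted the stale BucketWriter so the same storage index can be allocated and written again. A hedged sketch of the underlying mechanism using Twisted's test Clock; the 30 * 60 figure comes from the test above, while expire() is a made-up stand-in for the server's own timeout handling.

# Sketch of driving a scheduled timeout from a synthetic Twisted clock.
from twisted.internet.task import Clock

clock = Clock()
aborted = []


def expire():
    aborted.append("share-1")  # stand-in for aborting the timed-out BucketWriter


clock.callLater(30 * 60, expire)  # schedule the upload timeout
clock.advance(30 * 60 + 1)        # same jump the test makes; fires expire()
assert aborted == ["share-1"]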