Merge pull request #1193 from tahoe-lafs/3890-mutable-http-api
Mutable storage HTTP API, part 1 Fixes ticket:3890
This commit is contained in:
commit
be2c6b09f3
|
@ -48,8 +48,6 @@ workflows:
|
||||||
{}
|
{}
|
||||||
- "pyinstaller":
|
- "pyinstaller":
|
||||||
{}
|
{}
|
||||||
- "deprecations":
|
|
||||||
{}
|
|
||||||
- "c-locale":
|
- "c-locale":
|
||||||
{}
|
{}
|
||||||
# Any locale other than C or UTF-8.
|
# Any locale other than C or UTF-8.
|
||||||
|
@ -297,20 +295,6 @@ jobs:
|
||||||
# aka "Latin 1"
|
# aka "Latin 1"
|
||||||
LANG: "en_US.ISO-8859-1"
|
LANG: "en_US.ISO-8859-1"
|
||||||
|
|
||||||
|
|
||||||
deprecations:
|
|
||||||
<<: *DEBIAN
|
|
||||||
|
|
||||||
environment:
|
|
||||||
<<: *UTF_8_ENVIRONMENT
|
|
||||||
# Select the deprecations tox environments.
|
|
||||||
TAHOE_LAFS_TOX_ENVIRONMENT: "deprecations,upcoming-deprecations"
|
|
||||||
# Put the logs somewhere we can report them.
|
|
||||||
TAHOE_LAFS_WARNINGS_LOG: "/tmp/artifacts/deprecation-warnings.log"
|
|
||||||
# The deprecations tox environments don't do coverage measurement.
|
|
||||||
UPLOAD_COVERAGE: ""
|
|
||||||
|
|
||||||
|
|
||||||
integration:
|
integration:
|
||||||
<<: *DEBIAN
|
<<: *DEBIAN
|
||||||
|
|
||||||
|
|
|
@ -743,11 +743,15 @@ For example::
|
||||||
|
|
||||||
[1, 5]
|
[1, 5]
|
||||||
|
|
||||||
``GET /v1/mutable/:storage_index?share=:s0&share=:sN&offset=:o1&size=:z0&offset=:oN&size=:zN``
|
``GET /v1/mutable/:storage_index/:share_number``
|
||||||
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||||
|
|
||||||
Read data from the indicated mutable shares.
|
Read data from the indicated mutable shares, just like ``GET /v1/immutable/:storage_index``
|
||||||
Just like ``GET /v1/mutable/:storage_index``.
|
|
||||||
|
The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content).
|
||||||
|
Interpretation and response behavior is as specified in RFC 7233 § 4.1.
|
||||||
|
Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported.
|
||||||
|
|
||||||
|
|
||||||
``POST /v1/mutable/:storage_index/:share_number/corrupt``
|
``POST /v1/mutable/:storage_index/:share_number/corrupt``
|
||||||
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||||
|
|
|
@ -4,11 +4,10 @@ HTTP client that talks to the HTTP storage server.
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Union, Set, Optional
|
from typing import Union, Optional, Sequence, Mapping
|
||||||
|
|
||||||
from base64 import b64encode
|
from base64 import b64encode
|
||||||
|
|
||||||
import attr
|
from attrs import define, asdict, frozen
|
||||||
|
|
||||||
# TODO Make sure to import Python version?
|
# TODO Make sure to import Python version?
|
||||||
from cbor2 import loads, dumps
|
from cbor2 import loads, dumps
|
||||||
|
@ -39,6 +38,7 @@ from .http_common import (
|
||||||
)
|
)
|
||||||
from .common import si_b2a
|
from .common import si_b2a
|
||||||
from ..util.hashutil import timing_safe_compare
|
from ..util.hashutil import timing_safe_compare
|
||||||
|
from ..util.deferredutil import async_to_deferred
|
||||||
|
|
||||||
_OPENSSL = Binding().lib
|
_OPENSSL = Binding().lib
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ class ClientException(Exception):
|
||||||
_SCHEMAS = {
|
_SCHEMAS = {
|
||||||
"get_version": Schema(
|
"get_version": Schema(
|
||||||
"""
|
"""
|
||||||
message = {'http://allmydata.org/tahoe/protocols/storage/v1' => {
|
response = {'http://allmydata.org/tahoe/protocols/storage/v1' => {
|
||||||
'maximum-immutable-share-size' => uint
|
'maximum-immutable-share-size' => uint
|
||||||
'maximum-mutable-share-size' => uint
|
'maximum-mutable-share-size' => uint
|
||||||
'available-space' => uint
|
'available-space' => uint
|
||||||
|
@ -79,7 +79,7 @@ _SCHEMAS = {
|
||||||
),
|
),
|
||||||
"allocate_buckets": Schema(
|
"allocate_buckets": Schema(
|
||||||
"""
|
"""
|
||||||
message = {
|
response = {
|
||||||
already-have: #6.258([* uint])
|
already-have: #6.258([* uint])
|
||||||
allocated: #6.258([* uint])
|
allocated: #6.258([* uint])
|
||||||
}
|
}
|
||||||
|
@ -87,16 +87,25 @@ _SCHEMAS = {
|
||||||
),
|
),
|
||||||
"immutable_write_share_chunk": Schema(
|
"immutable_write_share_chunk": Schema(
|
||||||
"""
|
"""
|
||||||
message = {
|
response = {
|
||||||
required: [* {begin: uint, end: uint}]
|
required: [* {begin: uint, end: uint}]
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
),
|
),
|
||||||
"list_shares": Schema(
|
"list_shares": Schema(
|
||||||
"""
|
"""
|
||||||
message = #6.258([* uint])
|
response = #6.258([* uint])
|
||||||
"""
|
"""
|
||||||
),
|
),
|
||||||
|
"mutable_read_test_write": Schema(
|
||||||
|
"""
|
||||||
|
response = {
|
||||||
|
"success": bool,
|
||||||
|
"data": {* share_number: [* bstr]}
|
||||||
|
}
|
||||||
|
share_number = uint
|
||||||
|
"""
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -121,12 +130,12 @@ def _decode_cbor(response, schema: Schema):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@define
|
||||||
class ImmutableCreateResult(object):
|
class ImmutableCreateResult(object):
|
||||||
"""Result of creating a storage index for an immutable."""
|
"""Result of creating a storage index for an immutable."""
|
||||||
|
|
||||||
already_have = attr.ib(type=Set[int])
|
already_have: set[int]
|
||||||
allocated = attr.ib(type=Set[int])
|
allocated: set[int]
|
||||||
|
|
||||||
|
|
||||||
class _TLSContextFactory(CertificateOptions):
|
class _TLSContextFactory(CertificateOptions):
|
||||||
|
@ -200,14 +209,14 @@ class _TLSContextFactory(CertificateOptions):
|
||||||
|
|
||||||
@implementer(IPolicyForHTTPS)
|
@implementer(IPolicyForHTTPS)
|
||||||
@implementer(IOpenSSLClientConnectionCreator)
|
@implementer(IOpenSSLClientConnectionCreator)
|
||||||
@attr.s
|
@define
|
||||||
class _StorageClientHTTPSPolicy:
|
class _StorageClientHTTPSPolicy:
|
||||||
"""
|
"""
|
||||||
A HTTPS policy that ensures the SPKI hash of the public key matches a known
|
A HTTPS policy that ensures the SPKI hash of the public key matches a known
|
||||||
hash, i.e. pinning-based validation.
|
hash, i.e. pinning-based validation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
expected_spki_hash = attr.ib(type=bytes)
|
expected_spki_hash: bytes
|
||||||
|
|
||||||
# IPolicyForHTTPS
|
# IPolicyForHTTPS
|
||||||
def creatorForNetloc(self, hostname, port):
|
def creatorForNetloc(self, hostname, port):
|
||||||
|
@ -220,24 +229,22 @@ class _StorageClientHTTPSPolicy:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@define
|
||||||
class StorageClient(object):
|
class StorageClient(object):
|
||||||
"""
|
"""
|
||||||
Low-level HTTP client that talks to the HTTP storage server.
|
Low-level HTTP client that talks to the HTTP storage server.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(
|
# The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
|
||||||
self, url, swissnum, treq=treq
|
# ``StorageClient.from_nurl()``.
|
||||||
): # type: (DecodedURL, bytes, Union[treq,StubTreq,HTTPClient]) -> None
|
_base_url: DecodedURL
|
||||||
"""
|
_swissnum: bytes
|
||||||
The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
|
_treq: Union[treq, StubTreq, HTTPClient]
|
||||||
``StorageClient.from_nurl()``.
|
|
||||||
"""
|
|
||||||
self._base_url = url
|
|
||||||
self._swissnum = swissnum
|
|
||||||
self._treq = treq
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_nurl(cls, nurl: DecodedURL, reactor, persistent: bool = True) -> StorageClient:
|
def from_nurl(
|
||||||
|
cls, nurl: DecodedURL, reactor, persistent: bool = True
|
||||||
|
) -> StorageClient:
|
||||||
"""
|
"""
|
||||||
Create a ``StorageClient`` for the given NURL.
|
Create a ``StorageClient`` for the given NURL.
|
||||||
|
|
||||||
|
@ -280,6 +287,7 @@ class StorageClient(object):
|
||||||
lease_renew_secret=None,
|
lease_renew_secret=None,
|
||||||
lease_cancel_secret=None,
|
lease_cancel_secret=None,
|
||||||
upload_secret=None,
|
upload_secret=None,
|
||||||
|
write_enabler_secret=None,
|
||||||
headers=None,
|
headers=None,
|
||||||
message_to_serialize=None,
|
message_to_serialize=None,
|
||||||
**kwargs
|
**kwargs
|
||||||
|
@ -298,6 +306,7 @@ class StorageClient(object):
|
||||||
(Secrets.LEASE_RENEW, lease_renew_secret),
|
(Secrets.LEASE_RENEW, lease_renew_secret),
|
||||||
(Secrets.LEASE_CANCEL, lease_cancel_secret),
|
(Secrets.LEASE_CANCEL, lease_cancel_secret),
|
||||||
(Secrets.UPLOAD, upload_secret),
|
(Secrets.UPLOAD, upload_secret),
|
||||||
|
(Secrets.WRITE_ENABLER, write_enabler_secret),
|
||||||
]:
|
]:
|
||||||
if value is None:
|
if value is None:
|
||||||
continue
|
continue
|
||||||
|
@ -342,25 +351,65 @@ class StorageClientGeneral(object):
|
||||||
returnValue(decoded_response)
|
returnValue(decoded_response)
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@define
|
||||||
class UploadProgress(object):
|
class UploadProgress(object):
|
||||||
"""
|
"""
|
||||||
Progress of immutable upload, per the server.
|
Progress of immutable upload, per the server.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# True when upload has finished.
|
# True when upload has finished.
|
||||||
finished = attr.ib(type=bool)
|
finished: bool
|
||||||
# Remaining ranges to upload.
|
# Remaining ranges to upload.
|
||||||
required = attr.ib(type=RangeMap)
|
required: RangeMap
|
||||||
|
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def read_share_chunk(
|
||||||
|
client: StorageClient,
|
||||||
|
share_type: str,
|
||||||
|
storage_index: bytes,
|
||||||
|
share_number: int,
|
||||||
|
offset: int,
|
||||||
|
length: int,
|
||||||
|
) -> Deferred[bytes]:
|
||||||
|
"""
|
||||||
|
Download a chunk of data from a share.
|
||||||
|
|
||||||
|
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
|
||||||
|
downloads should be transparently retried and redownloaded by the
|
||||||
|
implementation a few times so that if a failure percolates up, the
|
||||||
|
caller can assume the failure isn't a short-term blip.
|
||||||
|
|
||||||
|
NOTE: the underlying HTTP protocol is much more flexible than this API,
|
||||||
|
so a future refactor may expand this in order to simplify the calling
|
||||||
|
code and perhaps download data more efficiently. But then again maybe
|
||||||
|
the HTTP protocol will be simplified, see
|
||||||
|
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
|
||||||
|
"""
|
||||||
|
url = client.relative_url(
|
||||||
|
"/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number)
|
||||||
|
)
|
||||||
|
response = yield client.request(
|
||||||
|
"GET",
|
||||||
|
url,
|
||||||
|
headers=Headers(
|
||||||
|
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
|
||||||
|
),
|
||||||
|
)
|
||||||
|
if response.code == http.PARTIAL_CONTENT:
|
||||||
|
body = yield response.content()
|
||||||
|
returnValue(body)
|
||||||
|
else:
|
||||||
|
raise ClientException(response.code)
|
||||||
|
|
||||||
|
|
||||||
|
@define
|
||||||
class StorageClientImmutables(object):
|
class StorageClientImmutables(object):
|
||||||
"""
|
"""
|
||||||
APIs for interacting with immutables.
|
APIs for interacting with immutables.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, client: StorageClient):
|
_client: StorageClient
|
||||||
self._client = client
|
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def create(
|
def create(
|
||||||
|
@ -371,7 +420,7 @@ class StorageClientImmutables(object):
|
||||||
upload_secret,
|
upload_secret,
|
||||||
lease_renew_secret,
|
lease_renew_secret,
|
||||||
lease_cancel_secret,
|
lease_cancel_secret,
|
||||||
): # type: (bytes, Set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult]
|
): # type: (bytes, set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult]
|
||||||
"""
|
"""
|
||||||
Create a new storage index for an immutable.
|
Create a new storage index for an immutable.
|
||||||
|
|
||||||
|
@ -474,42 +523,18 @@ class StorageClientImmutables(object):
|
||||||
remaining.set(True, chunk["begin"], chunk["end"])
|
remaining.set(True, chunk["begin"], chunk["end"])
|
||||||
returnValue(UploadProgress(finished=finished, required=remaining))
|
returnValue(UploadProgress(finished=finished, required=remaining))
|
||||||
|
|
||||||
@inlineCallbacks
|
|
||||||
def read_share_chunk(
|
def read_share_chunk(
|
||||||
self, storage_index, share_number, offset, length
|
self, storage_index, share_number, offset, length
|
||||||
): # type: (bytes, int, int, int) -> Deferred[bytes]
|
): # type: (bytes, int, int, int) -> Deferred[bytes]
|
||||||
"""
|
"""
|
||||||
Download a chunk of data from a share.
|
Download a chunk of data from a share.
|
||||||
|
|
||||||
TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed
|
|
||||||
downloads should be transparently retried and redownloaded by the
|
|
||||||
implementation a few times so that if a failure percolates up, the
|
|
||||||
caller can assume the failure isn't a short-term blip.
|
|
||||||
|
|
||||||
NOTE: the underlying HTTP protocol is much more flexible than this API,
|
|
||||||
so a future refactor may expand this in order to simplify the calling
|
|
||||||
code and perhaps download data more efficiently. But then again maybe
|
|
||||||
the HTTP protocol will be simplified, see
|
|
||||||
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
|
|
||||||
"""
|
"""
|
||||||
url = self._client.relative_url(
|
return read_share_chunk(
|
||||||
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
|
self._client, "immutable", storage_index, share_number, offset, length
|
||||||
)
|
)
|
||||||
response = yield self._client.request(
|
|
||||||
"GET",
|
|
||||||
url,
|
|
||||||
headers=Headers(
|
|
||||||
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
|
|
||||||
),
|
|
||||||
)
|
|
||||||
if response.code == http.PARTIAL_CONTENT:
|
|
||||||
body = yield response.content()
|
|
||||||
returnValue(body)
|
|
||||||
else:
|
|
||||||
raise ClientException(response.code)
|
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]]
|
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[set[int]]
|
||||||
"""
|
"""
|
||||||
Return the set of shares for a given storage index.
|
Return the set of shares for a given storage index.
|
||||||
"""
|
"""
|
||||||
|
@ -573,3 +598,125 @@ class StorageClientImmutables(object):
|
||||||
raise ClientException(
|
raise ClientException(
|
||||||
response.code,
|
response.code,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@frozen
|
||||||
|
class WriteVector:
|
||||||
|
"""Data to write to a chunk."""
|
||||||
|
|
||||||
|
offset: int
|
||||||
|
data: bytes
|
||||||
|
|
||||||
|
|
||||||
|
@frozen
|
||||||
|
class TestVector:
|
||||||
|
"""Checks to make on a chunk before writing to it."""
|
||||||
|
|
||||||
|
offset: int
|
||||||
|
size: int
|
||||||
|
specimen: bytes
|
||||||
|
|
||||||
|
|
||||||
|
@frozen
|
||||||
|
class ReadVector:
|
||||||
|
"""
|
||||||
|
Reads to do on chunks, as part of a read/test/write operation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
offset: int
|
||||||
|
size: int
|
||||||
|
|
||||||
|
|
||||||
|
@frozen
|
||||||
|
class TestWriteVectors:
|
||||||
|
"""Test and write vectors for a specific share."""
|
||||||
|
|
||||||
|
test_vectors: Sequence[TestVector]
|
||||||
|
write_vectors: Sequence[WriteVector]
|
||||||
|
new_length: Optional[int] = None
|
||||||
|
|
||||||
|
def asdict(self) -> dict:
|
||||||
|
"""Return dictionary suitable for sending over CBOR."""
|
||||||
|
d = asdict(self)
|
||||||
|
d["test"] = d.pop("test_vectors")
|
||||||
|
d["write"] = d.pop("write_vectors")
|
||||||
|
d["new-length"] = d.pop("new_length")
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
@frozen
|
||||||
|
class ReadTestWriteResult:
|
||||||
|
"""Result of sending read-test-write vectors."""
|
||||||
|
|
||||||
|
success: bool
|
||||||
|
# Map share numbers to reads corresponding to the request's list of
|
||||||
|
# ReadVectors:
|
||||||
|
reads: Mapping[int, Sequence[bytes]]
|
||||||
|
|
||||||
|
|
||||||
|
@frozen
|
||||||
|
class StorageClientMutables:
|
||||||
|
"""
|
||||||
|
APIs for interacting with mutables.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_client: StorageClient
|
||||||
|
|
||||||
|
@async_to_deferred
|
||||||
|
async def read_test_write_chunks(
|
||||||
|
self,
|
||||||
|
storage_index: bytes,
|
||||||
|
write_enabler_secret: bytes,
|
||||||
|
lease_renew_secret: bytes,
|
||||||
|
lease_cancel_secret: bytes,
|
||||||
|
testwrite_vectors: dict[int, TestWriteVectors],
|
||||||
|
read_vector: list[ReadVector],
|
||||||
|
) -> ReadTestWriteResult:
|
||||||
|
"""
|
||||||
|
Read, test, and possibly write chunks to a particular mutable storage
|
||||||
|
index.
|
||||||
|
|
||||||
|
Reads are done before writes.
|
||||||
|
|
||||||
|
Given a mapping between share numbers and test/write vectors, the tests
|
||||||
|
are done and if they are valid the writes are done.
|
||||||
|
"""
|
||||||
|
# TODO unit test all the things
|
||||||
|
url = self._client.relative_url(
|
||||||
|
"/v1/mutable/{}/read-test-write".format(_encode_si(storage_index))
|
||||||
|
)
|
||||||
|
message = {
|
||||||
|
"test-write-vectors": {
|
||||||
|
share_number: twv.asdict()
|
||||||
|
for (share_number, twv) in testwrite_vectors.items()
|
||||||
|
},
|
||||||
|
"read-vector": [asdict(r) for r in read_vector],
|
||||||
|
}
|
||||||
|
response = await self._client.request(
|
||||||
|
"POST",
|
||||||
|
url,
|
||||||
|
write_enabler_secret=write_enabler_secret,
|
||||||
|
lease_renew_secret=lease_renew_secret,
|
||||||
|
lease_cancel_secret=lease_cancel_secret,
|
||||||
|
message_to_serialize=message,
|
||||||
|
)
|
||||||
|
if response.code == http.OK:
|
||||||
|
result = await _decode_cbor(response, _SCHEMAS["mutable_read_test_write"])
|
||||||
|
return ReadTestWriteResult(success=result["success"], reads=result["data"])
|
||||||
|
else:
|
||||||
|
raise ClientException(response.code, (await response.content()))
|
||||||
|
|
||||||
|
def read_share_chunk(
|
||||||
|
self,
|
||||||
|
storage_index: bytes,
|
||||||
|
share_number: int,
|
||||||
|
offset: int,
|
||||||
|
length: int,
|
||||||
|
) -> bytes:
|
||||||
|
"""
|
||||||
|
Download a chunk of data from a share.
|
||||||
|
"""
|
||||||
|
# TODO unit test all the things
|
||||||
|
return read_share_chunk(
|
||||||
|
self._client, "mutable", storage_index, share_number, offset, length
|
||||||
|
)
|
||||||
|
|
|
@ -38,6 +38,7 @@ class Secrets(Enum):
|
||||||
LEASE_RENEW = "lease-renew-secret"
|
LEASE_RENEW = "lease-renew-secret"
|
||||||
LEASE_CANCEL = "lease-cancel-secret"
|
LEASE_CANCEL = "lease-cancel-secret"
|
||||||
UPLOAD = "upload-secret"
|
UPLOAD = "upload-secret"
|
||||||
|
WRITE_ENABLER = "write-enabler"
|
||||||
|
|
||||||
|
|
||||||
def get_spki_hash(certificate: Certificate) -> bytes:
|
def get_spki_hash(certificate: Certificate) -> bytes:
|
||||||
|
|
|
@ -239,19 +239,39 @@ class _HTTPError(Exception):
|
||||||
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
|
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
|
||||||
# indicates a set.
|
# indicates a set.
|
||||||
_SCHEMAS = {
|
_SCHEMAS = {
|
||||||
"allocate_buckets": Schema("""
|
"allocate_buckets": Schema(
|
||||||
message = {
|
"""
|
||||||
|
request = {
|
||||||
share-numbers: #6.258([* uint])
|
share-numbers: #6.258([* uint])
|
||||||
allocated-size: uint
|
allocated-size: uint
|
||||||
}
|
}
|
||||||
"""),
|
"""
|
||||||
"advise_corrupt_share": Schema("""
|
),
|
||||||
message = {
|
"advise_corrupt_share": Schema(
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
reason: tstr
|
reason: tstr
|
||||||
}
|
}
|
||||||
""")
|
"""
|
||||||
|
),
|
||||||
|
"mutable_read_test_write": Schema(
|
||||||
|
"""
|
||||||
|
request = {
|
||||||
|
"test-write-vectors": {
|
||||||
|
* share_number: {
|
||||||
|
"test": [* {"offset": uint, "size": uint, "specimen": bstr}]
|
||||||
|
"write": [* {"offset": uint, "data": bstr}]
|
||||||
|
"new-length": uint // null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"read-vector": [* {"offset": uint, "size": uint}]
|
||||||
|
}
|
||||||
|
share_number = uint
|
||||||
|
"""
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class HTTPServer(object):
|
class HTTPServer(object):
|
||||||
"""
|
"""
|
||||||
A HTTP interface to the storage server.
|
A HTTP interface to the storage server.
|
||||||
|
@ -537,7 +557,9 @@ class HTTPServer(object):
|
||||||
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
|
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
|
||||||
methods=["POST"],
|
methods=["POST"],
|
||||||
)
|
)
|
||||||
def advise_corrupt_share(self, request, authorization, storage_index, share_number):
|
def advise_corrupt_share_immutable(
|
||||||
|
self, request, authorization, storage_index, share_number
|
||||||
|
):
|
||||||
"""Indicate that given share is corrupt, with a text reason."""
|
"""Indicate that given share is corrupt, with a text reason."""
|
||||||
try:
|
try:
|
||||||
bucket = self._storage_server.get_buckets(storage_index)[share_number]
|
bucket = self._storage_server.get_buckets(storage_index)[share_number]
|
||||||
|
@ -548,6 +570,81 @@ class HTTPServer(object):
|
||||||
bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
|
bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
|
||||||
return b""
|
return b""
|
||||||
|
|
||||||
|
##### Mutable APIs #####
|
||||||
|
|
||||||
|
@_authorized_route(
|
||||||
|
_app,
|
||||||
|
{Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.WRITE_ENABLER},
|
||||||
|
"/v1/mutable/<storage_index:storage_index>/read-test-write",
|
||||||
|
methods=["POST"],
|
||||||
|
)
|
||||||
|
def mutable_read_test_write(self, request, authorization, storage_index):
|
||||||
|
"""Read/test/write combined operation for mutables."""
|
||||||
|
# TODO unit tests
|
||||||
|
rtw_request = self._read_encoded(request, _SCHEMAS["mutable_read_test_write"])
|
||||||
|
secrets = (
|
||||||
|
authorization[Secrets.WRITE_ENABLER],
|
||||||
|
authorization[Secrets.LEASE_RENEW],
|
||||||
|
authorization[Secrets.LEASE_CANCEL],
|
||||||
|
)
|
||||||
|
success, read_data = self._storage_server.slot_testv_and_readv_and_writev(
|
||||||
|
storage_index,
|
||||||
|
secrets,
|
||||||
|
{
|
||||||
|
k: (
|
||||||
|
[(d["offset"], d["size"], b"eq", d["specimen"]) for d in v["test"]],
|
||||||
|
[(d["offset"], d["data"]) for d in v["write"]],
|
||||||
|
v["new-length"],
|
||||||
|
)
|
||||||
|
for (k, v) in rtw_request["test-write-vectors"].items()
|
||||||
|
},
|
||||||
|
[(d["offset"], d["size"]) for d in rtw_request["read-vector"]],
|
||||||
|
)
|
||||||
|
return self._send_encoded(request, {"success": success, "data": read_data})
|
||||||
|
|
||||||
|
@_authorized_route(
|
||||||
|
_app,
|
||||||
|
set(),
|
||||||
|
"/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>",
|
||||||
|
methods=["GET"],
|
||||||
|
)
|
||||||
|
def read_mutable_chunk(self, request, authorization, storage_index, share_number):
|
||||||
|
"""Read a chunk from a mutable."""
|
||||||
|
if request.getHeader("range") is None:
|
||||||
|
# TODO in follow-up ticket
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
# TODO reduce duplication with immutable reads?
|
||||||
|
# TODO unit tests, perhaps shared if possible
|
||||||
|
range_header = parse_range_header(request.getHeader("range"))
|
||||||
|
if (
|
||||||
|
range_header is None
|
||||||
|
or range_header.units != "bytes"
|
||||||
|
or len(range_header.ranges) > 1 # more than one range
|
||||||
|
or range_header.ranges[0][1] is None # range without end
|
||||||
|
):
|
||||||
|
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
|
||||||
|
return b""
|
||||||
|
|
||||||
|
offset, end = range_header.ranges[0]
|
||||||
|
|
||||||
|
# TODO limit memory usage
|
||||||
|
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
|
||||||
|
data = self._storage_server.slot_readv(
|
||||||
|
storage_index, [share_number], [(offset, end - offset)]
|
||||||
|
)[share_number][0]
|
||||||
|
|
||||||
|
# TODO reduce duplication?
|
||||||
|
request.setResponseCode(http.PARTIAL_CONTENT)
|
||||||
|
if len(data):
|
||||||
|
# For empty bodies the content-range header makes no sense since
|
||||||
|
# the end of the range is inclusive.
|
||||||
|
request.setHeader(
|
||||||
|
"content-range",
|
||||||
|
ContentRange("bytes", offset, offset + len(data)).to_header(),
|
||||||
|
)
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
@implementer(IStreamServerEndpoint)
|
@implementer(IStreamServerEndpoint)
|
||||||
@attr.s
|
@attr.s
|
||||||
|
|
|
@ -77,7 +77,8 @@ from allmydata.util.hashutil import permute_server_hash
|
||||||
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
|
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
|
||||||
from allmydata.storage.http_client import (
|
from allmydata.storage.http_client import (
|
||||||
StorageClient, StorageClientImmutables, StorageClientGeneral,
|
StorageClient, StorageClientImmutables, StorageClientGeneral,
|
||||||
ClientException as HTTPClientException
|
ClientException as HTTPClientException, StorageClientMutables,
|
||||||
|
ReadVector, TestWriteVectors, WriteVector, TestVector
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1189,3 +1190,64 @@ class _HTTPStorageServer(object):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError() # future tickets
|
raise NotImplementedError() # future tickets
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def slot_readv(self, storage_index, shares, readv):
|
||||||
|
mutable_client = StorageClientMutables(self._http_client)
|
||||||
|
pending_reads = {}
|
||||||
|
reads = {}
|
||||||
|
# TODO if shares list is empty, that means list all shares, so we need
|
||||||
|
# to do a query to get that.
|
||||||
|
assert shares # TODO replace with call to list shares if and only if it's empty
|
||||||
|
|
||||||
|
# Start all the queries in parallel:
|
||||||
|
for share_number in shares:
|
||||||
|
share_reads = defer.gatherResults(
|
||||||
|
[
|
||||||
|
mutable_client.read_share_chunk(
|
||||||
|
storage_index, share_number, offset, length
|
||||||
|
)
|
||||||
|
for (offset, length) in readv
|
||||||
|
]
|
||||||
|
)
|
||||||
|
pending_reads[share_number] = share_reads
|
||||||
|
|
||||||
|
# Wait for all the queries to finish:
|
||||||
|
for share_number, pending_result in pending_reads.items():
|
||||||
|
reads[share_number] = yield pending_result
|
||||||
|
|
||||||
|
return reads
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def slot_testv_and_readv_and_writev(
|
||||||
|
self,
|
||||||
|
storage_index,
|
||||||
|
secrets,
|
||||||
|
tw_vectors,
|
||||||
|
r_vector,
|
||||||
|
):
|
||||||
|
mutable_client = StorageClientMutables(self._http_client)
|
||||||
|
we_secret, lr_secret, lc_secret = secrets
|
||||||
|
client_tw_vectors = {}
|
||||||
|
for share_num, (test_vector, data_vector, new_length) in tw_vectors.items():
|
||||||
|
client_test_vectors = [
|
||||||
|
TestVector(offset=offset, size=size, specimen=specimen)
|
||||||
|
for (offset, size, specimen) in test_vector
|
||||||
|
]
|
||||||
|
client_write_vectors = [
|
||||||
|
WriteVector(offset=offset, data=data) for (offset, data) in data_vector
|
||||||
|
]
|
||||||
|
client_tw_vectors[share_num] = TestWriteVectors(
|
||||||
|
test_vectors=client_test_vectors,
|
||||||
|
write_vectors=client_write_vectors,
|
||||||
|
new_length=new_length
|
||||||
|
)
|
||||||
|
client_read_vectors = [
|
||||||
|
ReadVector(offset=offset, size=size)
|
||||||
|
for (offset, size) in r_vector
|
||||||
|
]
|
||||||
|
client_result = yield mutable_client.read_test_write_chunks(
|
||||||
|
storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
|
||||||
|
client_read_vectors,
|
||||||
|
)
|
||||||
|
return (client_result.success, client_result.reads)
|
||||||
|
|
|
@ -129,3 +129,31 @@ class UntilTests(unittest.TestCase):
|
||||||
self.assertEqual([1], counter)
|
self.assertEqual([1], counter)
|
||||||
r1.callback(None)
|
r1.callback(None)
|
||||||
self.assertEqual([2], counter)
|
self.assertEqual([2], counter)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncToDeferred(unittest.TestCase):
|
||||||
|
"""Tests for ``deferredutil.async_to_deferred.``"""
|
||||||
|
|
||||||
|
def test_async_to_deferred_success(self):
|
||||||
|
"""
|
||||||
|
Normal results from a ``@async_to_deferred``-wrapped function get
|
||||||
|
turned into a ``Deferred`` with that value.
|
||||||
|
"""
|
||||||
|
@deferredutil.async_to_deferred
|
||||||
|
async def f(x, y):
|
||||||
|
return x + y
|
||||||
|
|
||||||
|
result = f(1, y=2)
|
||||||
|
self.assertEqual(self.successResultOf(result), 3)
|
||||||
|
|
||||||
|
def test_async_to_deferred_exception(self):
|
||||||
|
"""
|
||||||
|
Exceptions from a ``@async_to_deferred``-wrapped function get
|
||||||
|
turned into a ``Deferred`` with that value.
|
||||||
|
"""
|
||||||
|
@deferredutil.async_to_deferred
|
||||||
|
async def f(x, y):
|
||||||
|
return x/y
|
||||||
|
|
||||||
|
result = f(1, 0)
|
||||||
|
self.assertIsInstance(self.failureResultOf(result).value, ZeroDivisionError)
|
||||||
|
|
|
@ -1140,4 +1140,19 @@ class HTTPImmutableAPIsTests(
|
||||||
class FoolscapMutableAPIsTests(
|
class FoolscapMutableAPIsTests(
|
||||||
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
|
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
|
||||||
):
|
):
|
||||||
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
|
"""Foolscap-specific tests for mutable ``IStorageServer`` APIs."""
|
||||||
|
|
||||||
|
|
||||||
|
class HTTPMutableAPIsTests(
|
||||||
|
_HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
|
||||||
|
):
|
||||||
|
"""HTTP-specific tests for mutable ``IStorageServer`` APIs."""
|
||||||
|
|
||||||
|
# TODO will be implemented in later tickets
|
||||||
|
SKIP_TESTS = {
|
||||||
|
"test_STARAW_write_enabler_must_match",
|
||||||
|
"test_add_lease_renewal",
|
||||||
|
"test_add_new_lease",
|
||||||
|
"test_advise_corrupt_share",
|
||||||
|
"test_slot_readv_no_shares",
|
||||||
|
}
|
||||||
|
|
|
@ -6,14 +6,12 @@ server authentication logic, which may one day apply outside of HTTP Storage
|
||||||
Protocol.
|
Protocol.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from functools import wraps
|
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
|
|
||||||
from twisted.internet.endpoints import serverFromString
|
from twisted.internet.endpoints import serverFromString
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
from twisted.internet.defer import Deferred
|
|
||||||
from twisted.internet.task import deferLater
|
from twisted.internet.task import deferLater
|
||||||
from twisted.web.server import Site
|
from twisted.web.server import Site
|
||||||
from twisted.web.static import Data
|
from twisted.web.static import Data
|
||||||
|
@ -31,6 +29,7 @@ from .certs import (
|
||||||
from ..storage.http_common import get_spki_hash
|
from ..storage.http_common import get_spki_hash
|
||||||
from ..storage.http_client import _StorageClientHTTPSPolicy
|
from ..storage.http_client import _StorageClientHTTPSPolicy
|
||||||
from ..storage.http_server import _TLSEndpointWrapper
|
from ..storage.http_server import _TLSEndpointWrapper
|
||||||
|
from ..util.deferredutil import async_to_deferred
|
||||||
|
|
||||||
|
|
||||||
class HTTPSNurlTests(SyncTestCase):
|
class HTTPSNurlTests(SyncTestCase):
|
||||||
|
@ -73,20 +72,6 @@ ox5zO3LrQmQw11OaIAs2/kviKAoKTFFxeyYcpS5RuKNDZfHQCXlLwt9bySxG
|
||||||
self.assertEqual(get_spki_hash(certificate), expected_hash)
|
self.assertEqual(get_spki_hash(certificate), expected_hash)
|
||||||
|
|
||||||
|
|
||||||
def async_to_deferred(f):
|
|
||||||
"""
|
|
||||||
Wrap an async function to return a Deferred instead.
|
|
||||||
|
|
||||||
Maybe solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886
|
|
||||||
"""
|
|
||||||
|
|
||||||
@wraps(f)
|
|
||||||
def not_async(*args, **kwargs):
|
|
||||||
return Deferred.fromCoroutine(f(*args, **kwargs))
|
|
||||||
|
|
||||||
return not_async
|
|
||||||
|
|
||||||
|
|
||||||
class PinningHTTPSValidation(AsyncTestCase):
|
class PinningHTTPSValidation(AsyncTestCase):
|
||||||
"""
|
"""
|
||||||
Test client-side validation logic of HTTPS certificates that uses
|
Test client-side validation logic of HTTPS certificates that uses
|
||||||
|
|
|
@ -4,24 +4,13 @@ Utilities for working with Twisted Deferreds.
|
||||||
Ported to Python 3.
|
Ported to Python 3.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import absolute_import
|
|
||||||
from __future__ import division
|
|
||||||
from __future__ import print_function
|
|
||||||
from __future__ import unicode_literals
|
|
||||||
|
|
||||||
from future.utils import PY2
|
|
||||||
if PY2:
|
|
||||||
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
|
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
try:
|
from typing import (
|
||||||
from typing import (
|
Callable,
|
||||||
Callable,
|
Any,
|
||||||
Any,
|
)
|
||||||
)
|
|
||||||
except ImportError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
from foolscap.api import eventually
|
from foolscap.api import eventually
|
||||||
from eliot.twisted import (
|
from eliot.twisted import (
|
||||||
|
@ -231,3 +220,17 @@ def until(
|
||||||
yield action()
|
yield action()
|
||||||
if condition():
|
if condition():
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
|
def async_to_deferred(f):
|
||||||
|
"""
|
||||||
|
Wrap an async function to return a Deferred instead.
|
||||||
|
|
||||||
|
Maybe solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886
|
||||||
|
"""
|
||||||
|
|
||||||
|
@wraps(f)
|
||||||
|
def not_async(*args, **kwargs):
|
||||||
|
return defer.Deferred.fromCoroutine(f(*args, **kwargs))
|
||||||
|
|
||||||
|
return not_async
|
||||||
|
|
Loading…
Reference in New Issue