More streaming, with tests passing again.

This commit is contained in:
Itamar Turner-Trauring 2022-06-27 14:44:51 -04:00
parent 06eca79263
commit 6dd2b2d583
2 changed files with 115 additions and 47 deletions

View File

@ -281,9 +281,10 @@ _SCHEMAS = {
@implementer(IPullProducer) @implementer(IPullProducer)
@define @define
class _ReadProducer: class _ReadAllProducer:
""" """
Producer that calls a read function, and writes to a request. Producer that calls a read function repeatedly to read all the data, and
writes to a request.
""" """
request: Request request: Request
@ -292,7 +293,7 @@ class _ReadProducer:
start: int = field(default=0) start: int = field(default=0)
def resumeProducing(self): def resumeProducing(self):
data = self.read_data(self.start, self.start + 65536) data = self.read_data(self.start, 65536)
if not data: if not data:
self.request.unregisterProducer() self.request.unregisterProducer()
d = self.result d = self.result
@ -309,6 +310,52 @@ class _ReadProducer:
pass pass
@implementer(IPullProducer)
@define
class _ReadRangeProducer:
"""
Producer that calls a read function to read a range of data, and writes to
a request.
"""
request: Request
read_data: Callable[[int, int], bytes]
result: Deferred
start: int
remaining: int
first_read: bool = field(default=True)
def resumeProducing(self):
to_read = min(self.remaining, 65536)
data = self.read_data(self.start, to_read)
assert len(data) <= to_read
if self.first_read and data:
# For empty bodies the content-range header makes no sense since
# the end of the range is inclusive.
self.request.setHeader(
"content-range",
ContentRange("bytes", self.start, self.start + len(data)).to_header(),
)
self.request.write(data)
if not data or len(data) < to_read:
self.request.unregisterProducer()
d = self.result
del self.result
d.callback(b"")
return
self.start += len(data)
self.remaining -= len(data)
assert self.remaining >= 0
def pauseProducing(self):
pass
def stopProducing(self):
pass
def read_range(request: Request, read_data: Callable[[int, int], bytes]) -> None: def read_range(request: Request, read_data: Callable[[int, int], bytes]) -> None:
""" """
Read an optional ``Range`` header, reads data appropriately via the given Read an optional ``Range`` header, reads data appropriately via the given
@ -324,9 +371,20 @@ def read_range(request: Request, read_data: Callable[[int, int], bytes]) -> None
The resulting data is written to the request. The resulting data is written to the request.
""" """
def read_data_with_error_handling(offset: int, length: int) -> bytes:
try:
return read_data(offset, length)
except _HTTPError as e:
request.setResponseCode(e.code)
# Empty read means we're done.
return b""
if request.getHeader("range") is None: if request.getHeader("range") is None:
d = Deferred() d = Deferred()
request.registerProducer(_ReadProducer(request, read_data, d), False) request.registerProducer(
_ReadAllProducer(request, read_data_with_error_handling, d), False
)
return d return d
range_header = parse_range_header(request.getHeader("range")) range_header = parse_range_header(request.getHeader("range"))
@ -339,21 +397,15 @@ def read_range(request: Request, read_data: Callable[[int, int], bytes]) -> None
raise _HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE) raise _HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE)
offset, end = range_header.ranges[0] offset, end = range_header.ranges[0]
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = read_data(offset, end - offset)
request.setResponseCode(http.PARTIAL_CONTENT) request.setResponseCode(http.PARTIAL_CONTENT)
if len(data): d = Deferred()
# For empty bodies the content-range header makes no sense since request.registerProducer(
# the end of the range is inclusive. _ReadRangeProducer(
request.setHeader( request, read_data_with_error_handling, d, offset, end - offset
"content-range", ),
ContentRange("bytes", offset, offset + len(data)).to_header(), False,
) )
request.write(data) return d
request.finish()
class HTTPServer(object): class HTTPServer(object):

View File

@ -10,7 +10,7 @@ from time import sleep, time
from cbor2 import dumps from cbor2 import dumps
from pycddl import ValidationError as CDDLValidationError from pycddl import ValidationError as CDDLValidationError
from hypothesis import assume, given, strategies as st from hypothesis import assume, given, strategies as st
from fixtures import Fixture, TempDir from fixtures import Fixture, TempDir, MockPatch
from treq.testing import StubTreq from treq.testing import StubTreq
from klein import Klein from klein import Klein
from hyperlink import DecodedURL from hyperlink import DecodedURL
@ -314,6 +314,12 @@ class HttpTestFixture(Fixture):
def _setUp(self): def _setUp(self):
self.clock = Clock() self.clock = Clock()
self.tempdir = self.useFixture(TempDir()) self.tempdir = self.useFixture(TempDir())
self.mock = self.useFixture(
MockPatch(
"twisted.internet.task._theCooperator",
Cooperator(scheduler=lambda c: self.clock.callLater(0.000001, c)),
)
)
self.storage_server = StorageServer( self.storage_server = StorageServer(
self.tempdir.path, b"\x00" * 20, clock=self.clock self.tempdir.path, b"\x00" * 20, clock=self.clock
) )
@ -325,6 +331,12 @@ class HttpTestFixture(Fixture):
treq=self.treq, treq=self.treq,
) )
def result_of_with_flush(self, d):
for i in range(100):
self.clock.advance(0.001)
self.treq.flush()
return result_of(d)
class StorageClientWithHeadersOverride(object): class StorageClientWithHeadersOverride(object):
"""Wrap ``StorageClient`` and override sent headers.""" """Wrap ``StorageClient`` and override sent headers."""
@ -548,7 +560,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# We can now read: # We can now read:
for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]: for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
downloaded = result_of( downloaded = self.http.result_of_with_flush(
self.imm_client.read_share_chunk(storage_index, 1, offset, length) self.imm_client.read_share_chunk(storage_index, 1, offset, length)
) )
self.assertEqual(downloaded, expected_data[offset : offset + length]) self.assertEqual(downloaded, expected_data[offset : offset + length])
@ -623,7 +635,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# The upload of share 1 succeeded, demonstrating that second create() # The upload of share 1 succeeded, demonstrating that second create()
# call didn't overwrite work-in-progress. # call didn't overwrite work-in-progress.
downloaded = result_of( downloaded = self.http.result_of_with_flush(
self.imm_client.read_share_chunk(storage_index, 1, 0, 100) self.imm_client.read_share_chunk(storage_index, 1, 0, 100)
) )
self.assertEqual(downloaded, b"a" * 50 + b"b" * 50) self.assertEqual(downloaded, b"a" * 50 + b"b" * 50)
@ -753,11 +765,15 @@ class ImmutableHTTPAPITests(SyncTestCase):
) )
) )
self.assertEqual( self.assertEqual(
result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)), self.http.result_of_with_flush(
self.imm_client.read_share_chunk(storage_index, 1, 0, 10)
),
b"1" * 10, b"1" * 10,
) )
self.assertEqual( self.assertEqual(
result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)), self.http.result_of_with_flush(
self.imm_client.read_share_chunk(storage_index, 2, 0, 10)
),
b"2" * 10, b"2" * 10,
) )
@ -921,7 +937,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# Abort didn't prevent reading: # Abort didn't prevent reading:
self.assertEqual( self.assertEqual(
uploaded_data, uploaded_data,
result_of( self.http.result_of_with_flush(
self.imm_client.read_share_chunk( self.imm_client.read_share_chunk(
storage_index, storage_index,
0, 0,
@ -986,8 +1002,12 @@ class MutableHTTPAPIsTests(SyncTestCase):
Written data can be read using ``read_share_chunk``. Written data can be read using ``read_share_chunk``.
""" """
storage_index, _, _ = self.create_upload() storage_index, _, _ = self.create_upload()
data0 = result_of(self.mut_client.read_share_chunk(storage_index, 0, 1, 7)) data0 = self.http.result_of_with_flush(
data1 = result_of(self.mut_client.read_share_chunk(storage_index, 1, 0, 8)) self.mut_client.read_share_chunk(storage_index, 0, 1, 7)
)
data1 = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
)
self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1")) self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1"))
def test_read_before_write(self): def test_read_before_write(self):
@ -1015,8 +1035,12 @@ class MutableHTTPAPIsTests(SyncTestCase):
), ),
) )
# But the write did happen: # But the write did happen:
data0 = result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)) data0 = self.http.result_of_with_flush(
data1 = result_of(self.mut_client.read_share_chunk(storage_index, 1, 0, 8)) self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
)
data1 = self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
)
self.assertEqual((data0, data1), (b"aXYZef-0", b"abcdef-1")) self.assertEqual((data0, data1), (b"aXYZef-0", b"abcdef-1"))
def test_conditional_write(self): def test_conditional_write(self):
@ -1057,7 +1081,9 @@ class MutableHTTPAPIsTests(SyncTestCase):
) )
self.assertTrue(result.success) self.assertTrue(result.success)
self.assertEqual( self.assertEqual(
result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)), self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
),
b"aXYZef-0", b"aXYZef-0",
) )
@ -1094,7 +1120,9 @@ class MutableHTTPAPIsTests(SyncTestCase):
# The write did not happen: # The write did not happen:
self.assertEqual( self.assertEqual(
result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)), self.http.result_of_with_flush(
self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
),
b"abcdef-0", b"abcdef-0",
) )
@ -1194,7 +1222,7 @@ class SharedImmutableMutableTestsMixin:
Reading from unknown storage index results in 404. Reading from unknown storage index results in 404.
""" """
with assert_fails_with_http_code(self, http.NOT_FOUND): with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of( self.http.result_of_with_flush(
self.client.read_share_chunk( self.client.read_share_chunk(
b"1" * 16, b"1" * 16,
1, 1,
@ -1209,7 +1237,7 @@ class SharedImmutableMutableTestsMixin:
""" """
storage_index, _, _ = self.upload(1) storage_index, _, _ = self.upload(1)
with assert_fails_with_http_code(self, http.NOT_FOUND): with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of( self.http.result_of_with_flush(
self.client.read_share_chunk( self.client.read_share_chunk(
storage_index, storage_index,
7, # different share number 7, # different share number
@ -1235,7 +1263,7 @@ class SharedImmutableMutableTestsMixin:
with assert_fails_with_http_code( with assert_fails_with_http_code(
self, http.REQUESTED_RANGE_NOT_SATISFIABLE self, http.REQUESTED_RANGE_NOT_SATISFIABLE
): ):
result_of( self.http.result_of_with_flush(
client.read_share_chunk( client.read_share_chunk(
storage_index, storage_index,
1, 1,
@ -1264,20 +1292,8 @@ class SharedImmutableMutableTestsMixin:
""" """
A read with no range returns the whole mutable/immutable. A read with no range returns the whole mutable/immutable.
""" """
self.patch(
task,
"_theCooperator",
Cooperator(scheduler=lambda c: self.http.clock.callLater(0.000001, c)),
)
def result_of_with_flush(d):
for i in range(100):
self.http.clock.advance(0.001)
self.http.treq.flush()
return result_of(d)
storage_index, uploaded_data, _ = self.upload(1, data_length) storage_index, uploaded_data, _ = self.upload(1, data_length)
response = result_of_with_flush( response = self.http.result_of_with_flush(
self.http.client.request( self.http.client.request(
"GET", "GET",
self.http.client.relative_url( self.http.client.relative_url(
@ -1298,7 +1314,7 @@ class SharedImmutableMutableTestsMixin:
def check_range(requested_range, expected_response): def check_range(requested_range, expected_response):
headers = Headers() headers = Headers()
headers.setRawHeaders("range", [requested_range]) headers.setRawHeaders("range", [requested_range])
response = result_of( response = self.http.result_of_with_flush(
self.http.client.request( self.http.client.request(
"GET", "GET",
self.http.client.relative_url( self.http.client.relative_url(