Merge branch '3783-storage-client-http' into 3937-integration-http-storage

This commit is contained in:
Itamar Turner-Trauring 2022-11-16 11:07:35 -05:00
commit bb053c714a
11 changed files with 460 additions and 139 deletions

View File

@ -52,7 +52,7 @@ fi
# This is primarily aimed at catching hangs on the PyPy job which runs for
# about 21 minutes and then gets killed by CircleCI in a way that fails the
# job and bypasses our "allowed failure" logic.
TIMEOUT="timeout --kill-after 1m 15m"
TIMEOUT="timeout --kill-after 1m 25m"
# Run the test suite as a non-root user. This is the expected usage some
# small areas of the test suite assume non-root privileges (such as unreadable

0
newsfragments/3783.minor Normal file
View File

View File

@ -104,6 +104,7 @@ _client_config = configutil.ValidConfiguration(
"reserved_space",
"storage_dir",
"plugins",
"force_foolscap",
),
"sftpd": (
"accounts.file",
@ -823,9 +824,10 @@ class _Client(node.Node, pollmixin.PollMixin):
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
(_, _, swissnum) = decode_furl(furl)
self.storage_nurls = self.tub.negotiationClass.add_storage_server(
ss, swissnum.encode("ascii")
)
if hasattr(self.tub.negotiationClass, "add_storage_server"):
nurls = self.tub.negotiationClass.add_storage_server(ss, swissnum.encode("ascii"))
self.storage_nurls = nurls
announcement[storage_client.ANONYMOUS_STORAGE_NURLS] = [n.to_text() for n in nurls]
announcement["anonymous-storage-FURL"] = furl
enabled_storage_servers = self._enable_storage_servers(

View File

@ -697,7 +697,7 @@ def create_connection_handlers(config, i2p_provider, tor_provider):
def create_tub(tub_options, default_connection_handlers, foolscap_connection_handlers,
handler_overrides={}, **kwargs):
handler_overrides={}, force_foolscap=False, **kwargs):
"""
Create a Tub with the right options and handlers. It will be
ephemeral unless the caller provides certFile= in kwargs
@ -707,10 +707,16 @@ def create_tub(tub_options, default_connection_handlers, foolscap_connection_han
:param dict tub_options: every key-value pair in here will be set in
the new Tub via `Tub.setOption`
:param bool force_foolscap: If True, only allow Foolscap, not just HTTPS
storage protocol.
"""
# We listen simulataneously for both Foolscap and HTTPS on the same port,
# We listen simultaneously for both Foolscap and HTTPS on the same port,
# so we have to create a special Foolscap Tub for that to work:
tub = create_tub_with_https_support(**kwargs)
if force_foolscap:
tub = Tub(**kwargs)
else:
tub = create_tub_with_https_support(**kwargs)
for (name, value) in list(tub_options.items()):
tub.setOption(name, value)
@ -901,14 +907,20 @@ def create_main_tub(config, tub_options,
# FIXME? "node.pem" was the CERTFILE option/thing
certfile = config.get_private_path("node.pem")
tub = create_tub(
tub_options,
default_connection_handlers,
foolscap_connection_handlers,
# TODO eventually we will want the default to be False, but for now we
# don't want to enable HTTP by default.
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3934
force_foolscap=config.get_config(
"storage", "force_foolscap", default=True, boolean=True
),
handler_overrides=handler_overrides,
certFile=certfile,
)
if portlocation is None:
log.msg("Tub is not listening")
else:

View File

@ -186,7 +186,7 @@ class DaemonizeTheRealService(Service, HookMixin):
)
)
else:
self.stderr.write("\nUnknown error\n")
self.stderr.write("\nUnknown error, here's the traceback:\n")
reason.printTraceback(self.stderr)
reactor.stop()

View File

@ -20,7 +20,7 @@ from twisted.web.http_headers import Headers
from twisted.web import http
from twisted.web.iweb import IPolicyForHTTPS
from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred, succeed
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator, IReactorTime
from twisted.internet.ssl import CertificateOptions
from twisted.web.client import Agent, HTTPConnectionPool
from zope.interface import implementer
@ -276,42 +276,67 @@ class _StorageClientHTTPSPolicy:
)
@define
@define(hash=True)
class StorageClient(object):
"""
Low-level HTTP client that talks to the HTTP storage server.
"""
# If set, we're doing unit testing and we should call this with
# HTTPConnectionPool we create.
TEST_MODE_REGISTER_HTTP_POOL = None
@classmethod
def start_test_mode(cls, callback):
"""Switch to testing mode.
In testing mode we register the pool with test system using the given
callback so it can Do Things, most notably killing off idle HTTP
connections at test shutdown and, in some tests, in the midddle of the
test.
"""
cls.TEST_MODE_REGISTER_HTTP_POOL = callback
@classmethod
def stop_test_mode(cls):
"""Stop testing mode."""
cls.TEST_MODE_REGISTER_HTTP_POOL = None
# The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
# ``StorageClient.from_nurl()``.
_base_url: DecodedURL
_swissnum: bytes
_treq: Union[treq, StubTreq, HTTPClient]
_clock: IReactorTime
@classmethod
def from_nurl(
cls, nurl: DecodedURL, reactor, persistent: bool = True
cls,
nurl: DecodedURL,
reactor,
) -> StorageClient:
"""
Create a ``StorageClient`` for the given NURL.
``persistent`` indicates whether to use persistent HTTP connections.
"""
assert nurl.fragment == "v=1"
assert nurl.scheme == "pb"
swissnum = nurl.path[0].encode("ascii")
certificate_hash = nurl.user.encode("ascii")
pool = HTTPConnectionPool(reactor)
if cls.TEST_MODE_REGISTER_HTTP_POOL is not None:
cls.TEST_MODE_REGISTER_HTTP_POOL(pool)
treq_client = HTTPClient(
Agent(
reactor,
_StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
pool=HTTPConnectionPool(reactor, persistent=persistent),
pool=pool,
)
)
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
return cls(https_url, swissnum, treq_client)
return cls(https_url, swissnum, treq_client, reactor)
def relative_url(self, path):
"""Get a URL relative to the base URL."""
@ -379,13 +404,13 @@ class StorageClient(object):
return self._treq.request(method, url, headers=headers, **kwargs)
@define(hash=True)
class StorageClientGeneral(object):
"""
High-level HTTP APIs that aren't immutable- or mutable-specific.
"""
def __init__(self, client): # type: (StorageClient) -> None
self._client = client
_client: StorageClient
@inlineCallbacks
def get_version(self):
@ -534,7 +559,7 @@ async def advise_corrupt_share(
)
@define
@define(hash=True)
class StorageClientImmutables(object):
"""
APIs for interacting with immutables.

View File

@ -30,6 +30,8 @@ Ported to Python 3.
#
# 6: implement other sorts of IStorageClient classes: S3, etc
from __future__ import annotations
from six import ensure_text
from typing import Union
import re, time, hashlib
@ -37,13 +39,16 @@ from os import urandom
from configparser import NoSectionError
import attr
from hyperlink import DecodedURL
from zope.interface import (
Attribute,
Interface,
implementer,
)
from twisted.python.failure import Failure
from twisted.web import http
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from twisted.internet import defer, reactor
from twisted.application import service
from twisted.plugin import (
getPlugins,
@ -75,6 +80,8 @@ from allmydata.storage.http_client import (
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException
)
ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs"
# who is responsible for de-duplication?
# both?
@ -99,8 +106,8 @@ class StorageClientConfig(object):
:ivar preferred_peers: An iterable of the server-ids (``bytes``) of the
storage servers where share placement is preferred, in order of
decreasing preference. See the *[client]peers.preferred*
documentation for details.
decreasing preference. See the *[client]peers.preferred* documentation
for details.
:ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from
names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
@ -262,6 +269,10 @@ class StorageFarmBroker(service.MultiService):
by the given announcement.
"""
assert isinstance(server_id, bytes)
if len(server["ann"].get(ANONYMOUS_STORAGE_NURLS, [])) > 0:
s = HTTPNativeStorageServer(server_id, server["ann"])
s.on_status_changed(lambda _: self._got_connection())
return s
handler_overrides = server.get("connections", {})
s = NativeStorageServer(
server_id,
@ -523,6 +534,45 @@ class IFoolscapStorageServer(Interface):
"""
def _parse_announcement(server_id: bytes, furl: bytes, ann: dict) -> tuple[str, bytes, bytes, bytes, bytes]:
"""
Parse the furl and announcement, return:
(nickname, permutation_seed, tubid, short_description, long_description)
"""
m = re.match(br'pb://(\w+)@', furl)
assert m, furl
tubid_s = m.group(1).lower()
tubid = base32.a2b(tubid_s)
if "permutation-seed-base32" in ann:
seed = ann["permutation-seed-base32"]
if isinstance(seed, str):
seed = seed.encode("utf-8")
ps = base32.a2b(seed)
elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id):
ps = base32.a2b(server_id[3:])
else:
log.msg("unable to parse serverid '%(server_id)s as pubkey, "
"hashing it to get permutation-seed, "
"may not converge with other clients",
server_id=server_id,
facility="tahoe.storage_broker",
level=log.UNUSUAL, umid="qu86tw")
ps = hashlib.sha256(server_id).digest()
permutation_seed = ps
assert server_id
long_description = server_id
if server_id.startswith(b"v0-"):
# remove v0- prefix from abbreviated name
short_description = server_id[3:3+8]
else:
short_description = server_id[:8]
nickname = ann.get("nickname", "")
return (nickname, permutation_seed, tubid, short_description, long_description)
@implementer(IFoolscapStorageServer)
@attr.s(frozen=True)
class _FoolscapStorage(object):
@ -566,43 +616,13 @@ class _FoolscapStorage(object):
The furl will be a Unicode string on Python 3; on Python 2 it will be
either a native (bytes) string or a Unicode string.
"""
furl = furl.encode("utf-8")
m = re.match(br'pb://(\w+)@', furl)
assert m, furl
tubid_s = m.group(1).lower()
tubid = base32.a2b(tubid_s)
if "permutation-seed-base32" in ann:
seed = ann["permutation-seed-base32"]
if isinstance(seed, str):
seed = seed.encode("utf-8")
ps = base32.a2b(seed)
elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id):
ps = base32.a2b(server_id[3:])
else:
log.msg("unable to parse serverid '%(server_id)s as pubkey, "
"hashing it to get permutation-seed, "
"may not converge with other clients",
server_id=server_id,
facility="tahoe.storage_broker",
level=log.UNUSUAL, umid="qu86tw")
ps = hashlib.sha256(server_id).digest()
permutation_seed = ps
assert server_id
long_description = server_id
if server_id.startswith(b"v0-"):
# remove v0- prefix from abbreviated name
short_description = server_id[3:3+8]
else:
short_description = server_id[:8]
nickname = ann.get("nickname", "")
(nickname, permutation_seed, tubid, short_description, long_description) = _parse_announcement(server_id, furl.encode("utf-8"), ann)
return cls(
nickname=nickname,
permutation_seed=permutation_seed,
tubid=tubid,
storage_server=storage_server,
furl=furl,
furl=furl.encode("utf-8"),
short_description=short_description,
long_description=long_description,
)
@ -684,6 +704,16 @@ def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
raise AnnouncementNotMatched()
def _available_space_from_version(version):
if version is None:
return None
protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict())
available_space = protocol_v1_version.get(b'available-space')
if available_space is None:
available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None)
return available_space
@implementer(IServer)
class NativeStorageServer(service.MultiService):
"""I hold information about a storage server that we want to connect to.
@ -842,13 +872,7 @@ class NativeStorageServer(service.MultiService):
def get_available_space(self):
version = self.get_version()
if version is None:
return None
protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict())
available_space = protocol_v1_version.get(b'available-space')
if available_space is None:
available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None)
return available_space
return _available_space_from_version(version)
def start_connecting(self, trigger_cb):
self._tub = self._tub_maker(self._handler_overrides)
@ -910,6 +934,164 @@ class NativeStorageServer(service.MultiService):
# used when the broker wants us to hurry up
self._reconnector.reset()
@implementer(IServer)
class HTTPNativeStorageServer(service.MultiService):
"""
Like ``NativeStorageServer``, but for HTTP clients.
The notion of being "connected" is less meaningful for HTTP; we just poll
occasionally, and if we've succeeded at last poll, we assume we're
"connected".
"""
def __init__(self, server_id: bytes, announcement, reactor=reactor):
service.MultiService.__init__(self)
assert isinstance(server_id, bytes)
self._server_id = server_id
self.announcement = announcement
self._on_status_changed = ObserverList()
self._reactor = reactor
furl = announcement["anonymous-storage-FURL"].encode("utf-8")
(
self._nickname,
self._permutation_seed,
self._tubid,
self._short_description,
self._long_description
) = _parse_announcement(server_id, furl, announcement)
# TODO need some way to do equivalent of Happy Eyeballs for multiple NURLs?
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3935
nurl = DecodedURL.from_text(announcement[ANONYMOUS_STORAGE_NURLS][0])
self._istorage_server = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor)
)
self._connection_status = connection_status.ConnectionStatus.unstarted()
self._version = None
self._last_connect_time = None
self._connecting_deferred = None
def get_permutation_seed(self):
return self._permutation_seed
def get_name(self):
return self._short_description
def get_longname(self):
return self._long_description
def get_tubid(self):
return self._tubid
def get_lease_seed(self):
# Apparently this is what Foolscap version above does?!
return self._tubid
def get_foolscap_write_enabler_seed(self):
return self._tubid
def get_nickname(self):
return self._nickname
def on_status_changed(self, status_changed):
"""
:param status_changed: a callable taking a single arg (the
NativeStorageServer) that is notified when we become connected
"""
return self._on_status_changed.subscribe(status_changed)
# Special methods used by copy.copy() and copy.deepcopy(). When those are
# used in allmydata.immutable.filenode to copy CheckResults during
# repair, we want it to treat the IServer instances as singletons, and
# not attempt to duplicate them..
def __copy__(self):
return self
def __deepcopy__(self, memodict):
return self
def __repr__(self):
return "<HTTPNativeStorageServer for %r>" % self.get_name()
def get_serverid(self):
return self._server_id
def get_version(self):
return self._version
def get_announcement(self):
return self.announcement
def get_connection_status(self):
return self._connection_status
def is_connected(self):
return self._connection_status.connected
def get_available_space(self):
version = self.get_version()
return _available_space_from_version(version)
def start_connecting(self, trigger_cb):
self._lc = LoopingCall(self._connect)
self._lc.start(1, True)
def _got_version(self, version):
self._last_connect_time = time.time()
self._version = version
self._connection_status = connection_status.ConnectionStatus(
True, "connected", [], self._last_connect_time, self._last_connect_time
)
self._on_status_changed.notify(self)
def _failed_to_connect(self, reason):
self._connection_status = connection_status.ConnectionStatus(
False, f"failure: {reason}", [], self._last_connect_time, self._last_connect_time
)
self._on_status_changed.notify(self)
def get_storage_server(self):
"""
See ``IServer.get_storage_server``.
"""
if self._connection_status.summary == "unstarted":
return None
return self._istorage_server
def stop_connecting(self):
self._lc.stop()
if self._connecting_deferred is not None:
self._connecting_deferred.cancel()
def try_to_connect(self):
self._connect()
def _connect(self):
result = self._istorage_server.get_version()
def remove_connecting_deferred(result):
self._connecting_deferred = None
return result
# Set a short timeout since we're relying on this for server liveness.
self._connecting_deferred = result.addTimeout(5, self._reactor).addBoth(
remove_connecting_deferred).addCallbacks(
self._got_version,
self._failed_to_connect
)
def stopService(self):
if self._connecting_deferred is not None:
self._connecting_deferred.cancel()
result = service.MultiService.stopService(self)
if self._lc.running:
self._lc.stop()
self._failed_to_connect("shut down")
return result
class UnknownServerTypeError(Exception):
pass
@ -1026,7 +1208,7 @@ class _StorageServer(object):
@attr.s
@attr.s(hash=True)
class _FakeRemoteReference(object):
"""
Emulate a Foolscap RemoteReference, calling a local object instead.
@ -1051,7 +1233,7 @@ class _HTTPBucketWriter(object):
storage_index = attr.ib(type=bytes)
share_number = attr.ib(type=int)
upload_secret = attr.ib(type=bytes)
finished = attr.ib(type=bool, default=False)
finished = attr.ib(type=defer.Deferred[bool], factory=defer.Deferred)
def abort(self):
return self.client.abort_upload(self.storage_index, self.share_number,
@ -1063,18 +1245,27 @@ class _HTTPBucketWriter(object):
self.storage_index, self.share_number, self.upload_secret, offset, data
)
if result.finished:
self.finished = True
self.finished.callback(True)
defer.returnValue(None)
def close(self):
# A no-op in HTTP protocol.
if not self.finished:
return defer.fail(RuntimeError("You didn't finish writing?!"))
return defer.succeed(None)
# We're not _really_ closed until all writes have succeeded and we
# finished writing all the data.
return self.finished
def _ignore_404(failure: Failure) -> Union[Failure, None]:
"""
Useful for advise_corrupt_share(), since it swallows unknown share numbers
in Foolscap.
"""
if failure.check(HTTPClientException) and failure.value.code == http.NOT_FOUND:
return None
else:
return failure
@attr.s
@attr.s(hash=True)
class _HTTPBucketReader(object):
"""
Emulate a ``RIBucketReader``, but use HTTP protocol underneath.
@ -1092,7 +1283,7 @@ class _HTTPBucketReader(object):
return self.client.advise_corrupt_share(
self.storage_index, self.share_number,
str(reason, "utf-8", errors="backslashreplace")
)
).addErrback(_ignore_404)
# WORK IN PROGRESS, for now it doesn't actually implement whole thing.
@ -1192,7 +1383,7 @@ class _HTTPStorageServer(object):
raise ValueError("Unknown share type")
return client.advise_corrupt_share(
storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
)
).addErrback(_ignore_404)
@defer.inlineCallbacks
def slot_readv(self, storage_index, shares, readv):

View File

@ -5,22 +5,14 @@ in ``allmydata.test.test_system``.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# Don't import bytes since it causes issues on (so far unported) modules on Python 2.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, dict, list, object, range, max, min, str # noqa: F401
from typing import Optional
import os
from functools import partial
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import deferLater
from twisted.application import service
from foolscap.api import flushEventualQueue
@ -28,6 +20,11 @@ from foolscap.api import flushEventualQueue
from allmydata import client
from allmydata.introducer.server import create_introducer
from allmydata.util import fileutil, log, pollmixin
from allmydata.storage import http_client
from allmydata.storage_client import (
NativeStorageServer,
HTTPNativeStorageServer,
)
from twisted.python.filepath import (
FilePath,
@ -644,7 +641,15 @@ def _render_section_values(values):
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# If set to True, use Foolscap for storage protocol. If set to False, HTTP
# will be used when possible. If set to None, this suggests a bug in the
# test code.
FORCE_FOOLSCAP_FOR_STORAGE : Optional[bool] = None
def setUp(self):
self._http_client_pools = []
http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool)
self.addCleanup(http_client.StorageClient.stop_test_mode)
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
@ -652,10 +657,35 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
self.sparent = service.MultiService()
self.sparent.startService()
def _got_new_http_connection_pool(self, pool):
# Register the pool for shutdown later:
self._http_client_pools.append(pool)
# Disable retries:
pool.retryAutomatically = False
# Make a much more aggressive timeout for connections, we're connecting
# locally after all... and also make sure it's lower than the delay we
# add in tearDown, to prevent dirty reactor issues.
getConnection = pool.getConnection
def getConnectionWithTimeout(*args, **kwargs):
d = getConnection(*args, **kwargs)
d.addTimeout(1, reactor)
return d
pool.getConnection = getConnectionWithTimeout
def close_idle_http_connections(self):
"""Close all HTTP client connections that are just hanging around."""
return defer.gatherResults(
[pool.closeCachedConnections() for pool in self._http_client_pools]
)
def tearDown(self):
log.msg("shutting down SystemTest services")
d = self.sparent.stopService()
d.addBoth(flush_but_dont_ignore)
d.addBoth(lambda x: self.close_idle_http_connections().addCallback(lambda _: x))
d.addBoth(lambda x: deferLater(reactor, 2, lambda: x))
return d
def getdir(self, subdir):
@ -714,21 +744,31 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
:return: A ``Deferred`` that fires when the nodes have connected to
each other.
"""
self.assertIn(
self.FORCE_FOOLSCAP_FOR_STORAGE, (True, False),
"You forgot to set FORCE_FOOLSCAP_FOR_STORAGE on {}".format(self.__class__)
)
self.numclients = NUMCLIENTS
self.introducer = yield self._create_introducer()
self.add_service(self.introducer)
self.introweb_url = self._get_introducer_web()
yield self._set_up_client_nodes()
yield self._set_up_client_nodes(self.FORCE_FOOLSCAP_FOR_STORAGE)
native_server = next(iter(self.clients[0].storage_broker.get_known_servers()))
if self.FORCE_FOOLSCAP_FOR_STORAGE:
expected_storage_server_class = NativeStorageServer
else:
expected_storage_server_class = HTTPNativeStorageServer
self.assertIsInstance(native_server, expected_storage_server_class)
@inlineCallbacks
def _set_up_client_nodes(self):
def _set_up_client_nodes(self, force_foolscap):
q = self.introducer
self.introducer_furl = q.introducer_url
self.clients = []
basedirs = []
for i in range(self.numclients):
basedirs.append((yield self._set_up_client_node(i)))
basedirs.append((yield self._set_up_client_node(i, force_foolscap)))
# start clients[0], wait for it's tub to be ready (at which point it
# will have registered the helper furl).
@ -761,7 +801,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# and the helper-using webport
self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL()
def _generate_config(self, which, basedir):
def _generate_config(self, which, basedir, force_foolscap=False):
config = {}
allclients = set(range(self.numclients))
@ -791,6 +831,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
sethelper = partial(setconf, config, which, "helper")
setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,))
setconf(config, which, "storage", "force_foolscap", str(force_foolscap))
tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor)
setnode("tub.port", tub_port_endpoint)
@ -808,17 +849,16 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
" furl: %s\n") % self.introducer_furl
iyaml_fn = os.path.join(basedir, "private", "introducers.yaml")
fileutil.write(iyaml_fn, iyaml)
return _render_config(config)
def _set_up_client_node(self, which):
def _set_up_client_node(self, which, force_foolscap):
basedir = self.getdir("client%d" % (which,))
fileutil.make_dirs(os.path.join(basedir, "private"))
if len(SYSTEM_TEST_CERTS) > (which + 1):
f = open(os.path.join(basedir, "private", "node.pem"), "w")
f.write(SYSTEM_TEST_CERTS[which + 1])
f.close()
config = self._generate_config(which, basedir)
config = self._generate_config(which, basedir, force_foolscap)
fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
return basedir

View File

@ -15,9 +15,8 @@ from typing import Set
from random import Random
from unittest import SkipTest
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import Clock
from twisted.internet import reactor
from foolscap.api import Referenceable, RemoteException
# A better name for this would be IStorageClient...
@ -26,8 +25,6 @@ from allmydata.interfaces import IStorageServer
from .common_system import SystemTestMixin
from .common import AsyncTestCase
from allmydata.storage.server import StorageServer # not a IStorageServer!!
from allmydata.storage.http_client import StorageClient
from allmydata.storage_client import _HTTPStorageServer
# Use random generator with known seed, so results are reproducible if tests
@ -439,6 +436,17 @@ class IStorageServerImmutableAPIsTestsMixin(object):
b"immutable", storage_index, 0, b"ono"
)
@inlineCallbacks
def test_advise_corrupt_share_unknown_share_number(self):
"""
Calling ``advise_corrupt_share()`` on an immutable share, with an
unknown share number, does not result in error.
"""
storage_index, _, _ = yield self.create_share()
yield self.storage_client.advise_corrupt_share(
b"immutable", storage_index, 999, b"ono"
)
@inlineCallbacks
def test_allocate_buckets_creates_lease(self):
"""
@ -908,6 +916,19 @@ class IStorageServerMutableAPIsTestsMixin(object):
b"mutable", storage_index, 0, b"ono"
)
@inlineCallbacks
def test_advise_corrupt_share_unknown_share_number(self):
"""
Calling ``advise_corrupt_share()`` on a mutable share with an unknown
share number does not result in error (other behavior is opaque at this
level of abstraction).
"""
secrets, storage_index = yield self.create_slot()
yield self.storage_client.advise_corrupt_share(
b"mutable", storage_index, 999, b"ono"
)
@inlineCallbacks
def test_STARAW_create_lease(self):
"""
@ -1023,7 +1044,10 @@ class _SharedMixin(SystemTestMixin):
SKIP_TESTS = set() # type: Set[str]
def _get_istorage_server(self):
raise NotImplementedError("implement in subclass")
native_server = next(iter(self.clients[0].storage_broker.get_known_servers()))
client = native_server.get_storage_server()
self.assertTrue(IStorageServer.providedBy(client))
return client
@inlineCallbacks
def setUp(self):
@ -1046,7 +1070,7 @@ class _SharedMixin(SystemTestMixin):
self._clock = Clock()
self._clock.advance(123456)
self.server._clock = self._clock
self.storage_client = yield self._get_istorage_server()
self.storage_client = self._get_istorage_server()
def fake_time(self):
"""Return the current fake, test-controlled, time."""
@ -1062,51 +1086,29 @@ class _SharedMixin(SystemTestMixin):
yield SystemTestMixin.tearDown(self)
class _FoolscapMixin(_SharedMixin):
"""Run tests on Foolscap version of ``IStorageServer``."""
def _get_native_server(self):
return next(iter(self.clients[0].storage_broker.get_known_servers()))
def _get_istorage_server(self):
client = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(client))
return succeed(client)
class _HTTPMixin(_SharedMixin):
"""Run tests on the HTTP version of ``IStorageServer``."""
def _get_istorage_server(self):
nurl = list(self.clients[0].storage_nurls)[0]
# Create HTTP client with non-persistent connections, so we don't leak
# state across tests:
client: IStorageServer = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor, persistent=False)
)
self.assertTrue(IStorageServer.providedBy(client))
return succeed(client)
class FoolscapSharedAPIsTests(
_FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
_SharedMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for shared ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = True
class HTTPSharedAPIsTests(
_HTTPMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
_SharedMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for shared ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = False
class FoolscapImmutableAPIsTests(
_FoolscapMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
_SharedMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = True
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
@ -1129,23 +1131,29 @@ class FoolscapImmutableAPIsTests(
"""
current = self.storage_client
yield self.bounce_client(0)
self.storage_client = self._get_native_server().get_storage_server()
self.storage_client = self._get_istorage_server()
assert self.storage_client is not current
class HTTPImmutableAPIsTests(
_HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
_SharedMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for immutable ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = False
class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
_SharedMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for mutable ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = True
class HTTPMutableAPIsTests(
_HTTPMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
_SharedMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for mutable ``IStorageServer`` APIs."""
FORCE_FOOLSCAP_FOR_STORAGE = False

View File

@ -291,6 +291,10 @@ class CustomHTTPServerTests(SyncTestCase):
def setUp(self):
super(CustomHTTPServerTests, self).setUp()
StorageClient.start_test_mode(
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
self.addCleanup(StorageClient.stop_test_mode)
# Could be a fixture, but will only be used in this test class so not
# going to bother:
self._http_server = TestApp()
@ -298,6 +302,7 @@ class CustomHTTPServerTests(SyncTestCase):
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=StubTreq(self._http_server._app.resource()),
clock=Clock(),
)
def test_authorization_enforcement(self):
@ -375,6 +380,10 @@ class HttpTestFixture(Fixture):
"""
def _setUp(self):
StorageClient.start_test_mode(
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
self.addCleanup(StorageClient.stop_test_mode)
self.clock = Clock()
self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in
@ -396,6 +405,7 @@ class HttpTestFixture(Fixture):
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=self.treq,
clock=self.clock,
)
def result_of_with_flush(self, d):
@ -480,6 +490,7 @@ class GenericHTTPAPITests(SyncTestCase):
DecodedURL.from_text("http://127.0.0.1"),
b"something wrong",
treq=StubTreq(self.http.http_server.get_resource()),
clock=self.http.clock,
)
)
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
@ -1441,7 +1452,9 @@ class SharedImmutableMutableTestsMixin:
self.http.client.request(
"GET",
self.http.client.relative_url(
"/storage/v1/{}/{}/1".format(self.KIND, _encode_si(storage_index))
"/storage/v1/{}/{}/1".format(
self.KIND, _encode_si(storage_index)
)
),
headers=headers,
)

View File

@ -117,11 +117,17 @@ class CountingDataUploadable(upload.Data):
class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
"""Foolscap integration-y tests."""
FORCE_FOOLSCAP_FOR_STORAGE = True
timeout = 180
@property
def basedir(self):
return "system/SystemTest/{}-foolscap-{}".format(
self.id().split(".")[-1], self.FORCE_FOOLSCAP_FOR_STORAGE
)
def test_connections(self):
self.basedir = "system/SystemTest/test_connections"
d = self.set_up_nodes()
self.extra_node = None
d.addCallback(lambda res: self.add_extra_node(self.numclients))
@ -149,11 +155,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
del test_connections
def test_upload_and_download_random_key(self):
self.basedir = "system/SystemTest/test_upload_and_download_random_key"
return self._test_upload_and_download(convergence=None)
def test_upload_and_download_convergent(self):
self.basedir = "system/SystemTest/test_upload_and_download_convergent"
return self._test_upload_and_download(convergence=b"some convergence string")
def _test_upload_and_download(self, convergence):
@ -516,7 +520,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def test_mutable(self):
self.basedir = "system/SystemTest/test_mutable"
DATA = b"initial contents go here." # 25 bytes % 3 != 0
DATA_uploadable = MutableData(DATA)
NEWDATA = b"new contents yay"
@ -746,7 +749,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
# plaintext_hash check.
def test_filesystem(self):
self.basedir = "system/SystemTest/test_filesystem"
self.data = LARGE_DATA
d = self.set_up_nodes()
def _new_happy_semantics(ign):
@ -1713,7 +1715,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def test_filesystem_with_cli_in_subprocess(self):
# We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
d = self.set_up_nodes()
def _new_happy_semantics(ign):
for c in self.clients:
@ -1794,9 +1795,21 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
class Connections(SystemTestMixin, unittest.TestCase):
FORCE_FOOLSCAP_FOR_STORAGE = True
def test_rref(self):
self.basedir = "system/Connections/rref"
# The way the listening port is created is via
# SameProcessStreamEndpointAssigner (allmydata.test.common), which then
# makes an endpoint string parsed by AdoptedServerPort. The latter does
# dup(fd), which results in the filedescriptor staying alive _until the
# test ends_. That means that when we disown the service, we still have
# the listening port there on the OS level! Just the resulting
# connections aren't handled. So this test relies on aggressive
# timeouts in the HTTP client and presumably some equivalent in
# Foolscap, since connection refused does _not_ happen.
self.basedir = "system/Connections/rref-foolscap-{}".format(
self.FORCE_FOOLSCAP_FOR_STORAGE
)
d = self.set_up_nodes(2)
def _start(ign):
self.c0 = self.clients[0]
@ -1812,9 +1825,13 @@ class Connections(SystemTestMixin, unittest.TestCase):
# now shut down the server
d.addCallback(lambda ign: self.clients[1].disownServiceParent())
# kill any persistent http connections that might continue to work
d.addCallback(lambda ign: self.close_idle_http_connections())
# and wait for the client to notice
def _poll():
return len(self.c0.storage_broker.get_connected_servers()) < 2
return len(self.c0.storage_broker.get_connected_servers()) == 1
d.addCallback(lambda ign: self.poll(_poll))
def _down(ign):
@ -1824,3 +1841,16 @@ class Connections(SystemTestMixin, unittest.TestCase):
self.assertEqual(storage_server, self.s1_storage_server)
d.addCallback(_down)
return d
class HTTPSystemTest(SystemTest):
"""HTTP storage protocol variant of the system tests."""
FORCE_FOOLSCAP_FOR_STORAGE = False
class HTTPConnections(Connections):
"""HTTP storage protocol variant of the connections tests."""
FORCE_FOOLSCAP_FOR_STORAGE = False