Merge pull request #1305 from tahoe-lafs/3910-http-storage-server-tor-support

HTTP storage server supports .onion addresses, and corresponding client support

Fixes ticket:3910
This commit is contained in:
Itamar Turner-Trauring 2023-06-26 09:01:59 -04:00 committed by GitHub
commit 0fb6c5ac18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 166 additions and 116 deletions

0
newsfragments/3910.minor Normal file
View File

View File

@ -102,8 +102,15 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
for location_hint in chain.from_iterable( for location_hint in chain.from_iterable(
hints.split(",") for hints in cls.tub.locationHints hints.split(",") for hints in cls.tub.locationHints
): ):
if location_hint.startswith("tcp:"): if location_hint.startswith("tcp:") or location_hint.startswith("tor:"):
_, hostname, port = location_hint.split(":") scheme, hostname, port = location_hint.split(":")
if scheme == "tcp":
subscheme = None
else:
subscheme = "tor"
# If we're listening on Tor, the hostname needs to have an
# .onion TLD.
assert hostname.endswith(".onion")
port = int(port) port = int(port)
storage_nurls.add( storage_nurls.add(
build_nurl( build_nurl(
@ -111,9 +118,10 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
port, port,
str(swissnum, "ascii"), str(swissnum, "ascii"),
cls.tub.myCertificate.original.to_cryptography(), cls.tub.myCertificate.original.to_cryptography(),
subscheme
) )
) )
# TODO this is probably where we'll have to support Tor and I2P? # TODO this is where we'll have to support Tor and I2P as well.
# See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3888#comment:9 # See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3888#comment:9
# for discussion (there will be separate tickets added for those at # for discussion (there will be separate tickets added for those at
# some point.) # some point.)

View File

@ -16,6 +16,7 @@ from typing import (
Set, Set,
Dict, Dict,
Callable, Callable,
ClassVar,
) )
from base64 import b64encode from base64 import b64encode
from io import BytesIO from io import BytesIO
@ -55,10 +56,20 @@ from .http_common import (
get_content_type, get_content_type,
CBOR_MIME_TYPE, CBOR_MIME_TYPE,
get_spki_hash, get_spki_hash,
response_is_not_html,
) )
from .common import si_b2a, si_to_human_readable from .common import si_b2a, si_to_human_readable
from ..util.hashutil import timing_safe_compare from ..util.hashutil import timing_safe_compare
from ..util.deferredutil import async_to_deferred from ..util.deferredutil import async_to_deferred
from ..util.tor_provider import _Provider as TorProvider
try:
from txtorcon import Tor # type: ignore
except ImportError:
class Tor: # type: ignore[no-redef]
pass
def _encode_si(si): # type: (bytes) -> str def _encode_si(si): # type: (bytes) -> str
@ -299,18 +310,30 @@ class _StorageClientHTTPSPolicy:
) )
@define(hash=True) @define
class StorageClient(object): class StorageClientFactory:
""" """
Low-level HTTP client that talks to the HTTP storage server. Create ``StorageClient`` instances, using appropriate
``twisted.web.iweb.IAgent`` for different connection methods: normal TCP,
Tor, and eventually I2P.
There is some caching involved since there might be shared setup work, e.g.
connecting to the local Tor service only needs to happen once.
""" """
# If set, we're doing unit testing and we should call this with _default_connection_handlers: dict[str, str]
# HTTPConnectionPool we create. _tor_provider: Optional[TorProvider]
TEST_MODE_REGISTER_HTTP_POOL = None # Cache the Tor instance created by the provider, if relevant.
_tor_instance: Optional[Tor] = None
# If set, we're doing unit testing and we should call this with any
# HTTPConnectionPool that gets passed/created to ``create_agent()``.
TEST_MODE_REGISTER_HTTP_POOL: ClassVar[
Optional[Callable[[HTTPConnectionPool], None]]
] = None
@classmethod @classmethod
def start_test_mode(cls, callback): def start_test_mode(cls, callback: Callable[[HTTPConnectionPool], None]) -> None:
"""Switch to testing mode. """Switch to testing mode.
In testing mode we register the pool with test system using the given In testing mode we register the pool with test system using the given
@ -325,66 +348,88 @@ class StorageClient(object):
"""Stop testing mode.""" """Stop testing mode."""
cls.TEST_MODE_REGISTER_HTTP_POOL = None cls.TEST_MODE_REGISTER_HTTP_POOL = None
# The URL is a HTTPS URL ("https://..."). To construct from a NURL, use async def _create_agent(
# ``StorageClient.from_nurl()``. self,
_base_url: DecodedURL
_swissnum: bytes
_treq: Union[treq, StubTreq, HTTPClient]
_pool: Optional[HTTPConnectionPool]
_clock: IReactorTime
@classmethod
def from_nurl(
cls,
nurl: DecodedURL, nurl: DecodedURL,
reactor,
# TODO default_connection_handlers should really be a class, not a dict
# of strings...
default_connection_handlers: dict[str, str],
pool: Optional[HTTPConnectionPool] = None,
agent_factory: Optional[
Callable[[object, IPolicyForHTTPS, HTTPConnectionPool], IAgent]
] = None,
) -> StorageClient:
"""
Create a ``StorageClient`` for the given NURL.
"""
# Safety check: if we're using normal TCP connections, we better not be
# configured for Tor or I2P.
if agent_factory is None:
assert default_connection_handlers["tcp"] == "tcp"
assert nurl.fragment == "v=1"
assert nurl.scheme == "pb"
swissnum = nurl.path[0].encode("ascii")
certificate_hash = nurl.user.encode("ascii")
if pool is None:
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
if cls.TEST_MODE_REGISTER_HTTP_POOL is not None:
cls.TEST_MODE_REGISTER_HTTP_POOL(pool)
def default_agent_factory(
reactor: object, reactor: object,
tls_context_factory: IPolicyForHTTPS, tls_context_factory: IPolicyForHTTPS,
pool: HTTPConnectionPool, pool: HTTPConnectionPool,
) -> IAgent: ) -> IAgent:
"""Create a new ``IAgent``, possibly using Tor."""
if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
self.TEST_MODE_REGISTER_HTTP_POOL(pool)
# TODO default_connection_handlers should really be an object, not a
# dict, so we can ask "is this using Tor" without poking at a
# dictionary with arbitrary strings... See
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4032
handler = self._default_connection_handlers["tcp"]
if handler == "tcp":
return Agent(reactor, tls_context_factory, pool=pool) return Agent(reactor, tls_context_factory, pool=pool)
if handler == "tor" or nurl.scheme == "pb+tor":
assert self._tor_provider is not None
if self._tor_instance is None:
self._tor_instance = await self._tor_provider.get_tor_instance(reactor)
return self._tor_instance.web_agent(
pool=pool, tls_context_factory=tls_context_factory
)
else:
raise RuntimeError(f"Unsupported tcp connection handler: {handler}")
if agent_factory is None: async def create_storage_client(
agent_factory = default_agent_factory self,
nurl: DecodedURL,
reactor: IReactorTime,
pool: Optional[HTTPConnectionPool] = None,
) -> StorageClient:
"""Create a new ``StorageClient`` for the given NURL."""
assert nurl.fragment == "v=1"
assert nurl.scheme in ("pb", "pb+tor")
if pool is None:
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
treq_client = HTTPClient( certificate_hash = nurl.user.encode("ascii")
agent_factory( agent = await self._create_agent(
nurl,
reactor, reactor,
_StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash), _StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
pool, pool,
) )
treq_client = HTTPClient(agent)
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
swissnum = nurl.path[0].encode("ascii")
response_check = lambda _: None
if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
response_check = response_is_not_html
return StorageClient(
https_url,
swissnum,
treq_client,
pool,
reactor,
response_check,
) )
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
return cls(https_url, swissnum, treq_client, pool, reactor) @define(hash=True)
class StorageClient(object):
"""
Low-level HTTP client that talks to the HTTP storage server.
Create using a ``StorageClientFactory`` instance.
"""
# The URL should be a HTTPS URL ("https://...")
_base_url: DecodedURL
_swissnum: bytes
_treq: Union[treq, StubTreq, HTTPClient]
_pool: HTTPConnectionPool
_clock: IReactorTime
# Are we running unit tests?
_analyze_response: Callable[[IResponse], None] = lambda _: None
def relative_url(self, path: str) -> DecodedURL: def relative_url(self, path: str) -> DecodedURL:
"""Get a URL relative to the base URL.""" """Get a URL relative to the base URL."""
@ -491,13 +536,7 @@ class StorageClient(object):
response = await self._treq.request( response = await self._treq.request(
method, url, headers=headers, timeout=timeout, **kwargs method, url, headers=headers, timeout=timeout, **kwargs
) )
self._analyze_response(response)
if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
if response.code != 404:
# We're doing API queries, HTML is never correct except in 404, but
# it's the default for Twisted's web server so make sure nothing
# unexpected happened.
assert get_content_type(response.headers) != "text/html"
return response return response
@ -526,7 +565,6 @@ class StorageClient(object):
def shutdown(self) -> Deferred: def shutdown(self) -> Deferred:
"""Shutdown any connections.""" """Shutdown any connections."""
if self._pool is not None:
return self._pool.closeCachedConnections() return self._pool.closeCachedConnections()

View File

@ -12,6 +12,7 @@ from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from werkzeug.http import parse_options_header from werkzeug.http import parse_options_header
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
CBOR_MIME_TYPE = "application/cbor" CBOR_MIME_TYPE = "application/cbor"
@ -27,6 +28,18 @@ def get_content_type(headers: Headers) -> Optional[str]:
return content_type return content_type
def response_is_not_html(response: IResponse) -> None:
"""
During tests, this is registered so we can ensure the web server
doesn't give us text/html.
HTML is never correct except in 404, but it's the default for
Twisted's web server so we assert nothing unexpected happened.
"""
if response.code != 404:
assert get_content_type(response.headers) != "text/html"
def swissnum_auth_header(swissnum: bytes) -> bytes: def swissnum_auth_header(swissnum: bytes) -> bytes:
"""Return value for ``Authorization`` header.""" """Return value for ``Authorization`` header."""
return b"Tahoe-LAFS " + b64encode(swissnum).strip() return b"Tahoe-LAFS " + b64encode(swissnum).strip()

View File

@ -995,13 +995,20 @@ class _TLSEndpointWrapper(object):
def build_nurl( def build_nurl(
hostname: str, port: int, swissnum: str, certificate: CryptoCertificate hostname: str,
port: int,
swissnum: str,
certificate: CryptoCertificate,
subscheme: Optional[str] = None,
) -> DecodedURL: ) -> DecodedURL:
""" """
Construct a HTTPS NURL, given the hostname, port, server swissnum, and x509 Construct a HTTPS NURL, given the hostname, port, server swissnum, and x509
certificate for the server. Clients can then connect to the server using certificate for the server. Clients can then connect to the server using
this NURL. this NURL.
""" """
scheme = "pb"
if subscheme is not None:
scheme = f"{scheme}+{subscheme}"
return DecodedURL().replace( return DecodedURL().replace(
fragment="v=1", # how we know this NURL is HTTP-based (i.e. not Foolscap) fragment="v=1", # how we know this NURL is HTTP-based (i.e. not Foolscap)
host=hostname, host=hostname,
@ -1013,7 +1020,7 @@ def build_nurl(
"ascii", "ascii",
), ),
), ),
scheme="pb", scheme=scheme,
) )

View File

@ -51,7 +51,6 @@ from zope.interface import (
) )
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.web import http from twisted.web import http
from twisted.web.iweb import IAgent, IPolicyForHTTPS
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.application import service from twisted.application import service
@ -89,7 +88,8 @@ from allmydata.util.deferredutil import async_to_deferred, race
from allmydata.storage.http_client import ( from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral, StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException, StorageClientMutables, ClientException as HTTPClientException, StorageClientMutables,
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException,
StorageClientFactory
) )
from .node import _Config from .node import _Config
@ -1068,8 +1068,9 @@ class HTTPNativeStorageServer(service.MultiService):
self._on_status_changed = ObserverList() self._on_status_changed = ObserverList()
self._reactor = reactor self._reactor = reactor
self._grid_manager_verifier = grid_manager_verifier self._grid_manager_verifier = grid_manager_verifier
self._tor_provider = tor_provider self._storage_client_factory = StorageClientFactory(
self._default_connection_handlers = default_connection_handlers default_connection_handlers, tor_provider
)
furl = announcement["anonymous-storage-FURL"].encode("utf-8") furl = announcement["anonymous-storage-FURL"].encode("utf-8")
( (
@ -1232,26 +1233,6 @@ class HTTPNativeStorageServer(service.MultiService):
self._connecting_deferred = connecting self._connecting_deferred = connecting
return connecting return connecting
async def _agent_factory(self) -> Optional[Callable[[object, IPolicyForHTTPS, HTTPConnectionPool],IAgent]]:
"""Return a factory for ``twisted.web.iweb.IAgent``."""
# TODO default_connection_handlers should really be an object, not a
# dict, so we can ask "is this using Tor" without poking at a
# dictionary with arbitrary strings... See
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4032
handler = self._default_connection_handlers["tcp"]
if handler == "tcp":
return None
if handler == "tor":
assert self._tor_provider is not None
tor_instance = await self._tor_provider.get_tor_instance(self._reactor)
def agent_factory(reactor: object, tls_context_factory: IPolicyForHTTPS, pool: HTTPConnectionPool) -> IAgent:
assert reactor == self._reactor
return tor_instance.web_agent(pool=pool, tls_context_factory=tls_context_factory)
return agent_factory
else:
raise RuntimeError(f"Unsupported tcp connection handler: {handler}")
@async_to_deferred @async_to_deferred
async def _pick_server_and_get_version(self): async def _pick_server_and_get_version(self):
""" """
@ -1270,28 +1251,24 @@ class HTTPNativeStorageServer(service.MultiService):
# version() calls before we are live talking to a server, it could only # version() calls before we are live talking to a server, it could only
# be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992 # be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992
agent_factory = await self._agent_factory() @async_to_deferred
async def request(reactor, nurl: DecodedURL):
def request(reactor, nurl: DecodedURL):
# Since we're just using this one off to check if the NURL # Since we're just using this one off to check if the NURL
# works, no need for persistent pool or other fanciness. # works, no need for persistent pool or other fanciness.
pool = HTTPConnectionPool(reactor, persistent=False) pool = HTTPConnectionPool(reactor, persistent=False)
pool.retryAutomatically = False pool.retryAutomatically = False
return StorageClientGeneral( storage_client = await self._storage_client_factory.create_storage_client(
StorageClient.from_nurl( nurl, reactor, pool
nurl, reactor, self._default_connection_handlers, )
pool=pool, agent_factory=agent_factory) return await StorageClientGeneral(storage_client).get_version()
).get_version()
nurl = await _pick_a_http_server(reactor, self._nurls, request) nurl = await _pick_a_http_server(reactor, self._nurls, request)
# If we've gotten this far, we've found a working NURL. # If we've gotten this far, we've found a working NURL.
self._istorage_server = _HTTPStorageServer.from_http_client( storage_client = await self._storage_client_factory.create_storage_client(
StorageClient.from_nurl( nurl, reactor, None
nurl, reactor, self._default_connection_handlers,
agent_factory=agent_factory
)
) )
self._istorage_server = _HTTPStorageServer.from_http_client(storage_client)
return self._istorage_server return self._istorage_server
try: try:

View File

@ -686,8 +686,8 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def setUp(self): def setUp(self):
self._http_client_pools = [] self._http_client_pools = []
http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool) http_client.StorageClientFactory.start_test_mode(self._got_new_http_connection_pool)
self.addCleanup(http_client.StorageClient.stop_test_mode) self.addCleanup(http_client.StorageClientFactory.stop_test_mode)
self.port_assigner = SameProcessStreamEndpointAssigner() self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp() self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown) self.addCleanup(self.port_assigner.tearDown)

View File

@ -43,7 +43,11 @@ from testtools.matchers import Equals
from zope.interface import implementer from zope.interface import implementer
from .common import SyncTestCase from .common import SyncTestCase
from ..storage.http_common import get_content_type, CBOR_MIME_TYPE from ..storage.http_common import (
get_content_type,
CBOR_MIME_TYPE,
response_is_not_html,
)
from ..storage.common import si_b2a from ..storage.common import si_b2a
from ..storage.lease import LeaseInfo from ..storage.lease import LeaseInfo
from ..storage.server import StorageServer from ..storage.server import StorageServer
@ -58,6 +62,7 @@ from ..storage.http_server import (
) )
from ..storage.http_client import ( from ..storage.http_client import (
StorageClient, StorageClient,
StorageClientFactory,
ClientException, ClientException,
StorageClientImmutables, StorageClientImmutables,
ImmutableCreateResult, ImmutableCreateResult,
@ -315,7 +320,6 @@ def result_of(d):
+ "This is probably a test design issue." + "This is probably a test design issue."
) )
class CustomHTTPServerTests(SyncTestCase): class CustomHTTPServerTests(SyncTestCase):
""" """
Tests that use a custom HTTP server. Tests that use a custom HTTP server.
@ -323,10 +327,10 @@ class CustomHTTPServerTests(SyncTestCase):
def setUp(self): def setUp(self):
super(CustomHTTPServerTests, self).setUp() super(CustomHTTPServerTests, self).setUp()
StorageClient.start_test_mode( StorageClientFactory.start_test_mode(
lambda pool: self.addCleanup(pool.closeCachedConnections) lambda pool: self.addCleanup(pool.closeCachedConnections)
) )
self.addCleanup(StorageClient.stop_test_mode) self.addCleanup(StorageClientFactory.stop_test_mode)
# Could be a fixture, but will only be used in this test class so not # Could be a fixture, but will only be used in this test class so not
# going to bother: # going to bother:
self._http_server = TestApp() self._http_server = TestApp()
@ -341,6 +345,7 @@ class CustomHTTPServerTests(SyncTestCase):
# fixed if https://github.com/twisted/treq/issues/226 were ever # fixed if https://github.com/twisted/treq/issues/226 were ever
# fixed. # fixed.
clock=treq._agent._memoryReactor, clock=treq._agent._memoryReactor,
analyze_response=response_is_not_html,
) )
self._http_server.clock = self.client._clock self._http_server.clock = self.client._clock
@ -529,10 +534,10 @@ class HttpTestFixture(Fixture):
""" """
def _setUp(self): def _setUp(self):
StorageClient.start_test_mode( StorageClientFactory.start_test_mode(
lambda pool: self.addCleanup(pool.closeCachedConnections) lambda pool: self.addCleanup(pool.closeCachedConnections)
) )
self.addCleanup(StorageClient.stop_test_mode) self.addCleanup(StorageClientFactory.stop_test_mode)
self.clock = Reactor() self.clock = Reactor()
self.tempdir = self.useFixture(TempDir()) self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in # The global Cooperator used by Twisted (a) used by pull producers in
@ -558,6 +563,7 @@ class HttpTestFixture(Fixture):
treq=self.treq, treq=self.treq,
pool=None, pool=None,
clock=self.clock, clock=self.clock,
analyze_response=response_is_not_html,
) )
def result_of_with_flush(self, d): def result_of_with_flush(self, d):
@ -671,6 +677,7 @@ class GenericHTTPAPITests(SyncTestCase):
treq=StubTreq(self.http.http_server.get_resource()), treq=StubTreq(self.http.http_server.get_resource()),
pool=None, pool=None,
clock=self.http.clock, clock=self.http.clock,
analyze_response=response_is_not_html,
) )
) )
with assert_fails_with_http_code(self, http.UNAUTHORIZED): with assert_fails_with_http_code(self, http.UNAUTHORIZED):