Merge pull request #631 from tahoe-lafs/integration/storage-economics

Add a plugin system allowing for third-party storage server implementations.
This commit is contained in:
Jean-Paul Calderone 2019-11-18 08:45:33 -05:00 committed by GitHub
commit 74d56a8b46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 2761 additions and 229 deletions

View File

@ -9,6 +9,7 @@ Configuring a Tahoe-LAFS node
#. `Connection Management`_
#. `Client Configuration`_
#. `Storage Server Configuration`_
#. `Storage Server Plugin Configuration`_
#. `Frontend Configuration`_
#. `Running A Helper`_
#. `Running An Introducer`_
@ -738,6 +739,17 @@ Storage Server Configuration
for clients who do not wish to provide storage service. The default value
is ``True``.
``anonymous = (boolean, optional)``
If this is ``True``, the node will expose the storage server via Foolscap
without any additional authentication or authorization. The capability to
use all storage services is conferred by knowledge of the Foolscap fURL
for the storage server which will be included in the storage server's
announcement. If it is ``False``, the node will not expose this and
storage must be exposed using the storage server plugin system (see
`Storage Server Plugin Configuration`_ for details). The default value is
``True``.
``readonly = (boolean, optional)``
If ``True``, the node will run a storage server but will not accept any
@ -798,6 +810,33 @@ Storage Server Configuration
In addition,
see :doc:`accepting-donations` for a convention encouraging donations to storage server operators.
Storage Server Plugin Configuration
===================================
In addition to the built-in storage server,
it is also possible to load and configure storage server plugins into Tahoe-LAFS.
Plugins to load are specified in the ``[storage]`` section.
``plugins = (string, optional)``
This gives a comma-separated list of plugin names.
Plugins named here will be loaded and offered to clients.
The default is for no such plugins to be loaded.
Each plugin can also be configured in a dedicated section.
The section for each plugin is named after the plugin itself::
[storageserver.plugins.<plugin name>]
For example,
the configuration section for a plugin named ``acme-foo-v1`` is ``[storageserver.plugins.acme-foo-v1]``.
The contents of such sections are defined by the plugins themselves.
Refer to the documentation provided with those plugins.
Running A Helper
================

View File

@ -0,0 +1 @@
allmydata.interfaces.IFoolscapStoragePlugin has been introduced, an extension point for customizing the storage protocol.

View File

@ -0,0 +1 @@
Storage servers can now be configured to load plugins for allmydata.interfaces.IFoolscapStoragePlugin and offer them to clients.

View File

@ -0,0 +1 @@
Storage clients can now be configured to load plugins for allmydata.interfaces.IFoolscapStoragePlugin and use them to negotiate with servers.

0
newsfragments/3086.minor Normal file
View File

0
newsfragments/3097.minor Normal file
View File

0
newsfragments/3118.minor Normal file
View File

0
newsfragments/3119.minor Normal file
View File

View File

@ -0,0 +1 @@
The [storage] configuration section now accepts a boolean *anonymous* item to enable or disable anonymous storage access. The default behavior remains unchanged.

0
newsfragments/3242.minor Normal file
View File

0
newsfragments/3243.minor Normal file
View File

0
newsfragments/3248.minor Normal file
View File

0
newsfragments/3250.minor Normal file
View File

0
newsfragments/3264.minor Normal file
View File

View File

@ -2,8 +2,18 @@ import os, stat, time, weakref
from base64 import urlsafe_b64encode
from functools import partial
from errno import ENOENT, EPERM
from ConfigParser import NoSectionError
from foolscap.furl import (
decode_furl,
)
import attr
from zope.interface import implementer
from twisted.plugin import (
getPlugins,
)
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
@ -18,7 +28,10 @@ from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import (hashutil, base32, pollmixin, log, idlib, yamlutil)
from allmydata.util import (
hashutil, base32, pollmixin, log, idlib,
yamlutil, configutil,
)
from allmydata.util.encodingutil import (get_filesystem_encoding,
from_utf8_or_none)
from allmydata.util.abbreviate import parse_abbreviated_size
@ -27,7 +40,14 @@ from allmydata.util.i2p_provider import create as create_i2p_provider
from allmydata.util.tor_provider import create as create_tor_provider
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION, DEFAULT_MAX_SEGMENT_SIZE
from allmydata.interfaces import (
IStatsProducer,
SDMF_VERSION,
MDMF_VERSION,
DEFAULT_MAX_SEGMENT_SIZE,
IFoolscapStoragePlugin,
IAnnounceableStorageServer,
)
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata import node
@ -39,9 +59,20 @@ GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
def _valid_config_sections():
cfg = node._common_config_sections()
cfg.update({
def _is_valid_section(section_name):
"""
Check for valid dynamic configuration section names.
Currently considers all possible storage server plugin sections valid.
"""
return (
section_name.startswith(b"storageserver.plugins.") or
section_name.startswith(b"storageclient.plugins.")
)
_client_config = configutil.ValidConfiguration(
static_valid_sections={
"client": (
"helper.furl",
"introducer.furl",
@ -52,6 +83,7 @@ def _valid_config_sections():
"shares.needed",
"shares.total",
"stats_gatherer.furl",
"storage.plugins",
),
"drop_upload": ( # deprecated already?
"enabled",
@ -65,6 +97,7 @@ def _valid_config_sections():
"storage": (
"debug_discard",
"enabled",
"anonymous",
"expire.cutoff_date",
"expire.enabled",
"expire.immutable",
@ -75,6 +108,7 @@ def _valid_config_sections():
"readonly",
"reserved_space",
"storage_dir",
"plugins",
),
"sftpd": (
"accounts.file",
@ -93,8 +127,16 @@ def _valid_config_sections():
"local.directory",
"poll_interval",
),
})
return cfg
},
is_valid_section=_is_valid_section,
# Anything in a valid section is a valid item, for now.
is_valid_item=lambda section, ignored: _is_valid_section(section),
)
def _valid_config():
cfg = node._common_valid_config()
return cfg.update(_client_config)
# this is put into README in new node-directories
CLIENT_README = """
@ -180,10 +222,16 @@ def read_config(basedir, portnumfile, generated_files=[]):
return node.read_config(
basedir, portnumfile,
generated_files=generated_files,
_valid_config_sections=_valid_config_sections,
_valid_config=_valid_config(),
)
config_from_string = partial(
node.config_from_string,
_valid_config=_valid_config(),
)
def create_client(basedir=u".", _client_factory=None):
"""
Creates a new client instance (a subclass of Node).
@ -208,7 +256,8 @@ def create_client(basedir=u".", _client_factory=None):
return defer.fail()
def create_client_from_config(config, _client_factory=None):
@defer.inlineCallbacks
def create_client_from_config(config, _client_factory=None, _introducer_factory=None):
"""
Creates a new client instance (a subclass of Node). Most code
should probably use `create_client` instead.
@ -220,46 +269,175 @@ def create_client_from_config(config, _client_factory=None):
:param _client_factory: for testing; the class to instantiate
instead of _Client
:param _introducer_factory: for testing; the class to instantiate instead
of IntroducerClient
"""
try:
if _client_factory is None:
_client_factory = _Client
if _client_factory is None:
_client_factory = _Client
i2p_provider = create_i2p_provider(reactor, config)
tor_provider = create_tor_provider(reactor, config)
handlers = node.create_connection_handlers(reactor, config, i2p_provider, tor_provider)
default_connection_handlers, foolscap_connection_handlers = handlers
tub_options = node.create_tub_options(config)
i2p_provider = create_i2p_provider(reactor, config)
tor_provider = create_tor_provider(reactor, config)
handlers = node.create_connection_handlers(reactor, config, i2p_provider, tor_provider)
default_connection_handlers, foolscap_connection_handlers = handlers
tub_options = node.create_tub_options(config)
main_tub = node.create_main_tub(
config, tub_options, default_connection_handlers,
foolscap_connection_handlers, i2p_provider, tor_provider,
)
control_tub = node.create_control_tub()
main_tub = node.create_main_tub(
config, tub_options, default_connection_handlers,
foolscap_connection_handlers, i2p_provider, tor_provider,
)
control_tub = node.create_control_tub()
introducer_clients = create_introducer_clients(config, main_tub)
storage_broker = create_storage_farm_broker(
config, default_connection_handlers, foolscap_connection_handlers,
tub_options, introducer_clients
)
introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
storage_broker = create_storage_farm_broker(
config, default_connection_handlers, foolscap_connection_handlers,
tub_options, introducer_clients
)
client = _client_factory(
client = _client_factory(
config,
main_tub,
control_tub,
i2p_provider,
tor_provider,
introducer_clients,
storage_broker,
)
# Initialize storage separately after creating the client. This is
# necessary because we need to pass a reference to the client in to the
# storage plugins to allow them to initialize themselves (specifically,
# they may want the anonymous IStorageServer implementation so they don't
# have to duplicate all of its basic storage functionality). A better way
# to do this, eventually, may be to create that implementation first and
# then pass it in to both storage plugin creation and the client factory.
# This avoids making a partially initialized client object escape the
# client factory and removes the circular dependency between these
# objects.
storage_plugins = yield _StoragePlugins.from_config(
client.get_anonymous_storage_server,
config,
)
client.init_storage(storage_plugins.announceable_storage_servers)
i2p_provider.setServiceParent(client)
tor_provider.setServiceParent(client)
for ic in introducer_clients:
ic.setServiceParent(client)
storage_broker.setServiceParent(client)
defer.returnValue(client)
@attr.s
class _StoragePlugins(object):
"""
Functionality related to getting storage plugins set up and ready for use.
:ivar list[IAnnounceableStorageServer] announceable_storage_servers: The
announceable storage servers that should be used according to node
configuration.
"""
announceable_storage_servers = attr.ib()
@classmethod
@defer.inlineCallbacks
def from_config(cls, get_anonymous_storage_server, config):
"""
Load and configured storage plugins.
:param get_anonymous_storage_server: A no-argument callable which
returns the node's anonymous ``IStorageServer`` implementation.
:param _Config config: The node's configuration.
:return: A ``_StoragePlugins`` initialized from the given
configuration.
"""
storage_plugin_names = cls._get_enabled_storage_plugin_names(config)
plugins = list(cls._collect_storage_plugins(storage_plugin_names))
unknown_plugin_names = storage_plugin_names - {plugin.name for plugin in plugins}
if unknown_plugin_names:
raise configutil.UnknownConfigError(
"Storage plugins {} are enabled but not known on this system.".format(
unknown_plugin_names,
),
)
announceable_storage_servers = yield cls._create_plugin_storage_servers(
get_anonymous_storage_server,
config,
main_tub,
control_tub,
i2p_provider,
tor_provider,
introducer_clients,
storage_broker,
plugins,
)
i2p_provider.setServiceParent(client)
tor_provider.setServiceParent(client)
for ic in introducer_clients:
ic.setServiceParent(client)
storage_broker.setServiceParent(client)
return defer.succeed(client)
except Exception:
return defer.fail()
defer.returnValue(cls(
announceable_storage_servers,
))
@classmethod
def _get_enabled_storage_plugin_names(cls, config):
"""
Get the names of storage plugins that are enabled in the configuration.
"""
return set(
config.get_config(
"storage", "plugins", b""
).decode("ascii").split(u",")
) - {u""}
@classmethod
def _collect_storage_plugins(cls, storage_plugin_names):
"""
Get the storage plugins with names matching those given.
"""
return list(
plugin
for plugin
in getPlugins(IFoolscapStoragePlugin)
if plugin.name in storage_plugin_names
)
@classmethod
def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins):
"""
Cause each storage plugin to instantiate its storage server and return
them all.
:return: A ``Deferred`` that fires with storage servers instantiated
by all of the given storage server plugins.
"""
return defer.gatherResults(
list(
plugin.get_storage_server(
cls._get_storage_plugin_configuration(config, plugin.name),
get_anonymous_storage_server,
).addCallback(
partial(
_add_to_announcement,
{u"name": plugin.name},
),
)
for plugin
# The order is fairly arbitrary and it is not meant to convey
# anything but providing *some* stable ordering makes the data
# a little easier to deal with (mainly in tests and when
# manually inspecting it).
in sorted(plugins, key=lambda p: p.name)
),
)
@classmethod
def _get_storage_plugin_configuration(cls, config, storage_plugin_name):
"""
Load the configuration for a storage server plugin with the given name.
:return dict[bytes, bytes]: The matching configuration.
"""
try:
config = config.items(
"storageserver.plugins." + storage_plugin_name,
)
except NoSectionError:
config = []
return dict(config)
def _sequencer(config):
@ -279,12 +457,18 @@ def _sequencer(config):
return seqnum, nonce
def create_introducer_clients(config, main_tub):
def create_introducer_clients(config, main_tub, _introducer_factory=None):
"""
Read, validate and parse any 'introducers.yaml' configuration.
:param _introducer_factory: for testing; the class to instantiate instead
of IntroducerClient
:returns: a list of IntroducerClient instances
"""
if _introducer_factory is None:
_introducer_factory = IntroducerClient
# we return this list
introducer_clients = []
@ -330,7 +514,7 @@ def create_introducer_clients(config, main_tub):
for petname, introducer in introducers.items():
introducer_cache_filepath = FilePath(config.get_private_path("introducer_{}_cache.yaml".format(petname)))
ic = IntroducerClient(
ic = _introducer_factory(
main_tub,
introducer['furl'].encode("ascii"),
config.nickname,
@ -361,8 +545,9 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con
:param list introducer_clients: IntroducerClient instances if
we're connecting to any
"""
ps = config.get_config("client", "peers.preferred", "").split(",")
preferred_peers = tuple([p.strip() for p in ps if p != ""])
storage_client_config = storage_client.StorageClientConfig.from_node_config(
config,
)
def tub_creator(handler_overrides=None, **kwargs):
return node.create_tub(
@ -376,13 +561,97 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con
sb = storage_client.StorageFarmBroker(
permute_peers=True,
tub_maker=tub_creator,
preferred_peers=preferred_peers,
node_config=config,
storage_client_config=storage_client_config,
)
for ic in introducer_clients:
sb.use_introducer(ic)
return sb
def _register_reference(key, config, tub, referenceable):
"""
Register a referenceable in a tub with a stable fURL.
Stability is achieved by storing the fURL in the configuration the first
time and then reading it back on for future calls.
:param bytes key: An identifier for this reference which can be used to
identify its fURL in the configuration.
:param _Config config: The configuration to use for fURL persistence.
:param Tub tub: The tub in which to register the reference.
:param Referenceable referenceable: The referenceable to register in the
Tub.
:return bytes: The fURL at which the object is registered.
"""
persisted_furl = config.get_private_config(
key,
default=None,
)
name = None
if persisted_furl is not None:
_, _, name = decode_furl(persisted_furl)
registered_furl = tub.registerReference(
referenceable,
name=name,
)
if persisted_furl is None:
config.write_private_config(key, registered_furl)
return registered_furl
@implementer(IAnnounceableStorageServer)
@attr.s
class AnnounceableStorageServer(object):
announcement = attr.ib()
storage_server = attr.ib()
def _add_to_announcement(information, announceable_storage_server):
"""
Create a new ``AnnounceableStorageServer`` based on
``announceable_storage_server`` with ``information`` added to its
``announcement``.
"""
updated_announcement = announceable_storage_server.announcement.copy()
updated_announcement.update(information)
return AnnounceableStorageServer(
updated_announcement,
announceable_storage_server.storage_server,
)
def storage_enabled(config):
"""
Is storage enabled according to the given configuration object?
:param _Config config: The configuration to inspect.
:return bool: ``True`` if storage is enabled, ``False`` otherwise.
"""
return config.get_config(b"storage", b"enabled", True, boolean=True)
def anonymous_storage_enabled(config):
"""
Is anonymous access to storage enabled according to the given
configuration object?
:param _Config config: The configuration to inspect.
:return bool: ``True`` if storage is enabled, ``False`` otherwise.
"""
return (
storage_enabled(config) and
config.get_config(b"storage", b"anonymous", True, boolean=True)
)
@implementer(IStatsProducer)
class _Client(node.Node, pollmixin.PollMixin):
@ -423,7 +692,6 @@ class _Client(node.Node, pollmixin.PollMixin):
self.init_stats_provider()
self.init_secrets()
self.init_node_key()
self.init_storage()
self.init_control()
self._key_generator = KeyGenerator()
key_gen_furl = config.get_config("client", "key_generator.furl", None)
@ -524,13 +792,24 @@ class _Client(node.Node, pollmixin.PollMixin):
self.config.write_config_file("permutation-seed", seed+"\n")
return seed.strip()
def init_storage(self):
# should we run a storage server (and publish it for others to use)?
if not self.config.get_config("storage", "enabled", True, boolean=True):
return
if not self._is_tub_listening():
raise ValueError("config error: storage is enabled, but tub "
"is not listening ('tub.port=' is empty)")
def get_anonymous_storage_server(self):
"""
Get the anonymous ``IStorageServer`` implementation for this node.
Note this will return an object even if storage is disabled on this
node (but the object will not be exposed, peers will not be able to
access it, and storage will remain disabled).
The one and only instance for this node is always returned. It is
created first if necessary.
"""
try:
ss = self.getServiceNamed(StorageServer.name)
except KeyError:
pass
else:
return ss
readonly = self.config.get_config("storage", "readonly", False, boolean=True)
config_storedir = self.get_config(
@ -583,14 +862,81 @@ class _Client(node.Node, pollmixin.PollMixin):
expiration_cutoff_date=cutoff_date,
expiration_sharetypes=expiration_sharetypes)
ss.setServiceParent(self)
return ss
def init_storage(self, announceable_storage_servers):
# should we run a storage server (and publish it for others to use)?
if not storage_enabled(self.config):
return
if not self._is_tub_listening():
raise ValueError("config error: storage is enabled, but tub "
"is not listening ('tub.port=' is empty)")
ss = self.get_anonymous_storage_server()
announcement = {
"permutation-seed-base32": self._init_permutation_seed(ss),
}
if anonymous_storage_enabled(self.config):
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(ss, furlFile=furl_file)
announcement["anonymous-storage-FURL"] = furl
enabled_storage_servers = self._enable_storage_servers(
announceable_storage_servers,
)
storage_options = list(
storage_server.announcement
for storage_server
in enabled_storage_servers
)
plugins_announcement = {}
if storage_options:
# Only add the new key if there are any plugins enabled.
plugins_announcement[u"storage-options"] = storage_options
announcement.update(plugins_announcement)
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(ss, furlFile=furl_file)
ann = {"anonymous-storage-FURL": furl,
"permutation-seed-base32": self._init_permutation_seed(ss),
}
for ic in self.introducer_clients:
ic.publish("storage", ann, self._node_private_key)
ic.publish("storage", announcement, self._node_private_key)
def get_client_storage_plugin_web_resources(self):
"""
Get all of the client-side ``IResource`` implementations provided by
enabled storage plugins.
:return dict[bytes, IResource provider]: The implementations.
"""
return self.storage_broker.get_client_storage_plugin_web_resources(
self.config,
)
def _enable_storage_servers(self, announceable_storage_servers):
"""
Register and announce the given storage servers.
"""
for announceable in announceable_storage_servers:
yield self._enable_storage_server(announceable)
def _enable_storage_server(self, announceable_storage_server):
"""
Register a storage server.
"""
config_key = b"storage-plugin.{}.furl".format(
# Oops, why don't I have a better handle on this value?
announceable_storage_server.announcement[u"name"],
)
furl = _register_reference(
config_key,
self.config,
self.tub,
announceable_storage_server.storage_server,
)
announceable_storage_server = _add_to_announcement(
{u"storage-server-FURL": furl},
announceable_storage_server,
)
return announceable_storage_server
def init_client(self):
helper_furl = self.config.get_config("client", "helper.furl", None)

View File

@ -1,5 +1,8 @@
from zope.interface import Interface, Attribute
from twisted.plugin import (
IPlugin,
)
from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \
ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
@ -3023,3 +3026,132 @@ class IConnectionStatus(Interface):
connection hint and the handler it is using) to the status string
(pending, connected, refused, or other errors).
""")
class IFoolscapStoragePlugin(IPlugin):
"""
An ``IStoragePlugin`` provides client- and server-side implementations of
a Foolscap-based protocol which can be used to store and retrieve data.
Implementations are free to apply access control or authorization policies
to this storage service and doing so is a large part of the motivation for
providing this point of pluggability.
There should be enough information and hook points to support at
least these use-cases:
- anonymous, everything allowed (current default)
- "storage club" / "friend-net" (possibly identity based)
- cryptocurrencies (ideally, paying for each API call)
- anonymous tokens (payment for service, but without identities)
"""
name = Attribute(
"""
A name for referring to this plugin. This name is both user-facing
(for example, it is written in configuration files) and machine-facing
(for example, it may be used to construct URLs). It should be unique
across all plugins for this interface. Two plugins with the same name
cannot be used in one client.
Because it is used to construct URLs, it is constrained to URL safe
characters (it must be a *segment* as defined by RFC 3986, section
3.3).
:type: ``unicode``
"""
)
def get_storage_server(configuration, get_anonymous_storage_server):
"""
Get an ``IAnnounceableStorageServer`` provider that gives an announcement
for and an implementation of the server side of the storage protocol.
This will be exposed and offered to clients in the storage server's
announcement.
:param dict configuration: Any configuration given in the section for
this plugin in the node's configuration file. As an example, the
configuration for the original anonymous-access filesystem-based
storage server might look like::
{u"storedir": u"/foo/bar/storage",
u"nodeid": u"abcdefg...",
u"reserved_space": 0,
u"discard_storage": False,
u"readonly_storage": False,
u"expiration_enabled": False,
u"expiration_mode": u"age",
u"expiration_override_lease_duration": None,
u"expiration_cutoff_date": None,
u"expiration_sharetypes": (u"mutable, u"immutable"),
}
:param get_anonymous_storage_server: A no-argument callable which
returns a single instance of the original, anonymous-access
storage server. This may be helpful in providing actual storage
implementation behavior for a wrapper-style plugin. This is also
provided to keep the Python API offered by Tahoe-LAFS to plugin
developers narrow (do not try to find and instantiate the original
storage server yourself; if you want it, call this).
:rtype: ``Deferred`` firing with ``IAnnounceableStorageServer``
"""
def get_storage_client(configuration, announcement, get_rref):
"""
Get an ``IStorageServer`` provider that implements the client side of the
storage protocol.
:param allmydata.node._Config configuration: A representation of the
configuration for the node into which this plugin has been loaded.
:param dict announcement: The announcement for the corresponding
server portion of this plugin received from a storage server which
is offering it.
:param get_rref: A no-argument callable which returns a
``foolscap.referenceable.RemoteReference`` which refers to the
server portion of this plugin on the currently active connection,
or ``None`` if no connection has been established yet.
:rtype: ``IStorageServer``
"""
def get_client_resource(configuration):
"""
Get an ``IResource`` that can be published in the Tahoe-LAFS web interface
to expose information related to this plugin.
:param allmydata.node._Config configuration: A representation of the
configuration for the node into which this plugin has been loaded.
:rtype: ``IResource``
"""
class IAnnounceableStorageServer(Interface):
announcement = Attribute(
"""
Data for an announcement for the associated storage server.
:note: This does not include the storage server nickname nor Foolscap
fURL. These will be added to the announcement automatically. It
may be usual for this announcement to contain no information.
Once the client connects to this server it can use other methods
to query for additional information (eg, in the manner of
``RIStorageServer.remote_get_version``). The announcement only
needs to contain information to help the client determine how to
connect.
:type: ``dict`` of JSON-serializable types
"""
)
storage_server = Attribute(
"""
A Foolscap referenceable object implementing the server side of the
storage protocol.
:type: ``IReferenceable`` provider
"""
)

View File

@ -30,10 +30,7 @@ are set to disallow users other than its owner from reading the contents of
the files. See the 'configuration.rst' documentation file for details.
"""
def _valid_config_sections():
return node._common_config_sections()
_valid_config = node._common_valid_config
class FurlFileConflictError(Exception):
pass
@ -52,7 +49,7 @@ def create_introducer(basedir=u"."):
config = read_config(
basedir, u"client.port",
generated_files=["introducer.furl"],
_valid_config_sections=_valid_config_sections,
_valid_config=_valid_config(),
)
i2p_provider = create_i2p_provider(reactor, config)

View File

@ -25,8 +25,8 @@ from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.encodingutil import get_filesystem_encoding, quote_output
from allmydata.util import configutil
def _common_config_sections():
return {
def _common_valid_config():
return configutil.ValidConfiguration({
"connections": (
"tcp",
),
@ -63,7 +63,7 @@ def _common_config_sections():
"onion.external_port",
"onion.private_key_file",
),
}
})
# Add our application versions to the data that Foolscap's LogPublisher
# reports.
@ -152,7 +152,7 @@ def create_node_dir(basedir, readme_text):
f.write(readme_text)
def read_config(basedir, portnumfile, generated_files=[], _valid_config_sections=None):
def read_config(basedir, portnumfile, generated_files=[], _valid_config=None):
"""
Read and validate configuration.
@ -163,15 +163,14 @@ def read_config(basedir, portnumfile, generated_files=[], _valid_config_sections
:param list generated_files: a list of automatically-generated
configuration files.
:param dict _valid_config_sections: (internal use, optional) a
dict-of-dicts structure defining valid configuration sections and
keys
:param ValidConfiguration _valid_config: (internal use, optional) a
structure defining valid configuration sections and keys
:returns: :class:`allmydata.node._Config` instance
"""
basedir = abspath_expanduser_unicode(unicode(basedir))
if _valid_config_sections is None:
_valid_config_sections = _common_config_sections
if _valid_config is None:
_valid_config = _common_valid_config()
# complain if there's bad stuff in the config dir
_error_about_old_config_files(basedir, generated_files)
@ -188,7 +187,7 @@ def read_config(basedir, portnumfile, generated_files=[], _valid_config_sections
if e.errno != errno.ENOENT:
raise
configutil.validate_config(config_fname, parser, _valid_config_sections())
configutil.validate_config(config_fname, parser, _valid_config)
# make sure we have a private configuration area
fileutil.make_dirs(os.path.join(basedir, "private"), 0o700)
@ -196,14 +195,20 @@ def read_config(basedir, portnumfile, generated_files=[], _valid_config_sections
return _Config(parser, portnumfile, basedir, config_fname)
def config_from_string(basedir, portnumfile, config_str):
def config_from_string(basedir, portnumfile, config_str, _valid_config=None):
"""
load configuration from in-memory string
load and validate configuration from in-memory string
"""
if _valid_config is None:
_valid_config = _common_valid_config()
# load configuration from in-memory string
parser = ConfigParser.SafeConfigParser()
parser.readfp(BytesIO(config_str))
return _Config(parser, portnumfile, basedir, '<in-memory>')
fname = "<in-memory>"
configutil.validate_config(fname, parser, _valid_config)
return _Config(parser, portnumfile, basedir, fname)
def get_app_versions():
@ -287,6 +292,14 @@ class _Config(object):
"Unable to write config file '{}'".format(fn),
)
def items(self, section, default=_None):
try:
return self.config.items(section)
except ConfigParser.NoSectionError:
if default is _None:
raise
return default
def get_config(self, section, option, default=_None, boolean=False):
try:
if boolean:

View File

@ -115,6 +115,7 @@ class DaemonizeTheRealService(Service, HookMixin):
- 'running': triggered when startup has completed; it triggers
with None of successful or a Failure otherwise.
"""
stderr = sys.stderr
def __init__(self, nodetype, basedir, options):
super(DaemonizeTheRealService, self).__init__()
@ -145,10 +146,12 @@ class DaemonizeTheRealService(Service, HookMixin):
raise ValueError("unknown nodetype %s" % self.nodetype)
def handle_config_error(fail):
fail.trap(UnknownConfigError)
self.stderr.write("\nConfiguration error:\n{}\n\n".format(fail.value))
if fail.check(UnknownConfigError):
self.stderr.write("\nConfiguration error:\n{}\n\n".format(fail.value))
else:
self.stderr.write("\nUnknown error\n")
fail.printTraceback(self.stderr)
reactor.stop()
return
d = service_factory()

View File

@ -30,19 +30,33 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
import re, time, hashlib
from ConfigParser import (
NoSectionError,
)
import attr
from zope.interface import implementer
from zope.interface import (
Attribute,
Interface,
implementer,
)
from twisted.internet import defer
from twisted.application import service
from twisted.plugin import (
getPlugins,
)
from eliot import (
log_call,
)
from foolscap.api import eventually
from foolscap.reconnector import (
ReconnectionInfo,
)
from allmydata.interfaces import (
IStorageBroker,
IDisplayableServer,
IServer,
IStorageServer,
IFoolscapStoragePlugin,
)
from allmydata.util import log, base32, connection_status
from allmydata.util.assertutil import precondition
@ -66,6 +80,58 @@ from allmydata.util.hashutil import permute_server_hash
# look like?
# don't pass signatures: only pass validated blessed-objects
@attr.s
class StorageClientConfig(object):
"""
Configuration for a node acting as a storage client.
:ivar preferred_peers: An iterable of the server-ids (``bytes``) of the
storage servers where share placement is preferred, in order of
decreasing preference. See the *[client]peers.preferred*
documentation for details.
:ivar dict[unicode, dict[bytes, bytes]] storage_plugins: A mapping from
names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
respective configuration.
"""
preferred_peers = attr.ib(default=())
storage_plugins = attr.ib(default=attr.Factory(dict))
@classmethod
def from_node_config(cls, config):
"""
Create a ``StorageClientConfig`` from a complete Tahoe-LAFS node
configuration.
:param _Config config: The loaded Tahoe-LAFS node configuration.
"""
ps = config.get_config("client", "peers.preferred", b"").split(b",")
preferred_peers = tuple([p.strip() for p in ps if p != b""])
enabled_storage_plugins = (
name.strip()
for name
in config.get_config(
b"client",
b"storage.plugins",
b"",
).decode("utf-8").split(u",")
if name.strip()
)
storage_plugins = {}
for plugin_name in enabled_storage_plugins:
try:
plugin_config = config.items(b"storageclient.plugins." + plugin_name)
except NoSectionError:
plugin_config = []
storage_plugins[plugin_name] = dict(plugin_config)
return cls(
preferred_peers,
storage_plugins,
)
@implementer(IStorageBroker)
class StorageFarmBroker(service.MultiService):
@ -74,13 +140,32 @@ class StorageFarmBroker(service.MultiService):
remember enough information to establish a connection to it on demand.
I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer.
:ivar StorageClientConfig storage_client_config: Values from the node
configuration file relating to storage behavior.
"""
def __init__(self, permute_peers, tub_maker, preferred_peers=()):
@property
def preferred_peers(self):
return self.storage_client_config.preferred_peers
def __init__(
self,
permute_peers,
tub_maker,
node_config,
storage_client_config=None,
):
service.MultiService.__init__(self)
assert permute_peers # False not implemented yet
self.permute_peers = permute_peers
self._tub_maker = tub_maker
self.preferred_peers = preferred_peers
self.node_config = node_config
if storage_client_config is None:
storage_client_config = StorageClientConfig()
self.storage_client_config = storage_client_config
# self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its
@ -102,6 +187,12 @@ class StorageFarmBroker(service.MultiService):
try:
storage_server = self._make_storage_server(server_id, server)
except Exception:
# TODO: The _make_storage_server failure is logged but maybe
# we should write a traceback here. Notably, tests don't
# automatically fail just because we hit this case. Well
# written tests will still fail if a surprising exception
# arrives here but they might be harder to debug without this
# information.
pass
else:
self._static_server_ids.add(server_id)
@ -109,6 +200,28 @@ class StorageFarmBroker(service.MultiService):
storage_server.setServiceParent(self)
storage_server.start_connecting(self._trigger_connections)
def get_client_storage_plugin_web_resources(self, node_config):
"""
Get all of the client-side ``IResource`` implementations provided by
enabled storage plugins.
:param allmydata.node._Config node_config: The complete node
configuration for the node from which these web resources will be
served.
:return dict[unicode, IResource]: Resources for all of the plugins.
"""
plugins = {
plugin.name: plugin
for plugin
in getPlugins(IFoolscapStoragePlugin)
}
return {
name: plugins[name].get_client_resource(node_config)
for (name, config)
in self.storage_client_config.storage_plugins.items()
}
@log_call(
action_type=u"storage-client:broker:make-storage-server",
include_args=["server_id"],
@ -118,8 +231,14 @@ class StorageFarmBroker(service.MultiService):
assert isinstance(server_id, unicode) # from YAML
server_id = server_id.encode("ascii")
handler_overrides = server.get("connections", {})
s = NativeStorageServer(server_id, server["ann"],
self._tub_maker, handler_overrides)
s = NativeStorageServer(
server_id,
server["ann"],
self._tub_maker,
handler_overrides,
self.node_config,
self.storage_client_config,
)
s.on_status_changed(lambda _: self._got_connection())
return s
@ -136,7 +255,10 @@ class StorageFarmBroker(service.MultiService):
# these two are used in unit tests
def test_add_rref(self, serverid, rref, ann):
s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {})
s = self._make_storage_server(
serverid.decode("ascii"),
{"ann": ann.copy()},
)
s._rref = rref
s._is_connected = True
self.servers[serverid] = s
@ -150,7 +272,7 @@ class StorageFarmBroker(service.MultiService):
ic.subscribe_to("storage", self._got_announcement)
def _got_connection(self):
# this is called by NativeStorageClient when it is connected
# this is called by NativeStorageServer when it is connected
self._check_connected_high_water_mark()
def _check_connected_high_water_mark(self):
@ -177,8 +299,10 @@ class StorageFarmBroker(service.MultiService):
facility="tahoe.storage_broker", umid="AlxzqA",
level=log.UNUSUAL)
return
s = NativeStorageServer(server_id, ann, self._tub_maker, {})
s.on_status_changed(lambda _: self._got_connection())
s = self._make_storage_server(
server_id.decode("utf-8"),
{u"ann": ann},
)
server_id = s.get_serverid()
old = self.servers.get(server_id)
if old:
@ -257,7 +381,7 @@ class StorageFarmBroker(service.MultiService):
# tubids. This clause maps the old tubids to our existing servers.
for s in self.servers.values():
if isinstance(s, NativeStorageServer):
if serverid == s._tubid:
if serverid == s.get_tubid():
return s
return StubServer(serverid)
@ -275,6 +399,207 @@ class StubServer(object):
return "?"
class IFoolscapStorageServer(Interface):
"""
An internal interface that mediates between ``NativeStorageServer`` and
Foolscap-based ``IStorageServer`` implementations.
"""
nickname = Attribute("""
A name for this server for presentation to users.
""")
permutation_seed = Attribute("""
A stable value associated with this server which a client can use as an
input to the server selection permutation ordering.
""")
tubid = Attribute("""
The identifier for the Tub in which the server is run.
""")
storage_server = Attribute("""
An IStorageServer provide which implements a concrete Foolscap-based
protocol for communicating with the server.
""")
name = Attribute("""
Another name for this server for presentation to users.
""")
longname = Attribute("""
*Another* name for this server for presentation to users.
""")
lease_seed = Attribute("""
A stable value associated with this server which a client can use as an
input to a lease secret generation function.
""")
def connect_to(tub, got_connection):
"""
Attempt to establish and maintain a connection to the server.
:param Tub tub: A Foolscap Tub from which the connection is to
originate.
:param got_connection: A one-argument callable which is called with a
Foolscap ``RemoteReference`` when a connection is established.
This may be called multiple times if the connection is lost and
then re-established.
:return foolscap.reconnector.Reconnector: An object which manages the
connection and reconnection attempts.
"""
@implementer(IFoolscapStorageServer)
@attr.s(frozen=True)
class _FoolscapStorage(object):
"""
Abstraction for connecting to a storage server exposed via Foolscap.
"""
nickname = attr.ib()
permutation_seed = attr.ib()
tubid = attr.ib()
storage_server = attr.ib(validator=attr.validators.provides(IStorageServer))
_furl = attr.ib()
_short_description = attr.ib()
_long_description = attr.ib()
@property
def name(self):
return self._short_description
@property
def longname(self):
return self._long_description
@property
def lease_seed(self):
return self.tubid
@classmethod
def from_announcement(cls, server_id, furl, ann, storage_server):
"""
Create an instance from a fURL and an announcement like::
{"permutation-seed-base32": "...",
"nickname": "...",
}
*nickname* is optional.
"""
m = re.match(r'pb://(\w+)@', furl)
assert m, furl
tubid_s = m.group(1).lower()
tubid = base32.a2b(tubid_s)
if "permutation-seed-base32" in ann:
ps = base32.a2b(str(ann["permutation-seed-base32"]))
elif re.search(r'^v0-[0-9a-zA-Z]{52}$', server_id):
ps = base32.a2b(server_id[3:])
else:
log.msg("unable to parse serverid '%(server_id)s as pubkey, "
"hashing it to get permutation-seed, "
"may not converge with other clients",
server_id=server_id,
facility="tahoe.storage_broker",
level=log.UNUSUAL, umid="qu86tw")
ps = hashlib.sha256(server_id).digest()
permutation_seed = ps
assert server_id
long_description = server_id
if server_id.startswith("v0-"):
# remove v0- prefix from abbreviated name
short_description = server_id[3:3+8]
else:
short_description = server_id[:8]
nickname = ann.get("nickname", "")
return cls(
nickname=nickname,
permutation_seed=permutation_seed,
tubid=tubid,
storage_server=storage_server,
furl=furl,
short_description=short_description,
long_description=long_description,
)
def connect_to(self, tub, got_connection):
return tub.connectTo(self._furl, got_connection)
@implementer(IFoolscapStorageServer)
class _NullStorage(object):
"""
Abstraction for *not* communicating with a storage server of a type with
which we can't communicate.
"""
nickname = ""
permutation_seed = hashlib.sha256("").digest()
tubid = hashlib.sha256("").digest()
storage_server = None
lease_seed = hashlib.sha256("").digest()
name = "<unsupported>"
longname = "<storage with unsupported protocol>"
def connect_to(self, tub, got_connection):
return NonReconnector()
class NonReconnector(object):
"""
A ``foolscap.reconnector.Reconnector``-alike that doesn't do anything.
"""
def stopConnecting(self):
pass
def reset(self):
pass
def getReconnectionInfo(self):
return ReconnectionInfo()
_null_storage = _NullStorage()
class AnnouncementNotMatched(Exception):
"""
A storage server announcement wasn't matched by any of the locally enabled
plugins.
"""
def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
"""
Construct an ``IStorageServer`` from the most locally-preferred plugin
that is offered in the given announcement.
:param allmydata.node._Config node_config: The node configuration to
pass to the plugin.
"""
plugins = {
plugin.name: plugin
for plugin
in getPlugins(IFoolscapStoragePlugin)
}
storage_options = announcement.get(u"storage-options", [])
for plugin_name, plugin_config in config.storage_plugins.items():
try:
plugin = plugins[plugin_name]
except KeyError:
raise ValueError("{} not installed".format(plugin_name))
for option in storage_options:
if plugin_name == option[u"name"]:
furl = option[u"storage-server-FURL"]
return furl, plugin.get_storage_client(
node_config,
option,
get_rref,
)
raise AnnouncementNotMatched()
@implementer(IServer)
class NativeStorageServer(service.MultiService):
"""I hold information about a storage server that we want to connect to.
@ -303,7 +628,7 @@ class NativeStorageServer(service.MultiService):
"application-version": "unknown: no get_version()",
}
def __init__(self, server_id, ann, tub_maker, handler_overrides):
def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=StorageClientConfig()):
service.MultiService.__init__(self)
assert isinstance(server_id, str)
self._server_id = server_id
@ -311,33 +636,7 @@ class NativeStorageServer(service.MultiService):
self._tub_maker = tub_maker
self._handler_overrides = handler_overrides
assert "anonymous-storage-FURL" in ann, ann
furl = str(ann["anonymous-storage-FURL"])
m = re.match(r'pb://(\w+)@', furl)
assert m, furl
tubid_s = m.group(1).lower()
self._tubid = base32.a2b(tubid_s)
if "permutation-seed-base32" in ann:
ps = base32.a2b(str(ann["permutation-seed-base32"]))
elif re.search(r'^v0-[0-9a-zA-Z]{52}$', server_id):
ps = base32.a2b(server_id[3:])
else:
log.msg("unable to parse serverid '%(server_id)s as pubkey, "
"hashing it to get permutation-seed, "
"may not converge with other clients",
server_id=server_id,
facility="tahoe.storage_broker",
level=log.UNUSUAL, umid="qu86tw")
ps = hashlib.sha256(server_id).digest()
self._permutation_seed = ps
assert server_id
self._long_description = server_id
if server_id.startswith("v0-"):
# remove v0- prefix from abbreviated name
self._short_description = server_id[3:3+8]
else:
self._short_description = server_id[:8]
self._storage = self._make_storage_system(node_config, config, ann)
self.last_connect_time = None
self.last_loss_time = None
@ -348,6 +647,80 @@ class NativeStorageServer(service.MultiService):
self._trigger_cb = None
self._on_status_changed = ObserverList()
def _make_storage_system(self, node_config, config, ann):
"""
:param allmydata.node._Config node_config: The node configuration to pass
to any configured storage plugins.
:param StorageClientConfig config: Configuration specifying desired
storage client behavior.
:param dict ann: The storage announcement from the storage server we
are meant to communicate with.
:return IFoolscapStorageServer: An object enabling communication via
Foolscap with the server which generated the announcement.
"""
# Try to match the announcement against a plugin.
try:
furl, storage_server = _storage_from_foolscap_plugin(
node_config,
config,
ann,
# Pass in an accessor for our _rref attribute. The value of
# the attribute may change over time as connections are lost
# and re-established. The _StorageServer should always be
# able to get the most up-to-date value.
self.get_rref,
)
except AnnouncementNotMatched:
# Nope.
pass
else:
return _FoolscapStorage.from_announcement(
self._server_id,
furl.encode("utf-8"),
ann,
storage_server,
)
# Try to match the announcement against the anonymous access scheme.
try:
furl = ann[u"anonymous-storage-FURL"]
except KeyError:
# Nope
pass
else:
# See comment above for the _storage_from_foolscap_plugin case
# about passing in get_rref.
storage_server = _StorageServer(get_rref=self.get_rref)
return _FoolscapStorage.from_announcement(
self._server_id,
furl.encode("utf-8"),
ann,
storage_server,
)
# Nothing matched so we can't talk to this server.
return _null_storage
def get_permutation_seed(self):
return self._storage.permutation_seed
def get_name(self): # keep methodname short
# TODO: decide who adds [] in the short description. It should
# probably be the output side, not here.
return self._storage.name
def get_longname(self):
return self._storage.longname
def get_tubid(self):
return self._storage.tubid
def get_lease_seed(self):
return self._storage.lease_seed
def get_foolscap_write_enabler_seed(self):
return self._storage.tubid
def get_nickname(self):
return self._storage.nickname
def on_status_changed(self, status_changed):
"""
:param status_changed: a callable taking a single arg (the
@ -368,25 +741,10 @@ class NativeStorageServer(service.MultiService):
return "<NativeStorageServer for %s>" % self.get_name()
def get_serverid(self):
return self._server_id
def get_permutation_seed(self):
return self._permutation_seed
def get_version(self):
if self._rref:
return self._rref.version
return None
def get_name(self): # keep methodname short
# TODO: decide who adds [] in the short description. It should
# probably be the output side, not here.
return self._short_description
def get_longname(self):
return self._long_description
def get_lease_seed(self):
return self._tubid
def get_foolscap_write_enabler_seed(self):
return self._tubid
def get_nickname(self):
return self.announcement.get("nickname", "")
def get_announcement(self):
return self.announcement
def get_remote_host(self):
@ -412,13 +770,11 @@ class NativeStorageServer(service.MultiService):
available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
return available_space
def start_connecting(self, trigger_cb):
self._tub = self._tub_maker(self._handler_overrides)
self._tub.setServiceParent(self)
furl = str(self.announcement["anonymous-storage-FURL"])
self._trigger_cb = trigger_cb
self._reconnector = self._tub.connectTo(furl, self._got_connection)
self._reconnector = self._storage.connect_to(self._tub, self._got_connection)
def _got_connection(self, rref):
lp = log.msg(format="got connection to %(name)s, getting versions",
@ -454,11 +810,7 @@ class NativeStorageServer(service.MultiService):
"""
if self._rref is None:
return None
# Pass in an accessor for our _rref attribute. The value of the
# attribute may change over time as connections are lost and
# re-established. The _StorageServer should always be able to get the
# most up-to-date value.
return _StorageServer(get_rref=self.get_rref)
return self._storage.storage_server
def _lost(self):
log.msg(format="lost connection to %(name)s", name=self.get_name(),

View File

@ -1,4 +1,7 @@
import os
from io import (
BytesIO,
)
from os.path import dirname, join
from mock import patch, Mock
from six.moves import StringIO
@ -52,6 +55,7 @@ class Util(unittest.TestCase):
def test_daemonize_no_keygen(self):
tmpdir = self.mktemp()
stderr = BytesIO()
plug = DaemonizeTahoeNodePlugin('key-generator', tmpdir)
with patch('twisted.internet.reactor') as r:
@ -59,8 +63,8 @@ class Util(unittest.TestCase):
d = fn()
d.addErrback(lambda _: None) # ignore the error we'll trigger
r.callWhenRunning = call
r.stop = 'foo'
service = plug.makeService(self.options)
service.stderr = stderr
service.parent = Mock()
# we'll raise ValueError because there's no key-generator
# .. BUT we do this in an async function called via
@ -70,7 +74,7 @@ class Util(unittest.TestCase):
def done(f):
self.assertIn(
"key-generator support removed",
str(f),
stderr.getvalue(),
)
return None
d.addBoth(done)

View File

@ -26,6 +26,8 @@ from errno import (
EADDRINUSE,
)
import attr
import treq
from zope.interface import implementer
@ -72,7 +74,14 @@ from allmydata.util.assertutil import precondition
from allmydata.util.consumer import download_to_data
import allmydata.test.common_util as testutil
from allmydata.immutable.upload import Uploader
from allmydata.client import (
config_from_string,
create_client_from_config,
)
from ..crypto import (
ed25519,
)
from .eliotutil import (
EliotLoggedRunTest,
)
@ -80,6 +89,194 @@ from .eliotutil import (
TEST_RSA_KEY_SIZE = 522
EMPTY_CLIENT_CONFIG = config_from_string(
b"/dev/null",
b"tub.port",
b""
)
@attr.s
class MemoryIntroducerClient(object):
"""
A model-only (no behavior) stand-in for ``IntroducerClient``.
"""
tub = attr.ib()
introducer_furl = attr.ib()
nickname = attr.ib()
my_version = attr.ib()
oldest_supported = attr.ib()
app_versions = attr.ib()
sequencer = attr.ib()
cache_filepath = attr.ib()
subscribed_to = attr.ib(default=attr.Factory(list))
published_announcements = attr.ib(default=attr.Factory(list))
def setServiceParent(self, parent):
pass
def subscribe_to(self, service_name, cb, *args, **kwargs):
self.subscribed_to.append(Subscription(service_name, cb, args, kwargs))
def publish(self, service_name, ann, signing_key):
self.published_announcements.append(Announcement(
service_name,
ann,
ed25519.string_from_signing_key(signing_key),
))
@attr.s
class Subscription(object):
"""
A model of an introducer subscription.
"""
service_name = attr.ib()
cb = attr.ib()
args = attr.ib()
kwargs = attr.ib()
@attr.s
class Announcement(object):
"""
A model of an introducer announcement.
"""
service_name = attr.ib()
ann = attr.ib()
signing_key_bytes = attr.ib(type=bytes)
@property
def signing_key(self):
return ed25519.signing_keypair_from_string(self.signing_key_bytes)[0]
def get_published_announcements(client):
"""
Get a flattened list of all announcements sent using all introducer
clients.
"""
return list(
announcement
for introducer_client
in client.introducer_clients
for announcement
in introducer_client.published_announcements
)
class UseTestPlugins(object):
"""
A fixture which enables loading Twisted plugins from the Tahoe-LAFS test
suite.
"""
def setUp(self):
"""
Add the testing package ``plugins`` directory to the ``twisted.plugins``
aggregate package.
"""
import twisted.plugins
testplugins = FilePath(__file__).sibling("plugins")
twisted.plugins.__path__.insert(0, testplugins.path)
def cleanUp(self):
"""
Remove the testing package ``plugins`` directory from the
``twisted.plugins`` aggregate package.
"""
import twisted.plugins
testplugins = FilePath(__file__).sibling("plugins")
twisted.plugins.__path__.remove(testplugins.path)
def getDetails(self):
return {}
@attr.s
class UseNode(object):
"""
A fixture which creates a client node.
:ivar dict[bytes, bytes] plugin_config: Configuration items to put in the
node's configuration.
:ivar bytes storage_plugin: The name of a storage plugin to enable.
:ivar FilePath basedir: The base directory of the node.
:ivar bytes introducer_furl: The introducer furl with which to
configure the client.
:ivar dict[bytes, bytes] node_config: Configuration items for the *node*
section of the configuration.
:ivar _Config config: The complete resulting configuration.
"""
plugin_config = attr.ib()
storage_plugin = attr.ib()
basedir = attr.ib()
introducer_furl = attr.ib()
node_config = attr.ib(default=attr.Factory(dict))
config = attr.ib(default=None)
def setUp(self):
def format_config_items(config):
return b"\n".join(
b" = ".join((key, value))
for (key, value)
in config.items()
)
if self.plugin_config is None:
plugin_config_section = b""
else:
plugin_config_section = b"""
[storageclient.plugins.{storage_plugin}]
{config}
""".format(
storage_plugin=self.storage_plugin,
config=format_config_items(self.plugin_config),
)
self.config = config_from_string(
self.basedir.asTextMode().path,
u"tub.port",
b"""
[node]
{node_config}
[client]
introducer.furl = {furl}
storage.plugins = {storage_plugin}
{plugin_config_section}
""".format(
furl=self.introducer_furl,
storage_plugin=self.storage_plugin,
node_config=format_config_items(self.node_config),
plugin_config_section=plugin_config_section,
)
)
def create_node(self):
return create_client_from_config(
self.config,
_introducer_factory=MemoryIntroducerClient,
)
def cleanUp(self):
pass
def getDetails(self):
return {}
@implementer(IPlugin, IStreamServerEndpointStringParser)
class AdoptedServerPort(object):
"""
@ -135,23 +332,17 @@ class SameProcessStreamEndpointAssigner(object):
"""
def setUp(self):
self._cleanups = []
# Make sure the `adopt-socket` endpoint is recognized. We do this
# instead of providing a dropin because we don't want to make this
# endpoint available to random other applications.
f = UseTestPlugins()
f.setUp()
self._cleanups.append(f.cleanUp)
def tearDown(self):
for c in self._cleanups:
c()
def _patch_plugins(self):
"""
Add the testing package ``plugins`` directory to the ``twisted.plugins``
aggregate package. Arrange for it to be removed again when the
fixture is torn down.
"""
import twisted.plugins
testplugins = FilePath(__file__).sibling("plugins")
twisted.plugins.__path__.insert(0, testplugins.path)
self._cleanups.append(lambda: twisted.plugins.__path__.remove(testplugins.path))
def assign(self, reactor):
"""
Make a new streaming server endpoint and return its string description.
@ -183,10 +374,6 @@ class SameProcessStreamEndpointAssigner(object):
host, port = s.getsockname()
location_hint = "tcp:%s:%d" % (host, port)
port_endpoint = "adopt-socket:fd=%d" % (s.fileno(),)
# Make sure `adopt-socket` is recognized. We do this instead of
# providing a dropin because we don't want to make this endpoint
# available to random other applications.
self._patch_plugins()
else:
# On other platforms, we blindly guess and hope we get lucky.
portnum = iputil.allocate_tcp_port()

View File

@ -0,0 +1,120 @@
"""
Testtools-style matchers useful to the Tahoe-LAFS test suite.
"""
import attr
from testtools.matchers import (
Mismatch,
AfterPreprocessing,
MatchesStructure,
MatchesDict,
MatchesListwise,
Always,
Equals,
)
from foolscap.furl import (
decode_furl,
)
from allmydata.util import (
base32,
)
from allmydata.node import (
read_config,
)
from allmydata.crypto import (
ed25519,
error,
)
@attr.s
class MatchesNodePublicKey(object):
"""
Match an object representing the node's private key.
To verify, the private key is loaded from the node's private config
directory at the time the match is checked.
"""
basedir = attr.ib()
def match(self, other):
"""
Match a private key which is the same as the private key in the node at
``self.basedir``.
:param other: A signing key (aka "private key") from
``allmydata.crypto.ed25519``. This is the key to check against
the node's key.
:return Mismatch: If the keys don't match.
"""
config = read_config(self.basedir, u"tub.port")
privkey_bytes = config.get_private_config("node.privkey")
private_key = ed25519.signing_keypair_from_string(privkey_bytes)[0]
signature = ed25519.sign_data(private_key, b"")
other_public_key = ed25519.verifying_key_from_signing_key(other)
try:
ed25519.verify_signature(other_public_key, signature, b"")
except error.BadSignature:
return Mismatch("The signature did not verify.")
def matches_storage_announcement(basedir, anonymous=True, options=None):
"""
Match a storage announcement.
:param bytes basedir: The path to the node base directory which is
expected to emit the announcement. This is used to determine the key
which is meant to sign the announcement.
:param bool anonymous: If True, matches a storage announcement containing
an anonymous access fURL. Otherwise, fails to match such an
announcement.
:param list[matcher]|NoneType options: If a list, matches a storage
announcement containing a list of storage plugin options matching the
elements of the list. If None, fails to match an announcement with
storage plugin options.
:return: A matcher with the requested behavior.
"""
announcement = {
u"permutation-seed-base32": matches_base32(),
}
if anonymous:
announcement[u"anonymous-storage-FURL"] = matches_furl()
if options:
announcement[u"storage-options"] = MatchesListwise(options)
return MatchesStructure(
# Has each of these keys with associated values that match
service_name=Equals(u"storage"),
ann=MatchesDict(announcement),
signing_key=MatchesNodePublicKey(basedir),
)
def matches_furl():
"""
Match any Foolscap fURL byte string.
"""
return AfterPreprocessing(decode_furl, Always())
def matches_base32():
"""
Match any base32 encoded byte string.
"""
return AfterPreprocessing(base32.a2b, Always())
class MatchesSameElements(object):
"""
Match if the two-tuple value given contains two elements that are equal to
each other.
"""
def match(self, value):
left, right = value
return Equals(left).match(right)

View File

@ -10,7 +10,10 @@ from allmydata.util.hashutil import tagged_hash
from allmydata.storage_client import StorageFarmBroker
from allmydata.mutable.layout import MDMFSlotReadProxy
from allmydata.mutable.publish import MutableData
from ..common import TEST_RSA_KEY_SIZE
from ..common import (
TEST_RSA_KEY_SIZE,
EMPTY_CLIENT_CONFIG,
)
def eventuaaaaaly(res=None):
d = fireEventually(res)
@ -219,10 +222,12 @@ def make_peer(s, i):
:rtype: ``Peer``
"""
peerid = tagged_hash("peerid", "%d" % i)[:20]
peerid = base32.b2a(tagged_hash("peerid", "%d" % i)[:20])
fss = FakeStorageServer(peerid, s)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
"permutation-seed-base32": base32.b2a(peerid) }
ann = {
"anonymous-storage-FURL": "pb://%s@nowhere/fake" % (peerid,),
"permutation-seed-base32": peerid,
}
return Peer(peerid=peerid, storage_server=fss, announcement=ann)
@ -252,7 +257,7 @@ def make_storagebroker_with_peers(peers):
:param list peers: The storage servers to associate with the storage
broker.
"""
storage_broker = StorageFarmBroker(True, None)
storage_broker = StorageFarmBroker(True, None, EMPTY_CLIENT_CONFIG)
for peer in peers:
storage_broker.test_add_rref(
peer.peerid,

View File

@ -2,4 +2,11 @@ from allmydata.test.common import (
AdoptedServerPort,
)
from allmydata.test.storage_plugin import (
DummyStorage,
)
adoptedEndpointParser = AdoptedServerPort()
dummyStoragev1 = DummyStorage(u"tahoe-lafs-dummy-v1")
dummyStoragev2 = DummyStorage(u"tahoe-lafs-dummy-v2")

View File

@ -0,0 +1,102 @@
"""
A storage server plugin the test suite can use to validate the
functionality.
"""
from json import (
dumps,
)
import attr
from zope.interface import (
implementer,
)
from twisted.internet.defer import (
succeed,
)
from twisted.web.static import (
Data,
)
from foolscap.api import (
RemoteInterface,
)
from allmydata.interfaces import (
IFoolscapStoragePlugin,
IStorageServer,
)
from allmydata.client import (
AnnounceableStorageServer,
)
class RIDummy(RemoteInterface):
__remote_name__ = "RIDummy.tahoe.allmydata.com"
def just_some_method():
"""
Just some method so there is something callable on this object. We won't
pretend to actually offer any storage capabilities.
"""
@implementer(IFoolscapStoragePlugin)
@attr.s
class DummyStorage(object):
name = attr.ib()
@property
def _client_section_name(self):
return u"storageclient.plugins.{}".format(self.name)
def get_storage_server(self, configuration, get_anonymous_storage_server):
if u"invalid" in configuration:
raise Exception("The plugin is unhappy.")
announcement = {u"value": configuration.get(u"some", u"default-value")}
storage_server = DummyStorageServer(get_anonymous_storage_server)
return succeed(
AnnounceableStorageServer(
announcement,
storage_server,
),
)
def get_storage_client(self, configuration, announcement, get_rref):
return DummyStorageClient(
get_rref,
dict(configuration.items(self._client_section_name, [])),
announcement,
)
def get_client_resource(self, configuration):
"""
:return: A static data resource that produces the given configuration when
rendered, as an aid to testing.
"""
items = configuration.items(self._client_section_name, [])
return Data(
dumps(dict(items)),
b"text/json",
)
@implementer(RIDummy)
@attr.s(cmp=True, hash=True)
class DummyStorageServer(object):
# TODO Requirement of some interface that instances be hashable
get_anonymous_storage_server = attr.ib(cmp=False)
def remote_just_some_method(self):
pass
@implementer(IStorageServer)
@attr.s
class DummyStorageClient(object):
get_rref = attr.ib()
configuration = attr.ib()
announcement = attr.ib()

View File

@ -15,6 +15,10 @@ from allmydata.immutable.upload import Data
from allmydata.test.common_web import WebRenderingMixin
from allmydata.mutable.publish import MutableData
from .common import (
EMPTY_CLIENT_CONFIG,
)
class FakeClient(object):
def get_storage_broker(self):
return self.storage_broker
@ -22,7 +26,7 @@ class FakeClient(object):
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
def create_fake_client(self):
sb = StorageFarmBroker(True, None)
sb = StorageFarmBroker(True, None, EMPTY_CLIENT_CONFIG)
# s.get_name() (the "short description") will be "v0-00000000".
# s.get_longname() will include the -long suffix.
servers = [("v0-00000000-long", "\x00"*20, "peer-0"),
@ -41,7 +45,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
"my-version": "ver",
"oldest-supported": "oldest",
}
s = NativeStorageServer(server_id, ann, None, None)
s = NativeStorageServer(server_id, ann, None, None, None)
sb.test_add_server(server_id, s)
c = FakeClient()
c.storage_broker = sb

View File

@ -1,5 +1,9 @@
import os, sys
import mock
from functools import (
partial,
)
import twisted
from yaml import (
safe_dump,
@ -21,33 +25,56 @@ from twisted.python.filepath import (
from testtools.matchers import (
Equals,
AfterPreprocessing,
MatchesListwise,
MatchesDict,
Always,
Is,
raises,
)
from testtools.twistedsupport import (
succeeded,
failed,
)
import allmydata
import allmydata.frontends.magic_folder
import allmydata.util.log
from allmydata.node import OldConfigError, OldConfigOptionError, UnescapedHashError, _Config, read_config, create_node_dir
from allmydata.node import config_from_string
from allmydata.node import OldConfigError, OldConfigOptionError, UnescapedHashError, _Config, create_node_dir
from allmydata.frontends.auth import NeedRootcapLookupScheme
from allmydata.version_checks import (
get_package_versions_string,
)
from allmydata import client
from allmydata.storage_client import StorageFarmBroker
from allmydata.util import base32, fileutil, encodingutil
from allmydata.storage_client import (
StorageClientConfig,
StorageFarmBroker,
)
from allmydata.util import (
base32,
fileutil,
encodingutil,
configutil,
)
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.interfaces import IFilesystemNode, IFileNode, \
IImmutableFileNode, IMutableFileNode, IDirectoryNode
from foolscap.api import flushEventualQueue
import allmydata.test.common_util as testutil
from allmydata.test.common import (
from .common import (
EMPTY_CLIENT_CONFIG,
SyncTestCase,
UseTestPlugins,
MemoryIntroducerClient,
get_published_announcements,
)
from .matchers import (
MatchesSameElements,
matches_storage_announcement,
matches_furl,
)
SOME_FURL = b"pb://abcde@nowhere/fake"
BASECONFIG = ("[client]\n"
"introducer.furl = \n"
@ -127,10 +154,9 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
try:
e = self.assertRaises(
EnvironmentError,
read_config,
client.read_config,
basedir,
"client.port",
_valid_config_sections=client._valid_config_sections,
)
self.assertIn("Permission denied", str(e))
finally:
@ -155,10 +181,9 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
e = self.failUnlessRaises(
OldConfigError,
read_config,
client.read_config,
basedir,
"client.port",
_valid_config_sections=client._valid_config_sections,
)
abs_basedir = fileutil.abspath_expanduser_unicode(unicode(basedir)).encode(sys.getfilesystemencoding())
self.failUnlessIn(os.path.join(abs_basedir, "introducer.furl"), e.args[0])
@ -222,6 +247,69 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
c = yield client.create_client(basedir)
self.failUnless(c.get_long_nodeid().startswith("v0-"))
def test_storage_anonymous_enabled_by_default(self):
"""
Anonymous storage access is enabled if storage is enabled and *anonymous*
is not set.
"""
config = client.config_from_string(
b"test_storage_default_anonymous_enabled",
b"tub.port",
BASECONFIG + (
b"[storage]\n"
b"enabled = true\n"
)
)
self.assertTrue(client.anonymous_storage_enabled(config))
def test_storage_anonymous_enabled_explicitly(self):
"""
Anonymous storage access is enabled if storage is enabled and *anonymous*
is set to true.
"""
config = client.config_from_string(
self.id(),
b"tub.port",
BASECONFIG + (
b"[storage]\n"
b"enabled = true\n"
b"anonymous = true\n"
)
)
self.assertTrue(client.anonymous_storage_enabled(config))
def test_storage_anonymous_disabled_explicitly(self):
"""
Anonymous storage access is disabled if storage is enabled and *anonymous*
is set to false.
"""
config = client.config_from_string(
self.id(),
b"tub.port",
BASECONFIG + (
b"[storage]\n"
b"enabled = true\n"
b"anonymous = false\n"
)
)
self.assertFalse(client.anonymous_storage_enabled(config))
def test_storage_anonymous_disabled_by_storage(self):
"""
Anonymous storage access is disabled if storage is disabled and *anonymous*
is set to true.
"""
config = client.config_from_string(
self.id(),
b"tub.port",
BASECONFIG + (
b"[storage]\n"
b"enabled = false\n"
b"anonymous = true\n"
)
)
self.assertFalse(client.anonymous_storage_enabled(config))
@defer.inlineCallbacks
def test_reserved_1(self):
"""
@ -493,9 +581,9 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
def test_permute(self):
sb = StorageFarmBroker(True, None)
sb = StorageFarmBroker(True, None, EMPTY_CLIENT_CONFIG)
for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
ann = {"anonymous-storage-FURL": SOME_FURL,
"permutation-seed-base32": base32.b2a(k) }
sb.test_add_rref(k, "rref", ann)
@ -505,9 +593,14 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
self.failUnlessReallyEqual(self._permute(sb, "one"), [])
def test_permute_with_preferred(self):
sb = StorageFarmBroker(True, None, preferred_peers=['1','4'])
sb = StorageFarmBroker(
True,
None,
EMPTY_CLIENT_CONFIG,
StorageClientConfig(preferred_peers=['1','4']),
)
for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
ann = {"anonymous-storage-FURL": SOME_FURL,
"permutation-seed-base32": base32.b2a(k) }
sb.test_add_rref(k, "rref", ann)
@ -672,6 +765,129 @@ def flush_but_dont_ignore(res):
return d
class AnonymousStorage(SyncTestCase):
"""
Tests for behaviors of the client object with respect to the anonymous
storage service.
"""
@defer.inlineCallbacks
def test_anonymous_storage_enabled(self):
"""
If anonymous storage access is enabled then the client announces it.
"""
basedir = self.id()
os.makedirs(basedir + b"/private")
config = client.config_from_string(
basedir,
b"tub.port",
BASECONFIG_I % (SOME_FURL,) + (
b"[storage]\n"
b"enabled = true\n"
b"anonymous = true\n"
)
)
node = yield client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
)
self.assertThat(
get_published_announcements(node),
MatchesListwise([
matches_storage_announcement(
basedir,
anonymous=True,
),
]),
)
@defer.inlineCallbacks
def test_anonymous_storage_disabled(self):
"""
If anonymous storage access is disabled then the client does not announce
it nor does it write a fURL for it to beneath the node directory.
"""
basedir = self.id()
os.makedirs(basedir + b"/private")
config = client.config_from_string(
basedir,
b"tub.port",
BASECONFIG_I % (SOME_FURL,) + (
b"[storage]\n"
b"enabled = true\n"
b"anonymous = false\n"
)
)
node = yield client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
)
self.expectThat(
get_published_announcements(node),
MatchesListwise([
matches_storage_announcement(
basedir,
anonymous=False,
),
]),
)
self.expectThat(
config.get_private_config(b"storage.furl", default=None),
Is(None),
)
@defer.inlineCallbacks
def test_anonymous_storage_enabled_then_disabled(self):
"""
If a node is run with anonymous storage enabled and then later anonymous
storage is disabled in the configuration for that node, it is not
possible to reach the anonymous storage server via the originally
published fURL.
"""
basedir = self.id()
os.makedirs(basedir + b"/private")
enabled_config = client.config_from_string(
basedir,
b"tub.port",
BASECONFIG_I % (SOME_FURL,) + (
b"[storage]\n"
b"enabled = true\n"
b"anonymous = true\n"
)
)
node = yield client.create_client_from_config(
enabled_config,
_introducer_factory=MemoryIntroducerClient,
)
anonymous_storage_furl = enabled_config.get_private_config(b"storage.furl")
def check_furl():
return node.tub.getReferenceForURL(anonymous_storage_furl)
# Perform a sanity check that our test code makes sense: is this a
# legit way to verify whether a fURL will refer to an object?
self.assertThat(
check_furl(),
# If it doesn't raise a KeyError we're in business.
Always(),
)
disabled_config = client.config_from_string(
basedir,
b"tub.port",
BASECONFIG_I % (SOME_FURL,) + (
b"[storage]\n"
b"enabled = true\n"
b"anonymous = false\n"
)
)
node = yield client.create_client_from_config(
disabled_config,
_introducer_factory=MemoryIntroducerClient,
)
self.assertThat(
check_furl,
raises(KeyError),
)
class IntroducerClients(unittest.TestCase):
def test_invalid_introducer_furl(self):
@ -683,7 +899,7 @@ class IntroducerClients(unittest.TestCase):
"[client]\n"
"introducer.furl = None\n"
)
config = config_from_string("basedir", "client.port", cfg)
config = client.config_from_string("basedir", "client.port", cfg)
with self.assertRaises(ValueError) as ctx:
client.create_introducer_clients(config, main_tub=None)
@ -985,3 +1201,326 @@ class NodeMaker(testutil.ReallyEqualMixin, unittest.TestCase):
self.failUnlessReallyEqual(n.get_uri(), unknown_rw)
self.failUnlessReallyEqual(n.get_write_uri(), unknown_rw)
self.failUnlessReallyEqual(n.get_readonly_uri(), "ro." + unknown_ro)
def matches_dummy_announcement(name, value):
"""
Matches the portion of an announcement for the ``DummyStorage`` storage
server plugin.
:param unicode name: The name of the dummy plugin.
:param unicode value: The arbitrary value in the dummy plugin
announcement.
:return: a testtools-style matcher
"""
return MatchesDict({
# Everyone gets a name and a fURL added to their announcement.
u"name": Equals(name),
u"storage-server-FURL": matches_furl(),
# The plugin can contribute things, too.
u"value": Equals(value),
})
class StorageAnnouncementTests(SyncTestCase):
"""
Tests for the storage announcement published by the client.
"""
def setUp(self):
super(StorageAnnouncementTests, self).setUp()
self.basedir = self.useFixture(TempDir()).path
create_node_dir(self.basedir, u"")
def get_config(self, storage_enabled, more_storage=b"", more_sections=b""):
return b"""
[node]
tub.location = tcp:192.0.2.0:1234
[storage]
enabled = {storage_enabled}
{more_storage}
[client]
introducer.furl = pb://abcde@nowhere/fake
{more_sections}
""".format(
storage_enabled=storage_enabled,
more_storage=more_storage,
more_sections=more_sections,
)
def test_no_announcement(self):
"""
No storage announcement is published if storage is not enabled.
"""
config = client.config_from_string(
self.basedir,
u"tub.port",
self.get_config(storage_enabled=False),
)
self.assertThat(
client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
),
succeeded(AfterPreprocessing(
get_published_announcements,
Equals([]),
)),
)
def test_anonymous_storage_announcement(self):
"""
A storage announcement with the anonymous storage fURL is published when
storage is enabled.
"""
config = client.config_from_string(
self.basedir,
u"tub.port",
self.get_config(storage_enabled=True),
)
client_deferred = client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
)
self.assertThat(
client_deferred,
# The Deferred succeeds
succeeded(AfterPreprocessing(
# The announcements published by the client should ...
get_published_announcements,
# Match the following list (of one element) ...
MatchesListwise([
# The only element in the list ...
matches_storage_announcement(self.basedir),
]),
)),
)
def test_single_storage_plugin_announcement(self):
"""
The announcement from a single enabled storage plugin is published when
storage is enabled.
"""
self.useFixture(UseTestPlugins())
value = u"thing"
config = client.config_from_string(
self.basedir,
u"tub.port",
self.get_config(
storage_enabled=True,
more_storage=b"plugins=tahoe-lafs-dummy-v1",
more_sections=(
b"[storageserver.plugins.tahoe-lafs-dummy-v1]\n"
b"some = {}\n".format(value)
),
),
)
self.assertThat(
client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
),
succeeded(AfterPreprocessing(
get_published_announcements,
MatchesListwise([
matches_storage_announcement(
self.basedir,
options=[
matches_dummy_announcement(
u"tahoe-lafs-dummy-v1",
value,
),
],
),
]),
)),
)
def test_multiple_storage_plugin_announcements(self):
"""
The announcements from several enabled storage plugins are published when
storage is enabled.
"""
self.useFixture(UseTestPlugins())
config = client.config_from_string(
self.basedir,
u"tub.port",
self.get_config(
storage_enabled=True,
more_storage=b"plugins=tahoe-lafs-dummy-v1,tahoe-lafs-dummy-v2",
more_sections=(
b"[storageserver.plugins.tahoe-lafs-dummy-v1]\n"
b"some = thing-1\n"
b"[storageserver.plugins.tahoe-lafs-dummy-v2]\n"
b"some = thing-2\n"
),
),
)
self.assertThat(
client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
),
succeeded(AfterPreprocessing(
get_published_announcements,
MatchesListwise([
matches_storage_announcement(
self.basedir,
options=[
matches_dummy_announcement(
u"tahoe-lafs-dummy-v1",
u"thing-1",
),
matches_dummy_announcement(
u"tahoe-lafs-dummy-v2",
u"thing-2",
),
],
),
]),
)),
)
def test_stable_storage_server_furl(self):
"""
The value for the ``storage-server-FURL`` item in the announcement for a
particular storage server plugin is stable across different node
instantiations.
"""
self.useFixture(UseTestPlugins())
config = client.config_from_string(
self.basedir,
u"tub.port",
self.get_config(
storage_enabled=True,
more_storage=b"plugins=tahoe-lafs-dummy-v1",
more_sections=(
b"[storageserver.plugins.tahoe-lafs-dummy-v1]\n"
b"some = thing\n"
),
),
)
node_a = client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
)
node_b = client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
)
self.assertThat(
defer.gatherResults([node_a, node_b]),
succeeded(AfterPreprocessing(
partial(map, get_published_announcements),
MatchesSameElements(),
)),
)
def test_storage_plugin_without_configuration(self):
"""
A storage plugin with no configuration is loaded and announced.
"""
self.useFixture(UseTestPlugins())
config = client.config_from_string(
self.basedir,
u"tub.port",
self.get_config(
storage_enabled=True,
more_storage=b"plugins=tahoe-lafs-dummy-v1",
),
)
self.assertThat(
client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
),
succeeded(AfterPreprocessing(
get_published_announcements,
MatchesListwise([
matches_storage_announcement(
self.basedir,
options=[
matches_dummy_announcement(
u"tahoe-lafs-dummy-v1",
u"default-value",
),
],
),
]),
)),
)
def test_broken_storage_plugin(self):
"""
A storage plugin that raises an exception from ``get_storage_server``
causes ``client.create_client_from_config`` to return ``Deferred``
that fails.
"""
self.useFixture(UseTestPlugins())
config = client.config_from_string(
self.basedir,
u"tub.port",
self.get_config(
storage_enabled=True,
more_storage=b"plugins=tahoe-lafs-dummy-v1",
more_sections=(
b"[storageserver.plugins.tahoe-lafs-dummy-v1]\n"
# This will make it explode on instantiation.
b"invalid = configuration\n"
)
),
)
self.assertThat(
client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
),
failed(Always()),
)
def test_storage_plugin_not_found(self):
"""
``client.create_client_from_config`` raises ``UnknownConfigError`` when
called with a configuration which enables a storage plugin that is not
available on the system.
"""
config = client.config_from_string(
self.basedir,
u"tub.port",
self.get_config(
storage_enabled=True,
more_storage=b"plugins=tahoe-lafs-dummy-vX",
),
)
self.assertThat(
client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
),
failed(
AfterPreprocessing(
lambda f: f.type,
Equals(configutil.UnknownConfigError),
),
),
)

View File

@ -9,6 +9,16 @@ from .. import client
class ConfigUtilTests(GridTestMixin, unittest.TestCase):
def setUp(self):
super(ConfigUtilTests, self).setUp()
self.static_valid_config = configutil.ValidConfiguration(
dict(node=['valid']),
)
self.dynamic_valid_config = configutil.ValidConfiguration(
dict(),
lambda section_name: section_name == "node",
lambda section_name, item_name: (section_name, item_name) == ("node", "valid"),
)
def test_config_utils(self):
self.basedir = "cli/ConfigUtilTests/test-config-utils"
@ -44,7 +54,32 @@ class ConfigUtilTests(GridTestMixin, unittest.TestCase):
config = configutil.get_config(fname)
# should succeed, no exceptions
configutil.validate_config(fname, config, dict(node=['valid']))
configutil.validate_config(
fname,
config,
self.static_valid_config,
)
def test_config_dynamic_validation_success(self):
"""
A configuration with sections and items that are not matched by the static
validation but are matched by the dynamic validation is considered
valid.
"""
d = self.mktemp()
os.mkdir(d)
fname = os.path.join(d, 'tahoe.cfg')
with open(fname, 'w') as f:
f.write('[node]\nvalid = foo\n')
config = configutil.get_config(fname)
# should succeed, no exceptions
configutil.validate_config(
fname,
config,
self.dynamic_valid_config,
)
def test_config_validation_invalid_item(self):
d = self.mktemp()
@ -58,11 +93,16 @@ class ConfigUtilTests(GridTestMixin, unittest.TestCase):
e = self.assertRaises(
configutil.UnknownConfigError,
configutil.validate_config,
fname, config, dict(node=['valid']),
fname, config,
self.static_valid_config,
)
self.assertIn("section [node] contains unknown option 'invalid'", str(e))
def test_config_validation_invalid_section(self):
"""
A configuration with a section that is matched by neither the static nor
dynamic validators is rejected.
"""
d = self.mktemp()
os.mkdir(d)
fname = os.path.join(d, 'tahoe.cfg')
@ -74,10 +114,53 @@ class ConfigUtilTests(GridTestMixin, unittest.TestCase):
e = self.assertRaises(
configutil.UnknownConfigError,
configutil.validate_config,
fname, config, dict(node=['valid']),
fname, config,
self.static_valid_config,
)
self.assertIn("contains unknown section [invalid]", str(e))
def test_config_dynamic_validation_invalid_section(self):
"""
A configuration with a section that is matched by neither the static nor
dynamic validators is rejected.
"""
d = self.mktemp()
os.mkdir(d)
fname = os.path.join(d, 'tahoe.cfg')
with open(fname, 'w') as f:
f.write('[node]\nvalid = foo\n[invalid]\n')
config = configutil.get_config(fname)
e = self.assertRaises(
configutil.UnknownConfigError,
configutil.validate_config,
fname, config,
self.dynamic_valid_config,
)
self.assertIn("contains unknown section [invalid]", str(e))
def test_config_dynamic_validation_invalid_item(self):
"""
A configuration with a section, item pair that is matched by neither the
static nor dynamic validators is rejected.
"""
d = self.mktemp()
os.mkdir(d)
fname = os.path.join(d, 'tahoe.cfg')
with open(fname, 'w') as f:
f.write('[node]\nvalid = foo\ninvalid = foo\n')
config = configutil.get_config(fname)
e = self.assertRaises(
configutil.UnknownConfigError,
configutil.validate_config,
fname, config,
self.dynamic_valid_config,
)
self.assertIn("section [node] contains unknown option 'invalid'", str(e))
def test_create_client_config(self):
d = self.mktemp()
os.mkdir(d)
@ -97,5 +180,8 @@ class ConfigUtilTests(GridTestMixin, unittest.TestCase):
config = configutil.get_config(fname)
# should succeed, no exceptions
configutil.validate_config(fname, config,
client._valid_config_sections())
configutil.validate_config(
fname,
config,
client._valid_config(),
)

View File

@ -12,9 +12,7 @@ from ..util.i2p_provider import create as create_i2p_provider
from ..util.tor_provider import create as create_tor_provider
BASECONFIG = ("[client]\n"
"introducer.furl = \n"
)
BASECONFIG = ""
class TCP(unittest.TestCase):
@ -568,4 +566,3 @@ class Status(unittest.TestCase):
{"h1 via hand1": "st1", "h2": "st2"})
self.assertEqual(cs.last_connection_time, None)
self.assertEqual(cs.last_received_time, 5)

View File

@ -12,6 +12,10 @@ from allmydata.immutable import offloaded, upload
from allmydata import uri, client
from allmydata.util import hashutil, fileutil, mathutil
from .common import (
EMPTY_CLIENT_CONFIG,
)
MiB = 1024*1024
DATA = "I need help\n" * 1000
@ -118,7 +122,11 @@ class AssistedUpload(unittest.TestCase):
self.tub = t = Tub()
t.setOption("expose-remote-exception-types", False)
self.s = FakeClient()
self.s.storage_broker = StorageFarmBroker(True, lambda h: self.tub)
self.s.storage_broker = StorageFarmBroker(
True,
lambda h: self.tub,
EMPTY_CLIENT_CONFIG,
)
self.s.secret_holder = client.SecretHolder("lease secret", "converge")
self.s.startService()

View File

@ -4,6 +4,7 @@ import stat
import sys
import time
import mock
from textwrap import dedent
from unittest import skipIf
@ -175,6 +176,29 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
with self.assertRaises(Exception):
config.get_config_from_file("it_does_not_exist", required=True)
def test_config_items(self):
"""
All items in a config section can be retrieved.
"""
basedir = u"test_node/test_config_items"
create_node_dir(basedir, "testing")
with open(os.path.join(basedir, 'tahoe.cfg'), 'wt') as f:
f.write(dedent(
"""
[node]
nickname = foo
timeout.disconnect = 12
"""
))
config = read_config(basedir, "portnum")
self.assertEqual(
config.items("node"),
[(b"nickname", b"foo"),
(b"timeout.disconnect", b"12"),
],
)
@skipIf(
"win32" in sys.platform.lower() or "cygwin" in sys.platform.lower(),
"We don't know how to set permissions on Windows.",

View File

@ -1,13 +1,75 @@
import hashlib
from mock import Mock
from allmydata.util import base32, yamlutil
from json import (
dumps,
)
from fixtures import (
TempDir,
)
from testtools.content import (
text_content,
)
from testtools.matchers import (
MatchesAll,
IsInstance,
MatchesStructure,
Equals,
Is,
AfterPreprocessing,
)
from zope.interface import (
implementer,
)
from zope.interface.verify import (
verifyObject,
)
from twisted.application.service import (
Service,
)
from twisted.trial import unittest
from twisted.internet.defer import succeed, inlineCallbacks
from twisted.python.filepath import (
FilePath,
)
from allmydata.storage_client import NativeStorageServer
from allmydata.storage_client import StorageFarmBroker
from foolscap.api import (
Tub,
)
from .common import (
EMPTY_CLIENT_CONFIG,
SyncTestCase,
AsyncTestCase,
UseTestPlugins,
UseNode,
SameProcessStreamEndpointAssigner,
)
from .common_web import (
do_http,
)
from .storage_plugin import (
DummyStorageClient,
)
from allmydata.webish import (
WebishServer,
)
from allmydata.util import base32, yamlutil
from allmydata.storage_client import (
IFoolscapStorageServer,
NativeStorageServer,
StorageFarmBroker,
_FoolscapStorage,
_NullStorage,
)
from allmydata.interfaces import (
IConnectionStatus,
IStorageServer,
)
SOME_FURL = b"pb://abcde@nowhere/fake"
class NativeStorageServerWithVersion(NativeStorageServer):
def __init__(self, version):
@ -40,22 +102,385 @@ class TestNativeStorageServer(unittest.TestCase):
ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x",
"permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3",
}
nss = NativeStorageServer("server_id", ann, None, {})
nss = NativeStorageServer("server_id", ann, None, {}, EMPTY_CLIENT_CONFIG)
self.assertEqual(nss.get_nickname(), "")
class GetConnectionStatus(unittest.TestCase):
"""
Tests for ``NativeStorageServer.get_connection_status``.
"""
def test_unrecognized_announcement(self):
"""
When ``NativeStorageServer`` is constructed with a storage announcement it
doesn't recognize, its ``get_connection_status`` nevertheless returns
an object which provides ``IConnectionStatus``.
"""
# Pretty hard to recognize anything from an empty announcement.
ann = {}
nss = NativeStorageServer("server_id", ann, Tub, {}, EMPTY_CLIENT_CONFIG)
nss.start_connecting(lambda: None)
connection_status = nss.get_connection_status()
self.assertTrue(IConnectionStatus.providedBy(connection_status))
class UnrecognizedAnnouncement(unittest.TestCase):
"""
Tests for handling of announcements that aren't recognized and don't use
*anonymous-storage-FURL*.
Recognition failure is created by making up something completely novel for
these tests. In real use, recognition failure would most likely come from
an announcement generated by a storage server plugin which is not loaded
in the client.
"""
ann = {
u"name": u"tahoe-lafs-testing-v1",
u"any-parameter": 12345,
}
server_id = b"abc"
def _tub_maker(self, overrides):
return Service()
def native_storage_server(self):
"""
Make a ``NativeStorageServer`` out of an unrecognizable announcement.
"""
return NativeStorageServer(
self.server_id,
self.ann,
self._tub_maker,
{},
EMPTY_CLIENT_CONFIG,
)
def test_no_exceptions(self):
"""
``NativeStorageServer`` can be instantiated with an unrecognized
announcement.
"""
self.native_storage_server()
def test_start_connecting(self):
"""
``NativeStorageServer.start_connecting`` does not raise an exception.
"""
server = self.native_storage_server()
server.start_connecting(None)
def test_stop_connecting(self):
"""
``NativeStorageServer.stop_connecting`` does not raise an exception.
"""
server = self.native_storage_server()
server.start_connecting(None)
server.stop_connecting()
def test_try_to_connect(self):
"""
``NativeStorageServer.try_to_connect`` does not raise an exception.
"""
server = self.native_storage_server()
server.start_connecting(None)
server.try_to_connect()
def test_various_data_methods(self):
"""
The data accessors of ``NativeStorageServer`` that depend on the
announcement do not raise an exception.
"""
server = self.native_storage_server()
server.get_permutation_seed()
server.get_name()
server.get_longname()
server.get_tubid()
server.get_lease_seed()
server.get_foolscap_write_enabler_seed()
server.get_nickname()
class PluginMatchedAnnouncement(SyncTestCase):
"""
Tests for handling by ``NativeStorageServer`` of storage server
announcements that are handled by an ``IFoolscapStoragePlugin``.
"""
@inlineCallbacks
def make_node(self, introducer_furl, storage_plugin, plugin_config):
"""
Create a client node with the given configuration.
:param bytes introducer_furl: The introducer furl with which to
configure the client.
:param bytes storage_plugin: The name of a storage plugin to enable.
:param dict[bytes, bytes] plugin_config: Configuration to supply to
the enabled plugin. May also be ``None`` for no configuration
section (distinct from ``{}`` which creates an empty configuration
section).
"""
tempdir = TempDir()
self.useFixture(tempdir)
self.basedir = FilePath(tempdir.path)
self.basedir.child(u"private").makedirs()
self.useFixture(UseTestPlugins())
self.node_fixture = self.useFixture(UseNode(
plugin_config,
storage_plugin,
self.basedir,
introducer_furl,
))
self.config = self.node_fixture.config
self.node = yield self.node_fixture.create_node()
[self.introducer_client] = self.node.introducer_clients
def publish(self, server_id, announcement, introducer_client):
for subscription in introducer_client.subscribed_to:
if subscription.service_name == u"storage":
subscription.cb(
server_id,
announcement,
*subscription.args,
**subscription.kwargs
)
def get_storage(self, server_id, node):
storage_broker = node.get_storage_broker()
native_storage_server = storage_broker.servers[server_id]
return native_storage_server._storage
def set_rref(self, server_id, node, rref):
storage_broker = node.get_storage_broker()
native_storage_server = storage_broker.servers[server_id]
native_storage_server._rref = rref
@inlineCallbacks
def test_ignored_non_enabled_plugin(self):
"""
An announcement that could be matched by a plugin that is not enabled is
not matched.
"""
yield self.make_node(
introducer_furl=SOME_FURL,
storage_plugin=b"tahoe-lafs-dummy-v1",
plugin_config=None,
)
server_id = b"v0-abcdef"
ann = {
u"service-name": u"storage",
u"storage-options": [{
# notice how the announcement is for a different storage plugin
# than the one that is enabled.
u"name": u"tahoe-lafs-dummy-v2",
u"storage-server-FURL": SOME_FURL.decode("ascii"),
}],
}
self.publish(server_id, ann, self.introducer_client)
storage = self.get_storage(server_id, self.node)
self.assertIsInstance(storage, _NullStorage)
@inlineCallbacks
def test_enabled_plugin(self):
"""
An announcement that could be matched by a plugin that is enabled with
configuration is matched and the plugin's storage client is used.
"""
plugin_config = {
b"abc": b"xyz",
}
plugin_name = b"tahoe-lafs-dummy-v1"
yield self.make_node(
introducer_furl=SOME_FURL,
storage_plugin=plugin_name,
plugin_config=plugin_config,
)
server_id = b"v0-abcdef"
ann = {
u"service-name": u"storage",
u"storage-options": [{
# and this announcement is for a plugin with a matching name
u"name": plugin_name,
u"storage-server-FURL": SOME_FURL.decode("ascii"),
}],
}
self.publish(server_id, ann, self.introducer_client)
storage = self.get_storage(server_id, self.node)
self.assertTrue(
verifyObject(
IFoolscapStorageServer,
storage,
),
)
expected_rref = object()
# Can't easily establish a real Foolscap connection so fake the result
# of doing so...
self.set_rref(server_id, self.node, expected_rref)
self.expectThat(
storage.storage_server,
MatchesAll(
IsInstance(DummyStorageClient),
MatchesStructure(
get_rref=AfterPreprocessing(
lambda get_rref: get_rref(),
Is(expected_rref),
),
configuration=Equals(plugin_config),
announcement=Equals({
u'name': plugin_name,
u'storage-server-FURL': u'pb://abcde@nowhere/fake',
}),
),
),
)
@inlineCallbacks
def test_enabled_no_configuration_plugin(self):
"""
An announcement that could be matched by a plugin that is enabled with no
configuration is matched and the plugin's storage client is used.
"""
plugin_name = b"tahoe-lafs-dummy-v1"
yield self.make_node(
introducer_furl=SOME_FURL,
storage_plugin=plugin_name,
plugin_config=None,
)
server_id = b"v0-abcdef"
ann = {
u"service-name": u"storage",
u"storage-options": [{
# and this announcement is for a plugin with a matching name
u"name": plugin_name,
u"storage-server-FURL": SOME_FURL.decode("ascii"),
}],
}
self.publish(server_id, ann, self.introducer_client)
storage = self.get_storage(server_id, self.node)
self.addDetail("storage", text_content(str(storage)))
self.expectThat(
storage.storage_server,
MatchesAll(
IsInstance(DummyStorageClient),
MatchesStructure(
configuration=Equals({}),
),
),
)
class FoolscapStorageServers(unittest.TestCase):
"""
Tests for implementations of ``IFoolscapStorageServer``.
"""
def test_null_provider(self):
"""
Instances of ``_NullStorage`` provide ``IFoolscapStorageServer``.
"""
self.assertTrue(
verifyObject(
IFoolscapStorageServer,
_NullStorage(),
),
)
def test_foolscap_provider(self):
"""
Instances of ``_FoolscapStorage`` provide ``IFoolscapStorageServer``.
"""
@implementer(IStorageServer)
class NotStorageServer(object):
pass
self.assertTrue(
verifyObject(
IFoolscapStorageServer,
_FoolscapStorage.from_announcement(
u"server-id",
SOME_FURL,
{u"permutation-seed-base32": base32.b2a(b"permutationseed")},
NotStorageServer(),
),
),
)
class StoragePluginWebPresence(AsyncTestCase):
"""
Tests for the web resources ``IFoolscapStorageServer`` plugins may expose.
"""
@inlineCallbacks
def setUp(self):
super(StoragePluginWebPresence, self).setUp()
self.useFixture(UseTestPlugins())
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
self.storage_plugin = b"tahoe-lafs-dummy-v1"
from twisted.internet import reactor
_, port_endpoint = self.port_assigner.assign(reactor)
tempdir = TempDir()
self.useFixture(tempdir)
self.basedir = FilePath(tempdir.path)
self.basedir.child(u"private").makedirs()
self.node_fixture = self.useFixture(UseNode(
plugin_config={
b"web": b"1",
},
node_config={
b"tub.location": b"127.0.0.1:1",
b"web.port": port_endpoint,
},
storage_plugin=self.storage_plugin,
basedir=self.basedir,
introducer_furl=SOME_FURL,
))
self.node = yield self.node_fixture.create_node()
self.webish = self.node.getServiceNamed(WebishServer.name)
self.node.startService()
self.addCleanup(self.node.stopService)
self.port = self.webish.getPortnum()
@inlineCallbacks
def test_plugin_resource_path(self):
"""
The plugin's resource is published at */storage-plugins/<plugin name>*.
"""
url = u"http://127.0.0.1:{port}/storage-plugins/{plugin_name}".format(
port=self.port,
plugin_name=self.storage_plugin,
).encode("utf-8")
result = yield do_http(b"get", url)
self.assertThat(result, Equals(dumps({b"web": b"1"})))
def make_broker(tub_maker=lambda h: Mock()):
"""
Create a ``StorageFarmBroker`` with the given tub maker and an empty
client configuration.
"""
return StorageFarmBroker(True, tub_maker, EMPTY_CLIENT_CONFIG)
class TestStorageFarmBroker(unittest.TestCase):
def test_static_servers(self):
broker = StorageFarmBroker(True, lambda h: Mock())
broker = make_broker()
key_s = 'v0-1234-1'
servers_yaml = """\
servers_yaml = b"""\
storage:
v0-1234-1:
ann:
anonymous-storage-FURL: pb://ge@nowhere/fake
anonymous-storage-FURL: {furl}
permutation-seed-base32: aaaaaaaaaaaaaaaaaaaaaaaa
"""
""".format(furl=SOME_FURL)
servers = yamlutil.safe_load(servers_yaml)
permseed = base32.a2b("aaaaaaaaaaaaaaaaaaaaaaaa")
broker.set_static_servers(servers["storage"])
@ -80,22 +505,22 @@ storage:
self.assertEqual(s2.get_permutation_seed(), permseed)
def test_static_permutation_seed_pubkey(self):
broker = StorageFarmBroker(True, lambda h: Mock())
broker = make_broker()
server_id = "v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
k = "4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
ann = {
"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"anonymous-storage-FURL": SOME_FURL,
}
broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}})
s = broker.servers[server_id]
self.assertEqual(s.get_permutation_seed(), base32.a2b(k))
def test_static_permutation_seed_explicit(self):
broker = StorageFarmBroker(True, lambda h: Mock())
broker = make_broker()
server_id = "v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia"
k = "w5gl5igiexhwmftwzhai5jy2jixn7yx7"
ann = {
"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"anonymous-storage-FURL": SOME_FURL,
"permutation-seed-base32": k,
}
broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}})
@ -103,10 +528,10 @@ storage:
self.assertEqual(s.get_permutation_seed(), base32.a2b(k))
def test_static_permutation_seed_hashed(self):
broker = StorageFarmBroker(True, lambda h: Mock())
broker = make_broker()
server_id = "unparseable"
ann = {
"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"anonymous-storage-FURL": SOME_FURL,
}
broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}})
s = broker.servers[server_id]
@ -119,7 +544,7 @@ storage:
new_tubs = []
def make_tub(*args, **kwargs):
return new_tubs.pop()
broker = StorageFarmBroker(True, make_tub)
broker = make_broker(make_tub)
done = broker.when_connected_enough(5)
broker.use_introducer(introducer)
# subscribes to "storage" to learn of new storage nodes

View File

@ -22,6 +22,10 @@ from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir
from allmydata.client import _Client
from .common import (
EMPTY_CLIENT_CONFIG,
)
MiB = 1024*1024
def extract_uri(results):
@ -217,7 +221,11 @@ class FakeClient(object):
("%20d" % fakeid, FakeStorageServer(mode[fakeid], reactor=reactor))
for fakeid in range(self.num_servers)
]
self.storage_broker = StorageFarmBroker(permute_peers=True, tub_maker=None)
self.storage_broker = StorageFarmBroker(
permute_peers=True,
tub_maker=None,
node_config=EMPTY_CLIENT_CONFIG,
)
for (serverid, rref) in servers:
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
"permutation-seed-base32": base32.b2a(serverid) }

View File

@ -13,6 +13,10 @@ from hypothesis import given
from hypothesis.strategies import text
from ..common import (
EMPTY_CLIENT_CONFIG,
)
class FakeRoot(Root):
def __init__(self):
pass
@ -86,7 +90,7 @@ class RenderServiceRow(unittest.TestCase):
ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x",
"permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3",
}
s = NativeStorageServer("server_id", ann, None, {})
s = NativeStorageServer("server_id", ann, None, {}, EMPTY_CLIENT_CONFIG)
cs = ConnectionStatus(False, "summary", {}, 0, 0)
s.get_connection_status = lambda: cs

View File

@ -40,9 +40,15 @@ from allmydata.util import fileutil, base32, hashutil
from allmydata.util.consumer import download_to_data
from allmydata.util.encodingutil import to_str
from ...util.connection_status import ConnectionStatus
from ..common import FakeCHKFileNode, FakeMutableFileNode, \
create_chk_filenode, WebErrorMixin, \
make_mutable_file_uri, create_mutable_filenode
from ..common import (
EMPTY_CLIENT_CONFIG,
FakeCHKFileNode,
FakeMutableFileNode,
create_chk_filenode,
WebErrorMixin,
make_mutable_file_uri,
create_mutable_filenode,
)
from allmydata.interfaces import IMutableFileNode, SDMF_VERSION, MDMF_VERSION
from allmydata.mutable import servermap, publish, retrieve
from .. import common_util as testutil
@ -280,7 +286,11 @@ class FakeClient(_Client):
self._secret_holder = SecretHolder("lease secret", "convergence secret")
self.helper = None
self.convergence = "some random string"
self.storage_broker = StorageFarmBroker(permute_peers=True, tub_maker=None)
self.storage_broker = StorageFarmBroker(
permute_peers=True,
tub_maker=None,
node_config=EMPTY_CLIENT_CONFIG,
)
# fake knowledge of another server
self.storage_broker.test_add_server("other_nodeid",
FakeDisplayableServer(

View File

@ -1,6 +1,7 @@
from ConfigParser import SafeConfigParser
import attr
class UnknownConfigError(Exception):
"""
@ -36,15 +37,16 @@ def write_config(tahoe_cfg, config):
finally:
f.close()
def validate_config(fname, cfg, valid_sections):
def validate_config(fname, cfg, valid_config):
"""
raises UnknownConfigError if there are any unknown sections or config
values.
:param ValidConfiguration valid_config: The definition of a valid
configuration.
:raises UnknownConfigError: if there are any unknown sections or config
values.
"""
for section in cfg.sections():
try:
valid_in_section = valid_sections[section]
except KeyError:
if not valid_config.is_valid_section(section):
raise UnknownConfigError(
"'{fname}' contains unknown section [{section}]".format(
fname=fname,
@ -52,7 +54,7 @@ def validate_config(fname, cfg, valid_sections):
)
)
for option in cfg.options(section):
if option not in valid_in_section:
if not valid_config.is_valid_item(section, option):
raise UnknownConfigError(
"'{fname}' section [{section}] contains unknown option '{option}'".format(
fname=fname,
@ -60,3 +62,57 @@ def validate_config(fname, cfg, valid_sections):
option=option,
)
)
@attr.s
class ValidConfiguration(object):
"""
:ivar dict[bytes, tuple[bytes]] _static_valid_sections: A mapping from
valid section names to valid items in those sections.
:ivar _is_valid_section: A callable which accepts a section name as bytes
and returns True if that section name is valid, False otherwise.
:ivar _is_valid_item: A callable which accepts a section name as bytes and
an item name as bytes and returns True if that section, item pair is
valid, False otherwise.
"""
_static_valid_sections = attr.ib()
_is_valid_section = attr.ib(default=lambda section_name: False)
_is_valid_item = attr.ib(default=lambda section_name, item_name: False)
def is_valid_section(self, section_name):
"""
:return: True if the given section name is valid, False otherwise.
"""
return (
section_name in self._static_valid_sections or
self._is_valid_section(section_name)
)
def is_valid_item(self, section_name, item_name):
"""
:return: True if the given section name, ite name pair is valid, False
otherwise.
"""
return (
item_name in self._static_valid_sections.get(section_name, ()) or
self._is_valid_item(section_name, item_name)
)
def update(self, valid_config):
static_valid_sections = self._static_valid_sections.copy()
static_valid_sections.update(valid_config._static_valid_sections)
return ValidConfiguration(
static_valid_sections,
_either(self._is_valid_section, valid_config._is_valid_section),
_either(self._is_valid_item, valid_config._is_valid_item),
)
def _either(f, g):
"""
:return: A function which returns True if either f or g returns True.
"""
return lambda *a, **kw: f(*a, **kw) or g(*a, **kw)

View File

@ -12,6 +12,20 @@ class ConnectionStatus(object):
self.last_connection_time = last_connection_time
self.last_received_time = last_received_time
@classmethod
def unstarted(cls):
"""
Create a ``ConnectionStatus`` representing a connection for which no
attempts have yet been made.
"""
return cls(
connected=False,
summary=u"unstarted",
non_connected_statuses=[],
last_connection_time=None,
last_received_time=None,
)
def _hint_statuses(which, handlers, statuses):
non_connected_statuses = {}
for hint in which:
@ -23,10 +37,12 @@ def _hint_statuses(which, handlers, statuses):
def from_foolscap_reconnector(rc, last_received):
ri = rc.getReconnectionInfo()
# See foolscap/reconnector.py, ReconnectionInfo, for details about
# possible states.
state = ri.state
# the Reconnector shouldn't even be exposed until it is started, so we
# should never see "unstarted"
assert state in ("connected", "connecting", "waiting"), state
if state == "unstarted":
return ConnectionStatus.unstarted()
ci = ri.connectionInfo
connected = False
last_connected = None

View File

@ -196,12 +196,6 @@ class Root(MultiFormatPage):
rend.Page.__init__(self, client)
self.client = client
self.now_fn = now_fn
try:
s = client.getServiceNamed("storage")
except KeyError:
s = None
self.putChild("storage", storage.StorageStatus(s, self.client.nickname))
self.putChild("uri", URIHandler(client))
self.putChild("cap", URIHandler(client))
@ -237,6 +231,16 @@ class Root(MultiFormatPage):
# the Helper isn't attached until after the Tub starts, so this child
# needs to created on each request
return status.HelperStatus(self.client.helper)
if path == "storage":
# Storage isn't initialized until after the web hierarchy is
# constructed so this child needs to be created later than
# `__init__`.
try:
storage_server = self.client.getServiceNamed("storage")
except KeyError:
storage_server = None
return storage.StorageStatus(storage_server, self.client.nickname)
# FIXME: This code is duplicated in root.py and introweb.py.
def data_rendered_at(self, ctx, data):

View File

@ -0,0 +1,34 @@
"""
This module implements a resource which has as children the web resources
of all enabled storage client plugins.
"""
from twisted.web.resource import (
Resource,
NoResource,
)
class StoragePlugins(Resource, object):
"""
The parent resource of all enabled storage client plugins' web resources.
"""
def __init__(self, client):
"""
:param _Client client: The Tahoe-LAFS client node object which will be
used to find the storage plugin web resources.
"""
Resource.__init__(self)
self._client = client
def getChild(self, segment, request):
"""
Get an ``IResource`` from the loaded, enabled plugin with a name that
equals ``segment``.
:see: ``twisted.web.iweb.IResource.getChild``
"""
resources = self._client.get_client_storage_plugin_web_resources()
try:
return resources[segment]
except KeyError:
return NoResource()

View File

@ -13,6 +13,10 @@ from allmydata.web import introweb, root
from allmydata.web.common import MyExceptionHandler
from allmydata.web.operations import OphandleTable
from .web.storage_plugins import (
StoragePlugins,
)
# we must override twisted.web.http.Request.requestReceived with a version
# that doesn't use cgi.parse_multipart() . Since we actually use Nevow, we
# override the nevow-specific subclass, nevow.appserver.NevowRequest . This
@ -172,6 +176,8 @@ class WebishServer(service.MultiService):
self._operations.setServiceParent(self)
self.root.putChild("operations", self._operations)
self.root.putChild(b"storage-plugins", StoragePlugins(client))
def buildServer(self, webport, nodeurl_path, staticdir):
self.webport = webport
self.site = site = appserver.NevowSite(self.root)