Resolve merge conflicts

This commit is contained in:
dlee 2023-03-15 15:50:45 -05:00
commit 3ff30c675f
58 changed files with 3361 additions and 669 deletions

View File

@ -51,12 +51,13 @@ jobs:
- "3.8" - "3.8"
- "3.9" - "3.9"
- "3.10" - "3.10"
- "3.11"
include: include:
# On macOS don't bother with 3.8, just to get faster builds. # On macOS don't bother with 3.8, just to get faster builds.
- os: macos-latest - os: macos-latest
python-version: "3.9" python-version: "3.9"
- os: macos-latest - os: macos-latest
python-version: "3.10" python-version: "3.11"
# We only support PyPy on Linux at the moment. # We only support PyPy on Linux at the moment.
- os: ubuntu-latest - os: ubuntu-latest
python-version: "pypy-3.8" python-version: "pypy-3.8"
@ -174,7 +175,7 @@ jobs:
# 22.04 has some issue with Tor at the moment: # 22.04 has some issue with Tor at the moment:
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3943 # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3943
- os: ubuntu-20.04 - os: ubuntu-20.04
python-version: "3.9" python-version: "3.11"
force-foolscap: false force-foolscap: false
steps: steps:

View File

@ -1,10 +0,0 @@
FROM python:2.7
ADD . /tahoe-lafs
RUN \
cd /tahoe-lafs && \
git pull --depth=100 && \
pip install . && \
rm -rf ~/.cache/
WORKDIR /root

View File

@ -1,25 +0,0 @@
FROM debian:9
LABEL maintainer "gordon@leastauthority.com"
RUN apt-get update
RUN DEBIAN_FRONTEND=noninteractive apt-get -yq upgrade
RUN DEBIAN_FRONTEND=noninteractive apt-get -yq install build-essential python-dev libffi-dev libssl-dev python-virtualenv git
RUN \
git clone https://github.com/tahoe-lafs/tahoe-lafs.git /root/tahoe-lafs; \
cd /root/tahoe-lafs; \
virtualenv --python=python2.7 venv; \
./venv/bin/pip install --upgrade setuptools; \
./venv/bin/pip install --editable .; \
./venv/bin/tahoe --version;
RUN \
cd /root; \
mkdir /root/.tahoe-client; \
mkdir /root/.tahoe-introducer; \
mkdir /root/.tahoe-server;
RUN /root/tahoe-lafs/venv/bin/tahoe create-introducer --location=tcp:introducer:3458 --port=tcp:3458 /root/.tahoe-introducer
RUN /root/tahoe-lafs/venv/bin/tahoe start /root/.tahoe-introducer
RUN /root/tahoe-lafs/venv/bin/tahoe create-node --location=tcp:server:3457 --port=tcp:3457 --introducer=$(cat /root/.tahoe-introducer/private/introducer.furl) /root/.tahoe-server
RUN /root/tahoe-lafs/venv/bin/tahoe create-client --webport=3456 --introducer=$(cat /root/.tahoe-introducer/private/introducer.furl) --basedir=/root/.tahoe-client --shares-needed=1 --shares-happy=1 --shares-total=1
VOLUME ["/root/.tahoe-client", "/root/.tahoe-server", "/root/.tahoe-introducer"]
EXPOSE 3456 3457 3458
ENTRYPOINT ["/root/tahoe-lafs/venv/bin/tahoe"]
CMD []

View File

@ -5,6 +5,23 @@ To run:
$ pytest benchmarks/upload_download.py -s -v -Wignore $ pytest benchmarks/upload_download.py -s -v -Wignore
To add latency of e.g. 60ms on Linux:
$ tc qdisc add dev lo root netem delay 30ms
To reset:
$ tc qdisc del dev lo root netem
Frequency scaling can spoil the results.
To see the range of frequency scaling on a Linux system:
$ cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_available_frequencies
And to pin the CPU frequency to the lower bound found in these files:
$ sudo cpupower frequency-set -f <lowest available frequency>
TODO Parameterization (pytest?) TODO Parameterization (pytest?)
- Foolscap vs not foolscap - Foolscap vs not foolscap
@ -30,6 +47,7 @@ from tempfile import mkdtemp
import os import os
from twisted.trial.unittest import TestCase from twisted.trial.unittest import TestCase
from twisted.internet.defer import gatherResults
from allmydata.util.deferredutil import async_to_deferred from allmydata.util.deferredutil import async_to_deferred
from allmydata.util.consumer import MemoryConsumer from allmydata.util.consumer import MemoryConsumer
@ -56,6 +74,10 @@ class ImmutableBenchmarks(SystemTestMixin, TestCase):
# To use Foolscap, change to True: # To use Foolscap, change to True:
FORCE_FOOLSCAP_FOR_STORAGE = False FORCE_FOOLSCAP_FOR_STORAGE = False
# Don't reduce HTTP connection timeouts, that messes up the more aggressive
# benchmarks:
REDUCE_HTTP_CLIENT_TIMEOUT = False
@async_to_deferred @async_to_deferred
async def setUp(self): async def setUp(self):
SystemTestMixin.setUp(self) SystemTestMixin.setUp(self)
@ -104,3 +126,13 @@ class ImmutableBenchmarks(SystemTestMixin, TestCase):
with timeit("download"): with timeit("download"):
data = await result.download_best_version() data = await result.download_best_version()
self.assertEqual(data, DATA) self.assertEqual(data, DATA)
@async_to_deferred
async def test_upload_mutable_in_parallel(self):
# To test larger files, change this:
DATA = b"Some data to upload\n" * 1_000_000
with timeit(" upload"):
await gatherResults([
self.clients[0].create_mutable_file(MutableData(DATA))
for _ in range(20)
])

View File

@ -1,49 +0,0 @@
version: '2'
services:
client:
build:
context: .
dockerfile: ./Dockerfile.dev
volumes:
- ./misc:/root/tahoe-lafs/misc
- ./integration:/root/tahoe-lafs/integration
- ./src:/root/tahoe-lafs/static
- ./setup.cfg:/root/tahoe-lafs/setup.cfg
- ./setup.py:/root/tahoe-lafs/setup.py
ports:
- "127.0.0.1:3456:3456"
depends_on:
- "introducer"
- "server"
entrypoint: /root/tahoe-lafs/venv/bin/tahoe
command: ["run", "/root/.tahoe-client"]
server:
build:
context: .
dockerfile: ./Dockerfile.dev
volumes:
- ./misc:/root/tahoe-lafs/misc
- ./integration:/root/tahoe-lafs/integration
- ./src:/root/tahoe-lafs/static
- ./setup.cfg:/root/tahoe-lafs/setup.cfg
- ./setup.py:/root/tahoe-lafs/setup.py
ports:
- "127.0.0.1:3457:3457"
depends_on:
- "introducer"
entrypoint: /root/tahoe-lafs/venv/bin/tahoe
command: ["run", "/root/.tahoe-server"]
introducer:
build:
context: .
dockerfile: ./Dockerfile.dev
volumes:
- ./misc:/root/tahoe-lafs/misc
- ./integration:/root/tahoe-lafs/integration
- ./src:/root/tahoe-lafs/static
- ./setup.cfg:/root/tahoe-lafs/setup.cfg
- ./setup.py:/root/tahoe-lafs/setup.py
ports:
- "127.0.0.1:3458:3458"
entrypoint: /root/tahoe-lafs/venv/bin/tahoe
command: ["run", "/root/.tahoe-introducer"]

View File

@ -980,6 +980,9 @@ the node will not use an Introducer at all.
Such "introducerless" clients must be configured with static servers (described Such "introducerless" clients must be configured with static servers (described
below), or they will not be able to upload and download files. below), or they will not be able to upload and download files.
.. _server_list:
Static Server Definitions Static Server Definitions
========================= =========================

View File

@ -32,6 +32,7 @@ Contents:
gpg-setup gpg-setup
servers servers
managed-grid
helper helper
convergence-secret convergence-secret
garbage-collection garbage-collection

342
docs/managed-grid.rst Normal file
View File

@ -0,0 +1,342 @@
Managed Grid
============
This document explains the "Grid Manager" concept and the
`grid-manager` command. Someone operating a grid may choose to use a
Grid Manager. Operators of storage-servers and clients will then be
given additional configuration in this case.
Overview and Motivation
-----------------------
In a grid using an Introducer, a client will use any storage-server
the Introducer announces (and the Introducer will announce any
storage-server that connects to it). This means that anyone with the
Introducer fURL can connect storage to the grid.
Sometimes, this is just what you want!
For some use-cases, though, you want to have clients only use certain
servers. One case might be a "managed" grid, where some entity runs
the grid; clients of this grid don't want their uploads to go to
"unmanaged" storage if some other client decides to provide storage.
One way to limit which storage servers a client connects to is via the
"server list" (:ref:`server_list`) (aka "Introducerless"
mode). Clients are given static lists of storage-servers, and connect
only to those. This means manually updating these lists if the storage
servers change, however.
Another method is for clients to use `[client] peers.preferred=`
configuration option (:ref:`Client Configuration`), which suffers
from a similar disadvantage.
Grid Manager
------------
A "grid-manager" consists of some data defining a keypair (along with
some other details) and Tahoe sub-commands to manipulate the data and
produce certificates to give to storage-servers. Certificates assert
the statement: "Grid Manager X suggests you use storage-server Y to
upload shares to" (X and Y are public-keys). Such a certificate
consists of:
- the version of the format the certificate conforms to (`1`)
- the public-key of a storage-server
- an expiry timestamp
- a signature of the above
A client will always use any storage-server for downloads (expired
certificate, or no certificate) because clients check the ciphertext
and re-assembled plaintext against the keys in the capability;
"grid-manager" certificates only control uploads.
Clients make use of this functionality by configuring one or more Grid Manager public keys.
This tells the client to only upload to storage-servers that have a currently-valid certificate from any of the Grid Managers their client allows.
In case none are configured, the default behavior (of using any storage server) prevails.
Grid Manager Data Storage
-------------------------
The data defining the grid-manager is stored in an arbitrary
directory, which you indicate with the ``--config`` option (in the
future, we may add the ability to store the data directly in a grid,
at which time you may be able to pass a directory-capability to this
option).
If you don't want to store the configuration on disk at all, you may
use ``--config -`` (the last character is a dash) and write a valid
JSON configuration to stdin.
All commands require the ``--config`` option and they all behave
similarly for "data from stdin" versus "data from disk". A directory
(and not a file) is used on disk because in that mode, each
certificate issued is also stored alongside the configuration
document; in "stdin / stdout" mode, an issued certificate is only
ever available on stdout.
The configuration is a JSON document. It is subject to change as Grid
Manager evolves. It contains a version number in the
`grid_manager_config_version` key which will increment whenever the
document schema changes.
grid-manager create
```````````````````
Create a new grid-manager.
If you specify ``--config -`` then a new grid-manager configuration is
written to stdout. Otherwise, a new grid-manager is created in the
directory specified by the ``--config`` option. It is an error if the
directory already exists.
grid-manager public-identity
````````````````````````````
Print out a grid-manager's public key. This key is derived from the
private-key of the grid-manager, so a valid grid-manager config must
be given via ``--config``
This public key is what is put in clients' configuration to actually
validate and use grid-manager certificates.
grid-manager add
````````````````
Takes two args: ``name pubkey``. The ``name`` is an arbitrary local
identifier for the new storage node (also sometimes called "a petname"
or "nickname"). The pubkey is the tahoe-encoded key from a ``node.pubkey``
file in the storage-server's node directory (minus any
whitespace). For example, if ``~/storage0`` contains a storage-node,
you might do something like this::
grid-manager --config ./gm0 add storage0 $(cat ~/storage0/node.pubkey)
This adds a new storage-server to a Grid Manager's
configuration. (Since it mutates the configuration, if you used
``--config -`` the new configuration will be printed to stdout). The
usefulness of the ``name`` is solely for reference within this Grid
Manager.
grid-manager list
`````````````````
Lists all storage-servers that have previously been added using
``grid-manager add``.
grid-manager sign
`````````````````
Takes two args: ``name expiry_days``. The ``name`` is a nickname used
previously in a ``grid-manager add`` command and ``expiry_days`` is
the number of days in the future when the certificate should expire.
Note that this mutates the state of the grid-manager if it is on disk,
by adding this certificate to our collection of issued
certificates. If you used ``--config -``, the certificate isn't
persisted anywhere except to stdout (so if you wish to keep it
somewhere, that is up to you).
This command creates a new "version 1" certificate for a
storage-server (identified by its public key). The new certificate is
printed to stdout. If you stored the config on disk, the new
certificate will (also) be in a file named like ``alice.cert.0``.
Enrolling a Storage Server: CLI
-------------------------------
tahoe admin add-grid-manager-cert
`````````````````````````````````
- `--filename`: the file to read the cert from
- `--name`: the name of this certificate
Import a "version 1" storage-certificate produced by a grid-manager A
storage server may have zero or more such certificates installed; for
now just one is sufficient. You will have to re-start your node after
this. Subsequent announcements to the Introducer will include this
certificate.
.. note::
This command will simply edit the `tahoe.cfg` file and direct you
to re-start. In the Future(tm), we should consider (in exarkun's
words):
"A python program you run as a new process" might not be the
best abstraction to layer on top of the configuration
persistence system, though. It's a nice abstraction for users
(although most users would probably rather have a GUI) but it's
not a great abstraction for automation. So at some point it
may be better if there is CLI -> public API -> configuration
persistence system. And maybe "public API" is even a network
API for the storage server so it's equally easy to access from
an agent implemented in essentially any language and maybe if
the API is exposed by the storage node itself then this also
gives you live-configuration-updates, avoiding the need for
node restarts (not that this is the only way to accomplish
this, but I think it's a good way because it avoids the need
for messes like inotify and it supports the notion that the
storage node process is in charge of its own configuration
persistence system, not just one consumer among many ... which
has some nice things going for it ... though how this interacts
exactly with further node management automation might bear
closer scrutiny).
Enrolling a Storage Server: Config
----------------------------------
You may edit the ``[storage]`` section of the ``tahoe.cfg`` file to
turn on grid-management with ``grid_management = true``. You then must
also provide a ``[grid_management_certificates]`` section in the
config-file which lists ``name = path/to/certificate`` pairs.
These certificate files are issued by the ``grid-manager sign``
command; these should be transmitted to the storage server operator
who includes them in the config for the storage server. Relative paths
are based from the node directory. Example::
[storage]
grid_management = true
[grid_management_certificates]
default = example_grid.cert
This will cause us to give this certificate to any Introducers we
connect to (and subsequently, the Introducer will give the certificate
out to clients).
Enrolling a Client: Config
--------------------------
You may instruct a Tahoe client to use only storage servers from given
Grid Managers. If there are no such keys, any servers are used
(but see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3979). If
there are one or more keys, the client will only upload to a storage
server that has a valid certificate (from any of the keys).
To specify public-keys, add a ``[grid_managers]`` section to the
config. This consists of ``name = value`` pairs where ``name`` is an
arbitrary name and ``value`` is a public-key of a Grid
Manager. Example::
[grid_managers]
example_grid = pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq
See also https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3507 which
proposes a command to edit the config.
Example Setup of a New Managed Grid
-----------------------------------
This example creates an actual grid, but it's all just on one machine
with different "node directories" and a separate tahoe process for
each node. Usually of course each storage server would be on a
separate computer.
Note that we use the ``daemonize`` command in the following but that's
only one way to handle "running a command in the background". You
could instead run commands that start with ``daemonize ...`` in their
own shell/terminal window or via something like ``systemd``
We'll store our Grid Manager configuration on disk, in
``./gm0``. To initialize this directory::
grid-manager --config ./gm0 create
(If you already have a grid, you can :ref:`skip ahead <skip_ahead>`.)
First of all, create an Introducer. Note that we actually have to run
it briefly before it creates the "Introducer fURL" we want for the
next steps::
tahoe create-introducer --listen=tcp --port=5555 --location=tcp:localhost:5555 ./introducer
daemonize tahoe -d introducer run
Next, we attach a couple of storage nodes::
tahoe create-node --introducer $(cat introducer/private/introducer.furl) --nickname storage0 --webport 6001 --location tcp:localhost:6003 --port 6003 ./storage0
tahoe create-node --introducer $(cat introducer/private/introducer.furl) --nickname storage1 --webport 6101 --location tcp:localhost:6103 --port 6103 ./storage1
daemonize tahoe -d storage0 run
daemonize tahoe -d storage1 run
.. _skip_ahead:
We can now tell the Grid Manager about our new storage servers::
grid-manager --config ./gm0 add storage0 $(cat storage0/node.pubkey)
grid-manager --config ./gm0 add storage1 $(cat storage1/node.pubkey)
To produce a new certificate for each node, we do this::
grid-manager --config ./gm0 sign storage0 > ./storage0/gridmanager.cert
grid-manager --config ./gm0 sign storage1 > ./storage1/gridmanager.cert
Now, we want our storage servers to actually announce these
certificates into the grid. We do this by adding some configuration
(in ``tahoe.cfg``)::
[storage]
grid_management = true
[grid_manager_certificates]
default = gridmanager.cert
Add the above bit to each node's ``tahoe.cfg`` and re-start the
storage nodes. (Alternatively, use the ``tahoe add-grid-manager``
command).
Now try adding a new storage server ``storage2``. This client can join
the grid just fine, and announce itself to the Introducer as providing
storage::
tahoe create-node --introducer $(cat introducer/private/introducer.furl) --nickname storage2 --webport 6301 --location tcp:localhost:6303 --port 6303 ./storage2
daemonize tahoe -d storage2 run
At this point any client will upload to any of these three
storage-servers. Make a client "alice" and try!
::
tahoe create-client --introducer $(cat introducer/private/introducer.furl) --nickname alice --webport 6401 --shares-total=3 --shares-needed=2 --shares-happy=3 ./alice
daemonize tahoe -d alice run
tahoe -d alice put README.rst # prints out a read-cap
find storage2/storage/shares # confirm storage2 has a share
Now we want to make Alice only upload to the storage servers that the
grid-manager has given certificates to (``storage0`` and
``storage1``). We need the grid-manager's public key to put in Alice's
configuration::
grid-manager --config ./gm0 public-identity
Put the key printed out above into Alice's ``tahoe.cfg`` in section
``client``::
[grid_managers]
example_name = pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq
Now, re-start the "alice" client. Since we made Alice's parameters
require 3 storage servers to be reachable (``--happy=3``), all their
uploads should now fail (so ``tahoe put`` will fail) because they
won't use storage2 and thus can't "achieve happiness".
A proposal to expose more information about Grid Manager and
certificate status in the Welcome page is discussed in
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3506

View File

@ -3,11 +3,14 @@ Integration tests for getting and putting files, including reading from stdin
and stdout. and stdout.
""" """
from subprocess import Popen, PIPE from subprocess import Popen, PIPE, check_output, check_call
import sys
import pytest import pytest
from pytest_twisted import ensureDeferred
from twisted.internet import reactor
from .util import run_in_thread, cli from .util import run_in_thread, cli, reconfigure
DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11" DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11"
try: try:
@ -62,3 +65,69 @@ def test_get_to_stdout(alice, get_put_alias, tmpdir):
) )
assert p.stdout.read() == DATA assert p.stdout.read() == DATA
assert p.wait() == 0 assert p.wait() == 0
def test_large_file(alice, get_put_alias, tmp_path):
"""
It's possible to upload and download a larger file.
We avoid stdin/stdout since that's flaky on Windows.
"""
tempfile = tmp_path / "file"
with tempfile.open("wb") as f:
f.write(DATA * 1_000_000)
cli(alice, "put", str(tempfile), "getput:largefile")
outfile = tmp_path / "out"
check_call(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:largefile", str(outfile)],
)
assert outfile.read_bytes() == tempfile.read_bytes()
@pytest.mark.skipif(
sys.platform.startswith("win"),
reason="reconfigure() has issues on Windows"
)
@ensureDeferred
async def test_upload_download_immutable_different_default_max_segment_size(alice, get_put_alias, tmpdir, request):
"""
Tahoe-LAFS used to have a default max segment size of 128KB, and is now
1MB. Test that an upload created when 128KB was the default can be
downloaded with 1MB as the default (i.e. old uploader, new downloader), and
vice versa, (new uploader, old downloader).
"""
tempfile = tmpdir.join("file")
large_data = DATA * 100_000
assert len(large_data) > 2 * 1024 * 1024
with tempfile.open("wb") as f:
f.write(large_data)
async def set_segment_size(segment_size):
await reconfigure(
reactor,
request,
alice,
(1, 1, 1),
None,
max_segment_size=segment_size
)
# 1. Upload file 1 with default segment size set to 1MB
await set_segment_size(1024 * 1024)
cli(alice, "put", str(tempfile), "getput:seg1024kb")
# 2. Download file 1 with default segment size set to 128KB
await set_segment_size(128 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg1024kb", "-"]
)
# 3. Upload file 2 with default segment size set to 128KB
cli(alice, "put", str(tempfile), "getput:seg128kb")
# 4. Download file 2 with default segment size set to 1MB
await set_segment_size(1024 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg128kb", "-"]
)

View File

@ -55,9 +55,12 @@ def i2p_network(reactor, temp_dir, request):
proto, proto,
which("docker"), which("docker"),
( (
"docker", "run", "-p", "7656:7656", "purplei2p/i2pd:release-2.43.0", "docker", "run", "-p", "7656:7656", "purplei2p/i2pd:release-2.45.1",
# Bad URL for reseeds, so it can't talk to other routers. # Bad URL for reseeds, so it can't talk to other routers.
"--reseed.urls", "http://localhost:1/", "--reseed.urls", "http://localhost:1/",
# Make sure we see the "ephemeral keys message"
"--log=stdout",
"--loglevel=info"
), ),
) )

View File

@ -36,7 +36,8 @@ async def test_capability(reactor, request, alice, case, expected):
computed value. computed value.
""" """
# rewrite alice's config to match params and convergence # rewrite alice's config to match params and convergence
await reconfigure(reactor, request, alice, (1, case.params.required, case.params.total), case.convergence) await reconfigure(
reactor, request, alice, (1, case.params.required, case.params.total), case.convergence, case.segment_size)
# upload data in the correct format # upload data in the correct format
actual = upload(alice, case.fmt, case.data) actual = upload(alice, case.fmt, case.data)
@ -110,7 +111,8 @@ async def generate(
request, request,
alice, alice,
(happy, case.params.required, case.params.total), (happy, case.params.required, case.params.total),
case.convergence case.convergence,
case.segment_size
) )
# Give the format a chance to make an RSA key if it needs it. # Give the format a chance to make an RSA key if it needs it.

View File

@ -46,6 +46,7 @@ from allmydata.util.configutil import (
write_config, write_config,
) )
from allmydata import client from allmydata import client
from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
import pytest_twisted import pytest_twisted
@ -366,6 +367,12 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
'force_foolscap', 'force_foolscap',
str(force_foolscap), str(force_foolscap),
) )
set_config(
config,
'client',
'force_foolscap',
str(force_foolscap),
)
write_config(FilePath(config_path), config) write_config(FilePath(config_path), config)
created_d.addCallback(created) created_d.addCallback(created)
@ -729,11 +736,16 @@ def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str:
return cli(*argv).decode("utf-8").strip() return cli(*argv).decode("utf-8").strip()
async def reconfigure(reactor, request, node: TahoeProcess, params: tuple[int, int, int], convergence: None | bytes) -> None: async def reconfigure(reactor, request, node: TahoeProcess,
params: tuple[int, int, int],
convergence: None | bytes,
max_segment_size: None | int = None) -> None:
""" """
Reconfigure a Tahoe-LAFS node with different ZFEC parameters and Reconfigure a Tahoe-LAFS node with different ZFEC parameters and
convergence secret. convergence secret.
TODO This appears to have issues on Windows.
If the current configuration is different from the specified If the current configuration is different from the specified
configuration, the node will be restarted so it takes effect. configuration, the node will be restarted so it takes effect.
@ -769,7 +781,22 @@ async def reconfigure(reactor, request, node: TahoeProcess, params: tuple[int, i
changed = True changed = True
config.write_private_config("convergence", base32.b2a(convergence)) config.write_private_config("convergence", base32.b2a(convergence))
if max_segment_size is not None:
cur_segment_size = int(config.get_config("client", "shares._max_immutable_segment_size_for_testing", DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE))
if cur_segment_size != max_segment_size:
changed = True
config.set_config(
"client",
"shares._max_immutable_segment_size_for_testing",
str(max_segment_size)
)
if changed: if changed:
# TODO reconfigure() seems to have issues on Windows. If you need to
# use it there, delete this assert and try to figure out what's going
# on...
assert not sys.platform.startswith("win")
# restart the node # restart the node
print(f"Restarting {node.node_dir} for ZFEC reconfiguration") print(f"Restarting {node.node_dir} for ZFEC reconfiguration")
await node.restart_async(reactor, request) await node.restart_async(reactor, request)

View File

@ -5,7 +5,7 @@ from __future__ import print_function
import sys, math import sys, math
from allmydata import uri, storage from allmydata import uri, storage
from allmydata.immutable import upload from allmydata.immutable import upload
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
from allmydata.util import mathutil from allmydata.util import mathutil
def roundup(size, blocksize=4096): def roundup(size, blocksize=4096):
@ -26,7 +26,7 @@ class BigFakeString(object):
def tell(self): def tell(self):
return self.fp return self.fp
def calc(filesize, params=(3,7,10), segsize=DEFAULT_MAX_SEGMENT_SIZE): def calc(filesize, params=(3,7,10), segsize=DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE):
num_shares = params[2] num_shares = params[2]
if filesize <= upload.Uploader.URI_LIT_SIZE_THRESHOLD: if filesize <= upload.Uploader.URI_LIT_SIZE_THRESHOLD:
urisize = len(uri.LiteralFileURI("A"*filesize).to_string()) urisize = len(uri.LiteralFileURI("A"*filesize).to_string())

View File

@ -0,0 +1 @@
Tahoe-LAFS now includes a new "Grid Manager" specification and implementation adding more options to control which storage servers a client will use for uploads.

0
newsfragments/3917.minor Normal file
View File

0
newsfragments/3936.minor Normal file
View File

View File

@ -0,0 +1 @@
Downloads of large immutables should now finish much faster.

0
newsfragments/3959.minor Normal file
View File

0
newsfragments/3965.minor Normal file
View File

0
newsfragments/3968.minor Normal file
View File

0
newsfragments/3974.minor Normal file
View File

View File

@ -0,0 +1 @@
Added support for Python 3.11.

View File

@ -55,7 +55,9 @@ install_requires = [
# * foolscap >= 0.12.6 has an i2p.sam_endpoint() that takes kwargs # * foolscap >= 0.12.6 has an i2p.sam_endpoint() that takes kwargs
# * foolscap 0.13.2 drops i2p support completely # * foolscap 0.13.2 drops i2p support completely
# * foolscap >= 21.7 is necessary for Python 3 with i2p support. # * foolscap >= 21.7 is necessary for Python 3 with i2p support.
# * foolscap >= 23.3 is necessary for Python 3.11.
"foolscap >= 21.7.0", "foolscap >= 21.7.0",
"foolscap >= 23.3.0; python_version > '3.10'",
# * cryptography 2.6 introduced some ed25519 APIs we rely on. Note that # * cryptography 2.6 introduced some ed25519 APIs we rely on. Note that
# Twisted[conch] also depends on cryptography and Twisted[tls] # Twisted[conch] also depends on cryptography and Twisted[tls]
@ -144,6 +146,9 @@ install_requires = [
# amount of copying involved. # amount of copying involved.
"pycddl >= 0.4", "pycddl >= 0.4",
# Command-line parsing
"click >= 7.0",
# for pid-file support # for pid-file support
"psutil", "psutil",
"filelock", "filelock",
@ -377,8 +382,8 @@ setup(name="tahoe-lafs", # also set in __init__.py
package_dir = {'':'src'}, package_dir = {'':'src'},
packages=find_packages('src') + ['allmydata.test.plugins'], packages=find_packages('src') + ['allmydata.test.plugins'],
classifiers=trove_classifiers, classifiers=trove_classifiers,
# We support Python 3.8 or later. 3.11 is not supported yet. # We support Python 3.8 or later, 3.12 is untested for now
python_requires=">=3.8, <3.11", python_requires=">=3.8, <3.12",
install_requires=install_requires, install_requires=install_requires,
extras_require={ extras_require={
# Duplicate the Twisted pywin32 dependency here. See # Duplicate the Twisted pywin32 dependency here. See
@ -428,6 +433,11 @@ setup(name="tahoe-lafs", # also set in __init__.py
}, },
include_package_data=True, include_package_data=True,
setup_requires=setup_requires, setup_requires=setup_requires,
entry_points = { 'console_scripts': [ 'tahoe = allmydata.scripts.runner:run' ] }, entry_points={
'console_scripts': [
'tahoe = allmydata.scripts.runner:run',
'grid-manager = allmydata.cli.grid_manager:grid_manager',
]
},
**setup_args **setup_args
) )

View File

View File

@ -0,0 +1,224 @@
"""
A CLI for configuring a grid manager.
"""
from typing import Optional
from datetime import (
timedelta,
)
import click
from twisted.python.filepath import (
FilePath,
)
from allmydata.crypto import (
ed25519,
)
from allmydata.util.abbreviate import (
abbreviate_time,
)
from allmydata.grid_manager import (
create_grid_manager,
save_grid_manager,
load_grid_manager,
current_datetime_with_zone,
)
from allmydata.util import jsonbytes as json
@click.group()
@click.option(
'--config', '-c',
type=click.Path(),
help="Configuration directory (or - for stdin)",
required=True,
)
@click.pass_context
def grid_manager(ctx, config):
"""
A Tahoe Grid Manager issues certificates to storage-servers
A Tahoe client with one or more Grid Manager public keys
configured will only upload to a Storage Server that presents a
valid certificate signed by one of the configured Grid
Manager keys.
Grid Manager configuration can be in a local directory or given
via stdin. It contains long-term secret information (a private
signing key) and should be kept safe.
"""
class Config(object):
"""
Available to all sub-commands as Click's context.obj
"""
_grid_manager = None
@property
def grid_manager(self):
if self._grid_manager is None:
config_path = _config_path_from_option(config)
try:
self._grid_manager = load_grid_manager(config_path)
except ValueError as e:
raise click.ClickException(
"Error loading Grid Manager from '{}': {}".format(config, e)
)
return self._grid_manager
ctx.obj = Config()
@grid_manager.command()
@click.pass_context
def create(ctx):
"""
Make a new Grid Manager
"""
config_location = ctx.parent.params["config"]
fp = None
if config_location != '-':
fp = FilePath(config_location)
gm = create_grid_manager()
try:
save_grid_manager(fp, gm)
except OSError as e:
raise click.ClickException(
"Can't create '{}': {}".format(config_location, e)
)
@grid_manager.command()
@click.pass_obj
def public_identity(config):
"""
Show the public identity key of a Grid Manager
This is what you give to clients to add to their configuration so
they use announcements from this Grid Manager
"""
click.echo(config.grid_manager.public_identity())
@grid_manager.command()
@click.argument("name")
@click.argument("public_key", type=click.STRING)
@click.pass_context
def add(ctx, name, public_key):
"""
Add a new storage-server by name to a Grid Manager
PUBLIC_KEY is the contents of a node.pubkey file from a Tahoe
node-directory. NAME is an arbitrary label.
"""
public_key = public_key.encode("ascii")
try:
ctx.obj.grid_manager.add_storage_server(
name,
ed25519.verifying_key_from_string(public_key),
)
except KeyError:
raise click.ClickException(
"A storage-server called '{}' already exists".format(name)
)
save_grid_manager(
_config_path_from_option(ctx.parent.params["config"]),
ctx.obj.grid_manager,
create=False,
)
return 0
@grid_manager.command()
@click.argument("name")
@click.pass_context
def remove(ctx, name):
"""
Remove an existing storage-server by name from a Grid Manager
"""
fp = _config_path_from_option(ctx.parent.params["config"])
try:
ctx.obj.grid_manager.remove_storage_server(name)
except KeyError:
raise click.ClickException(
"No storage-server called '{}' exists".format(name)
)
cert_count = 0
if fp is not None:
while fp.child('{}.cert.{}'.format(name, cert_count)).exists():
fp.child('{}.cert.{}'.format(name, cert_count)).remove()
cert_count += 1
save_grid_manager(fp, ctx.obj.grid_manager, create=False)
@grid_manager.command() # noqa: F811
@click.pass_context
def list(ctx):
"""
List all storage-servers known to a Grid Manager
"""
for name in sorted(ctx.obj.grid_manager.storage_servers.keys()):
blank_name = " " * len(name)
click.echo("{}: {}".format(
name,
str(ctx.obj.grid_manager.storage_servers[name].public_key_string(), "utf-8")))
for cert in ctx.obj.grid_manager.storage_servers[name].certificates:
delta = current_datetime_with_zone() - cert.expires
click.echo("{} cert {}: ".format(blank_name, cert.index), nl=False)
if delta.total_seconds() < 0:
click.echo("valid until {} ({})".format(cert.expires, abbreviate_time(delta)))
else:
click.echo("expired {} ({})".format(cert.expires, abbreviate_time(delta)))
@grid_manager.command()
@click.argument("name")
@click.argument(
"expiry_days",
type=click.IntRange(1, 5*365), # XXX is 5 years a good maximum?
)
@click.pass_context
def sign(ctx, name, expiry_days):
"""
sign a new certificate
"""
fp = _config_path_from_option(ctx.parent.params["config"])
expiry = timedelta(days=expiry_days)
try:
certificate = ctx.obj.grid_manager.sign(name, expiry)
except KeyError:
raise click.ClickException(
"No storage-server called '{}' exists".format(name)
)
certificate_data = json.dumps(certificate.marshal(), indent=4)
click.echo(certificate_data)
if fp is not None:
next_serial = 0
f = None
while f is None:
fname = "{}.cert.{}".format(name, next_serial)
try:
f = fp.child(fname).create()
except FileExistsError:
f = None
except OSError as e:
raise click.ClickException(f"{fname}: {e}")
next_serial += 1
with f:
f.write(certificate_data.encode("ascii"))
def _config_path_from_option(config: str) -> Optional[FilePath]:
"""
:param str config: a path or -
:returns: a FilePath instance or None
"""
if config == "-":
return None
return FilePath(config)

View File

@ -3,8 +3,11 @@ Ported to Python 3.
""" """
from __future__ import annotations from __future__ import annotations
import os
import stat
import time
import weakref
from typing import Optional from typing import Optional
import os, stat, time, weakref
from base64 import urlsafe_b64encode from base64 import urlsafe_b64encode
from functools import partial from functools import partial
# On Python 2 this will be the backported package: # On Python 2 this will be the backported package:
@ -26,6 +29,7 @@ from twisted.application.internet import TimerService
from twisted.python.filepath import FilePath from twisted.python.filepath import FilePath
import allmydata import allmydata
from allmydata import node
from allmydata.crypto import rsa, ed25519 from allmydata.crypto import rsa, ed25519
from allmydata.crypto.util import remove_prefix from allmydata.crypto.util import remove_prefix
from allmydata.storage.server import StorageServer, FoolscapStorageServer from allmydata.storage.server import StorageServer, FoolscapStorageServer
@ -50,14 +54,13 @@ from allmydata.interfaces import (
IStatsProducer, IStatsProducer,
SDMF_VERSION, SDMF_VERSION,
MDMF_VERSION, MDMF_VERSION,
DEFAULT_MAX_SEGMENT_SIZE, DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
IFoolscapStoragePlugin, IFoolscapStoragePlugin,
IAnnounceableStorageServer, IAnnounceableStorageServer,
) )
from allmydata.nodemaker import NodeMaker from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist from allmydata.blacklist import Blacklist
from allmydata import node from allmydata.node import _Config
KiB=1024 KiB=1024
MiB=1024*KiB MiB=1024*KiB
@ -73,7 +76,8 @@ def _is_valid_section(section_name):
""" """
return ( return (
section_name.startswith("storageserver.plugins.") or section_name.startswith("storageserver.plugins.") or
section_name.startswith("storageclient.plugins.") section_name.startswith("storageclient.plugins.") or
section_name in ("grid_managers", "grid_manager_certificates")
) )
@ -88,7 +92,9 @@ _client_config = configutil.ValidConfiguration(
"shares.happy", "shares.happy",
"shares.needed", "shares.needed",
"shares.total", "shares.total",
"shares._max_immutable_segment_size_for_testing",
"storage.plugins", "storage.plugins",
"force_foolscap",
), ),
"storage": ( "storage": (
"debug_discard", "debug_discard",
@ -105,6 +111,7 @@ _client_config = configutil.ValidConfiguration(
"reserved_space", "reserved_space",
"storage_dir", "storage_dir",
"plugins", "plugins",
"grid_management",
"force_foolscap", "force_foolscap",
), ),
"sftpd": ( "sftpd": (
@ -459,7 +466,7 @@ def create_introducer_clients(config, main_tub, _introducer_factory=None):
return introducer_clients return introducer_clients
def create_storage_farm_broker(config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients): def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients):
""" """
Create a StorageFarmBroker object, for use by Uploader/Downloader Create a StorageFarmBroker object, for use by Uploader/Downloader
(and everybody else who wants to use storage servers) (and everybody else who wants to use storage servers)
@ -489,6 +496,7 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con
**kwargs **kwargs
) )
# create the actual storage-broker
sb = storage_client.StorageFarmBroker( sb = storage_client.StorageFarmBroker(
permute_peers=True, permute_peers=True,
tub_maker=tub_creator, tub_maker=tub_creator,
@ -606,7 +614,7 @@ class _Client(node.Node, pollmixin.PollMixin):
DEFAULT_ENCODING_PARAMETERS = {"k": 3, DEFAULT_ENCODING_PARAMETERS = {"k": 3,
"happy": 7, "happy": 7,
"n": 10, "n": 10,
"max_segment_size": DEFAULT_MAX_SEGMENT_SIZE, "max_segment_size": DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
} }
def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients, def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,
@ -795,16 +803,18 @@ class _Client(node.Node, pollmixin.PollMixin):
sharetypes.append("mutable") sharetypes.append("mutable")
expiration_sharetypes = tuple(sharetypes) expiration_sharetypes = tuple(sharetypes)
ss = StorageServer(storedir, self.nodeid, ss = StorageServer(
reserved_space=reserved, storedir, self.nodeid,
discard_storage=discard, reserved_space=reserved,
readonly_storage=readonly, discard_storage=discard,
stats_provider=self.stats_provider, readonly_storage=readonly,
expiration_enabled=expire, stats_provider=self.stats_provider,
expiration_mode=mode, expiration_enabled=expire,
expiration_override_lease_duration=o_l_d, expiration_mode=mode,
expiration_cutoff_date=cutoff_date, expiration_override_lease_duration=o_l_d,
expiration_sharetypes=expiration_sharetypes) expiration_cutoff_date=cutoff_date,
expiration_sharetypes=expiration_sharetypes,
)
ss.setServiceParent(self) ss.setServiceParent(self)
return ss return ss
@ -846,6 +856,14 @@ class _Client(node.Node, pollmixin.PollMixin):
announcement.update(plugins_announcement) announcement.update(plugins_announcement)
if self.config.get_config("storage", "grid_management", default=False, boolean=True):
grid_manager_certificates = self.config.get_grid_manager_certificates()
announcement[u"grid-manager-certificates"] = grid_manager_certificates
# Note: certificates are not verified for validity here, but
# that may be useful. See:
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3977
for ic in self.introducer_clients: for ic in self.introducer_clients:
ic.publish("storage", announcement, self._node_private_key) ic.publish("storage", announcement, self._node_private_key)
@ -896,6 +914,13 @@ class _Client(node.Node, pollmixin.PollMixin):
DEP["k"] = int(self.config.get_config("client", "shares.needed", DEP["k"])) DEP["k"] = int(self.config.get_config("client", "shares.needed", DEP["k"]))
DEP["n"] = int(self.config.get_config("client", "shares.total", DEP["n"])) DEP["n"] = int(self.config.get_config("client", "shares.total", DEP["n"]))
DEP["happy"] = int(self.config.get_config("client", "shares.happy", DEP["happy"])) DEP["happy"] = int(self.config.get_config("client", "shares.happy", DEP["happy"]))
# At the moment this is only used for testing, thus the janky config
# attribute name.
DEP["max_segment_size"] = int(self.config.get_config(
"client",
"shares._max_immutable_segment_size_for_testing",
DEP["max_segment_size"])
)
# for the CLI to authenticate to local JSON endpoints # for the CLI to authenticate to local JSON endpoints
self._create_auth_token() self._create_auth_token()

View File

@ -13,20 +13,7 @@ cut-and-pasteability. The base62 encoding is shorter than the base32 form,
but the minor usability improvement is not worth the documentation and but the minor usability improvement is not worth the documentation and
specification confusion of using a non-standard encoding. So we stick with specification confusion of using a non-standard encoding. So we stick with
base32. base32.
Ported to Python 3.
''' '''
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import six
from cryptography.exceptions import ( from cryptography.exceptions import (
InvalidSignature, InvalidSignature,
@ -72,7 +59,7 @@ def verifying_key_from_signing_key(private_key):
return private_key.public_key() return private_key.public_key()
def sign_data(private_key, data): def sign_data(private_key, data: bytes) -> bytes:
""" """
Sign the given data using the given private key Sign the given data using the given private key
@ -86,7 +73,7 @@ def sign_data(private_key, data):
""" """
_validate_private_key(private_key) _validate_private_key(private_key)
if not isinstance(data, six.binary_type): if not isinstance(data, bytes):
raise ValueError('data must be bytes') raise ValueError('data must be bytes')
return private_key.sign(data) return private_key.sign(data)
@ -110,7 +97,7 @@ def string_from_signing_key(private_key):
return PRIVATE_KEY_PREFIX + b2a(raw_key_bytes) return PRIVATE_KEY_PREFIX + b2a(raw_key_bytes)
def signing_keypair_from_string(private_key_bytes): def signing_keypair_from_string(private_key_bytes: bytes):
""" """
Load a signing keypair from a string of bytes (which includes the Load a signing keypair from a string of bytes (which includes the
PRIVATE_KEY_PREFIX) PRIVATE_KEY_PREFIX)
@ -118,7 +105,7 @@ def signing_keypair_from_string(private_key_bytes):
:returns: a 2-tuple of (private_key, public_key) :returns: a 2-tuple of (private_key, public_key)
""" """
if not isinstance(private_key_bytes, six.binary_type): if not isinstance(private_key_bytes, bytes):
raise ValueError('private_key_bytes must be bytes') raise ValueError('private_key_bytes must be bytes')
private_key = Ed25519PrivateKey.from_private_bytes( private_key = Ed25519PrivateKey.from_private_bytes(
@ -127,7 +114,7 @@ def signing_keypair_from_string(private_key_bytes):
return private_key, private_key.public_key() return private_key, private_key.public_key()
def verify_signature(public_key, alleged_signature, data): def verify_signature(public_key, alleged_signature: bytes, data: bytes):
""" """
:param public_key: a verifying key :param public_key: a verifying key
@ -139,10 +126,10 @@ def verify_signature(public_key, alleged_signature, data):
:returns: None (or raises an exception). :returns: None (or raises an exception).
""" """
if not isinstance(alleged_signature, six.binary_type): if not isinstance(alleged_signature, bytes):
raise ValueError('alleged_signature must be bytes') raise ValueError('alleged_signature must be bytes')
if not isinstance(data, six.binary_type): if not isinstance(data, bytes):
raise ValueError('data must be bytes') raise ValueError('data must be bytes')
_validate_public_key(public_key) _validate_public_key(public_key)
@ -159,7 +146,7 @@ def verifying_key_from_string(public_key_bytes):
:returns: a public_key :returns: a public_key
""" """
if not isinstance(public_key_bytes, six.binary_type): if not isinstance(public_key_bytes, bytes):
raise ValueError('public_key_bytes must be bytes') raise ValueError('public_key_bytes must be bytes')
return Ed25519PublicKey.from_public_bytes( return Ed25519PublicKey.from_public_bytes(
@ -167,7 +154,7 @@ def verifying_key_from_string(public_key_bytes):
) )
def string_from_verifying_key(public_key): def string_from_verifying_key(public_key) -> bytes:
""" """
Encode a public key to a string of bytes Encode a public key to a string of bytes
@ -183,7 +170,7 @@ def string_from_verifying_key(public_key):
return PUBLIC_KEY_PREFIX + b2a(raw_key_bytes) return PUBLIC_KEY_PREFIX + b2a(raw_key_bytes)
def _validate_public_key(public_key): def _validate_public_key(public_key: Ed25519PublicKey):
""" """
Internal helper. Verify that `public_key` is an appropriate object Internal helper. Verify that `public_key` is an appropriate object
""" """
@ -192,7 +179,7 @@ def _validate_public_key(public_key):
return None return None
def _validate_private_key(private_key): def _validate_private_key(private_key: Ed25519PrivateKey):
""" """
Internal helper. Verify that `private_key` is an appropriate object Internal helper. Verify that `private_key` is an appropriate object
""" """

View File

@ -0,0 +1,495 @@
"""
Functions and classes relating to the Grid Manager internal state
"""
import sys
from datetime import (
datetime,
timezone,
)
from typing import (
Optional,
Union,
List,
)
from twisted.python.filepath import FilePath
from allmydata.crypto import (
ed25519,
)
from allmydata.util import (
base32,
jsonbytes as json,
dictutil,
)
from attrs import (
frozen,
Factory,
)
@frozen
class SignedCertificate(object):
"""
A signed certificate.
"""
# A JSON-encoded, UTF-8-encoded certificate.
certificate : bytes
# The signature (although the signature is in base32 in "public",
# this contains the decoded raw bytes -- not base32)
signature : bytes
@classmethod
def load(cls, file_like):
data = json.load(file_like)
return cls(
certificate=data["certificate"].encode("utf-8"),
signature=base32.a2b(data["signature"].encode("ascii")),
)
def marshal(self):
"""
:return dict: a json-able dict
"""
return dict(
certificate=self.certificate,
signature=base32.b2a(self.signature),
)
@frozen
class _GridManagerStorageServer(object):
"""
A Grid Manager's notion of a storage server
"""
name : str
public_key : ed25519.Ed25519PublicKey
certificates : list = Factory(list) # SignedCertificates
def add_certificate(self, certificate):
"""
Add ``certificate``
"""
self.certificates.append(certificate)
def public_key_string(self) -> bytes:
"""
:returns: the public key as bytes.
"""
return ed25519.string_from_verifying_key(self.public_key)
def marshal(self):
"""
:returns: a dict suitable for JSON representing this object
"""
return {
u"public_key": self.public_key_string(),
}
@frozen
class _GridManagerCertificate(object):
"""
Represents a single certificate for a single storage-server
"""
filename : str
index : int
expires : datetime
public_key : ed25519.Ed25519PublicKey
def create_grid_manager():
"""
Create a new Grid Manager with a fresh keypair
"""
private_key, public_key = ed25519.create_signing_keypair()
return _GridManager(
ed25519.string_from_signing_key(private_key),
{},
)
def current_datetime_with_zone():
"""
:returns: a timezone-aware datetime object representing the
current timestamp in UTC
"""
return datetime.now(timezone.utc)
def _load_certificates_for(config_path: FilePath, name: str, gm_key=Optional[ed25519.Ed25519PublicKey]) -> List[_GridManagerCertificate]:
"""
Load any existing certificates for the given storage-server.
:param FilePath config_path: the configuration location (or None for
stdin)
:param str name: the name of an existing storage-server
:param ed25519.Ed25519PublicKey gm_key: an optional Grid Manager
public key. If provided, certificates will be verified against it.
:returns: list containing any known certificates (may be empty)
:raises: ed25519.BadSignature if any certificate signature fails to verify
"""
cert_index = 0
cert_path = config_path.child('{}.cert.{}'.format(name, cert_index))
certificates = []
while cert_path.exists():
container = SignedCertificate.load(cert_path.open('r'))
if gm_key is not None:
validate_grid_manager_certificate(gm_key, container)
cert_data = json.loads(container.certificate)
if cert_data['version'] != 1:
raise ValueError(
"Unknown certificate version '{}' in '{}'".format(
cert_data['version'],
cert_path.path,
)
)
certificates.append(
_GridManagerCertificate(
filename=cert_path.path,
index=cert_index,
expires=datetime.fromisoformat(cert_data['expires']),
public_key=ed25519.verifying_key_from_string(cert_data['public_key'].encode('ascii')),
)
)
cert_index += 1
cert_path = config_path.child('{}.cert.{}'.format(name, cert_index))
return certificates
def load_grid_manager(config_path: Optional[FilePath]):
"""
Load a Grid Manager from existing configuration.
:param FilePath config_path: the configuration location (or None for
stdin)
:returns: a GridManager instance
:raises: ValueError if the confguration is invalid or IOError if
expected files can't be opened.
"""
if config_path is None:
config_file = sys.stdin
else:
# this might raise IOError or similar but caller must handle it
config_file = config_path.child("config.json").open("r")
with config_file:
config = json.load(config_file)
gm_version = config.get(u'grid_manager_config_version', None)
if gm_version != 0:
raise ValueError(
"Missing or unknown version '{}' of Grid Manager config".format(
gm_version
)
)
if 'private_key' not in config:
raise ValueError(
"'private_key' required in config"
)
private_key_bytes = config['private_key'].encode('ascii')
try:
private_key, public_key = ed25519.signing_keypair_from_string(private_key_bytes)
except Exception as e:
raise ValueError(
"Invalid Grid Manager private_key: {}".format(e)
)
storage_servers = dict()
for name, srv_config in list(config.get(u'storage_servers', {}).items()):
if 'public_key' not in srv_config:
raise ValueError(
"No 'public_key' for storage server '{}'".format(name)
)
storage_servers[name] = _GridManagerStorageServer(
name,
ed25519.verifying_key_from_string(srv_config['public_key'].encode('ascii')),
[] if config_path is None else _load_certificates_for(config_path, name, public_key),
)
return _GridManager(private_key_bytes, storage_servers)
class _GridManager(object):
"""
A Grid Manager's configuration.
"""
def __init__(self, private_key_bytes, storage_servers):
self._storage_servers = dictutil.UnicodeKeyDict(
{} if storage_servers is None else storage_servers
)
assert isinstance(private_key_bytes, bytes)
self._private_key_bytes = private_key_bytes
self._private_key, self._public_key = ed25519.signing_keypair_from_string(self._private_key_bytes)
self._version = 0
@property
def storage_servers(self):
return self._storage_servers
def public_identity(self):
"""
:returns: public key as a string
"""
return ed25519.string_from_verifying_key(self._public_key)
def sign(self, name, expiry):
"""
Create a new signed certificate for a particular server
:param str name: the server to create a certificate for
:param timedelta expiry: how far in the future the certificate
should expire.
:returns SignedCertificate: the signed certificate.
"""
assert isinstance(name, str) # must be unicode
try:
srv = self._storage_servers[name]
except KeyError:
raise KeyError(
"No storage server named '{}'".format(name)
)
expiration = current_datetime_with_zone() + expiry
cert_info = {
"expires": expiration.isoformat(),
"public_key": srv.public_key_string(),
"version": 1,
}
cert_data = json.dumps_bytes(cert_info, separators=(',',':'), sort_keys=True)
sig = ed25519.sign_data(self._private_key, cert_data)
certificate = SignedCertificate(
certificate=cert_data,
signature=sig,
)
vk = ed25519.verifying_key_from_signing_key(self._private_key)
ed25519.verify_signature(vk, sig, cert_data)
srv.add_certificate(certificate)
return certificate
def add_storage_server(self, name, public_key):
"""
:param name: a user-meaningful name for the server
:param public_key: ed25519.VerifyingKey the public-key of the
storage provider (e.g. from the contents of node.pubkey
for the client)
"""
assert isinstance(name, str) # must be unicode
if name in self._storage_servers:
raise KeyError(
"Already have a storage server called '{}'".format(name)
)
ss = _GridManagerStorageServer(name, public_key, [])
self._storage_servers[name] = ss
return ss
def remove_storage_server(self, name):
"""
:param name: a user-meaningful name for the server
"""
assert isinstance(name, str) # must be unicode
try:
del self._storage_servers[name]
except KeyError:
raise KeyError(
"No storage server called '{}'".format(name)
)
def marshal(self):
"""
:returns: a dict suitable for JSON representing this object
"""
data = {
u"grid_manager_config_version": self._version,
u"private_key": self._private_key_bytes.decode('ascii'),
}
if self._storage_servers:
data[u"storage_servers"] = {
name: srv.marshal()
for name, srv
in self._storage_servers.items()
}
return data
def save_grid_manager(file_path, grid_manager, create=True):
"""
Writes a Grid Manager configuration.
:param file_path: a FilePath specifying where to write the config
(if None, stdout is used)
:param grid_manager: a _GridManager instance
:param bool create: if True (the default) we are creating a new
grid-manager and will fail if the directory already exists.
"""
data = json.dumps(
grid_manager.marshal(),
indent=4,
)
if file_path is None:
print("{}\n".format(data))
else:
try:
file_path.makedirs()
file_path.chmod(0o700)
except OSError:
if create:
raise
with file_path.child("config.json").open("w") as f:
f.write(data.encode("utf-8"))
f.write(b"\n")
def parse_grid_manager_certificate(gm_data: Union[str, bytes]):
"""
:param gm_data: some data that might be JSON that might be a valid
Grid Manager Certificate
:returns: json data of a valid Grid Manager certificate, or an
exception if the data is not valid.
"""
required_keys = {
'certificate',
'signature',
}
js = json.loads(gm_data)
if not isinstance(js, dict):
raise ValueError(
"Grid Manager certificate must be a dict"
)
if set(js.keys()) != required_keys:
raise ValueError(
"Grid Manager certificate must contain: {}".format(
", ".join("'{}'".format(k) for k in required_keys),
)
)
return js
def validate_grid_manager_certificate(gm_key, alleged_cert):
"""
:param gm_key: a VerifyingKey instance, a Grid Manager's public
key.
:param alleged_cert SignedCertificate: A signed certificate.
:return: a dict consisting of the deserialized certificate data or
None if the signature is invalid. Note we do NOT check the
expiry time in this function.
"""
try:
ed25519.verify_signature(
gm_key,
alleged_cert.signature,
alleged_cert.certificate,
)
except ed25519.BadSignature:
return None
# signature is valid; now we can load the actual data
cert = json.loads(alleged_cert.certificate)
return cert
def create_grid_manager_verifier(keys, certs, public_key, now_fn=None, bad_cert=None):
"""
Creates a predicate for confirming some Grid Manager-issued
certificates against Grid Manager keys. A predicate is used
(instead of just returning True/False here) so that the
expiry-time can be tested on each call.
:param list keys: 0 or more ``VerifyingKey`` instances
:param list certs: 1 or more Grid Manager certificates each of
which is a ``SignedCertificate``.
:param str public_key: the identifier of the server we expect
certificates for.
:param callable now_fn: a callable which returns the current UTC
timestamp (or current_datetime_with_zone() if None).
:param callable bad_cert: a two-argument callable which is invoked
when a certificate verification fails. The first argument is
the verifying key and the second is the certificate. If None
(the default) errors are print()-ed. Note that we may have
several certificates and only one must be valid, so this may
be called (multiple times) even if the function ultimately
returns successfully.
:returns: a callable which will return True only-if there is at
least one valid certificate (that has not at this moment
expired) in `certs` signed by one of the keys in `keys`.
"""
now_fn = current_datetime_with_zone if now_fn is None else now_fn
valid_certs = []
# if we have zero grid-manager keys then everything is valid
if not keys:
return lambda: True
if bad_cert is None:
def bad_cert(key, alleged_cert):
"""
We might want to let the user know about this failed-to-verify
certificate .. but also if you have multiple grid-managers
then a bunch of these messages would appear. Better would
be to bubble this up to some sort of status API (or maybe
on the Welcome page?)
The only thing that might actually be interesting, though,
is whether this whole function returns false or not..
"""
print(
"Grid Manager certificate signature failed. Certificate: "
"\"{cert}\" for key \"{key}\".".format(
cert=alleged_cert,
key=ed25519.string_from_verifying_key(key),
)
)
# validate the signatures on any certificates we have (not yet the expiry dates)
for alleged_cert in certs:
for key in keys:
cert = validate_grid_manager_certificate(key, alleged_cert)
if cert is not None:
valid_certs.append(cert)
else:
bad_cert(key, alleged_cert)
def validate():
"""
:returns: True if *any* certificate is still valid for a server
"""
now = now_fn()
for cert in valid_certs:
expires = datetime.fromisoformat(cert["expires"])
if cert['public_key'].encode("ascii") == public_key:
if expires > now:
# not-expired
return True
return False
return validate

View File

@ -19,7 +19,7 @@ from foolscap.api import eventually
from allmydata import uri from allmydata import uri
from allmydata.codec import CRSDecoder from allmydata.codec import CRSDecoder
from allmydata.util import base32, log, hashutil, mathutil, observer from allmydata.util import base32, log, hashutil, mathutil, observer
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
from allmydata.hashtree import IncompleteHashTree, BadHashError, \ from allmydata.hashtree import IncompleteHashTree, BadHashError, \
NotEnoughHashesError NotEnoughHashesError
@ -49,6 +49,8 @@ class DownloadNode(object):
"""Internal class which manages downloads and holds state. External """Internal class which manages downloads and holds state. External
callers use CiphertextFileNode instead.""" callers use CiphertextFileNode instead."""
default_max_segment_size = DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
# Share._node points to me # Share._node points to me
def __init__(self, verifycap, storage_broker, secret_holder, def __init__(self, verifycap, storage_broker, secret_holder,
terminator, history, download_status): terminator, history, download_status):
@ -76,7 +78,7 @@ class DownloadNode(object):
# .guessed_segment_size, .guessed_num_segments, and # .guessed_segment_size, .guessed_num_segments, and
# .ciphertext_hash_tree (with a dummy, to let us guess which hashes # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
# we'll need) # we'll need)
self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE) self._build_guessed_tables(self.default_max_segment_size)
# filled in when we parse a valid UEB # filled in when we parse a valid UEB
self.have_UEB = False self.have_UEB = False

View File

@ -48,7 +48,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \ NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE, IPeerSelector DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE, IPeerSelector
from allmydata.immutable import layout from allmydata.immutable import layout
from io import BytesIO from io import BytesIO
@ -543,7 +543,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# 0. Start with an ordered list of servers. Maybe *2N* of them. # 0. Start with an ordered list of servers. Maybe *2N* of them.
# #
all_servers = storage_broker.get_servers_for_psi(storage_index) all_servers = storage_broker.get_servers_for_psi(storage_index, for_upload=True)
if not all_servers: if not all_servers:
raise NoServersError("client gave us zero servers") raise NoServersError("client gave us zero servers")
@ -1692,7 +1692,7 @@ class AssistedUploader(object):
class BaseUploadable(object): class BaseUploadable(object):
# this is overridden by max_segment_size # this is overridden by max_segment_size
default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE default_max_segment_size = DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
default_params_set = False default_params_set = False
max_segment_size = None max_segment_size = None

View File

@ -41,7 +41,8 @@ URI = StringConstraint(300) # kind of arbitrary
MAX_BUCKETS = 256 # per peer -- zfec offers at most 256 shares per file MAX_BUCKETS = 256 # per peer -- zfec offers at most 256 shares per file
DEFAULT_MAX_SEGMENT_SIZE = 128*1024 # The default size for segments of new CHK ("immutable") uploads.
DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE = 1024*1024
ShareData = StringConstraint(None) ShareData = StringConstraint(None)
URIExtensionData = StringConstraint(1000) URIExtensionData = StringConstraint(1000)
@ -560,6 +561,12 @@ class IServer(IDisplayableServer):
once the connection is lost. once the connection is lost.
""" """
def upload_permitted():
"""
:return: True if we should use this server for uploads, False
otherwise.
"""
def get_storage_server(): def get_storage_server():
""" """
Once a server is connected, I return an ``IStorageServer``. Once a server is connected, I return an ``IStorageServer``.
@ -570,8 +577,6 @@ class IServer(IDisplayableServer):
""" """
class IMutableSlotWriter(Interface): class IMutableSlotWriter(Interface):
""" """
The interface for a writer around a mutable slot on a remote server. The interface for a writer around a mutable slot on a remote server.

View File

@ -35,8 +35,13 @@ from allmydata.mutable.layout import get_version_from_checkstring,\
MDMFSlotWriteProxy, \ MDMFSlotWriteProxy, \
SDMFSlotWriteProxy SDMFSlotWriteProxy
from eliot import (
Message,
start_action,
)
KiB = 1024 KiB = 1024
DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB DEFAULT_MUTABLE_MAX_SEGMENT_SIZE = 128 * KiB
PUSHING_BLOCKS_STATE = 0 PUSHING_BLOCKS_STATE = 0
PUSHING_EVERYTHING_ELSE_STATE = 1 PUSHING_EVERYTHING_ELSE_STATE = 1
DONE_STATE = 2 DONE_STATE = 2
@ -367,7 +372,7 @@ class Publish(object):
self.data = newdata self.data = newdata
self.datalength = newdata.get_size() self.datalength = newdata.get_size()
#if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE: #if self.datalength >= DEFAULT_MUTABLE_MAX_SEGMENT_SIZE:
# self._version = MDMF_VERSION # self._version = MDMF_VERSION
#else: #else:
# self._version = SDMF_VERSION # self._version = SDMF_VERSION
@ -551,7 +556,7 @@ class Publish(object):
def setup_encoding_parameters(self, offset=0): def setup_encoding_parameters(self, offset=0):
if self._version == MDMF_VERSION: if self._version == MDMF_VERSION:
segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default segment_size = DEFAULT_MUTABLE_MAX_SEGMENT_SIZE # 128 KiB by default
else: else:
segment_size = self.datalength # SDMF is only one segment segment_size = self.datalength # SDMF is only one segment
# this must be a multiple of self.required_shares # this must be a multiple of self.required_shares
@ -955,12 +960,31 @@ class Publish(object):
old_assignments.add(server, shnum) old_assignments.add(server, shnum)
serverlist = [] serverlist = []
for i, server in enumerate(self.full_serverlist):
serverid = server.get_serverid() action = start_action(
if server in self.bad_servers: action_type=u"mutable:upload:update_goal",
continue homeless_shares=len(homeless_shares),
entry = (len(old_assignments.get(server, [])), i, serverid, server) )
serverlist.append(entry) with action:
for i, server in enumerate(self.full_serverlist):
serverid = server.get_serverid()
if server in self.bad_servers:
Message.log(
message_type=u"mutable:upload:bad-server",
server_id=serverid,
)
continue
# if we have >= 1 grid-managers, this checks that we have
# a valid certificate for this server
if not server.upload_permitted():
Message.log(
message_type=u"mutable:upload:no-gm-certs",
server_id=serverid,
)
continue
entry = (len(old_assignments.get(server, [])), i, serverid, server)
serverlist.append(entry)
serverlist.sort() serverlist.sort()
if not serverlist: if not serverlist:

View File

@ -14,6 +14,7 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six import ensure_str, ensure_text from six import ensure_str, ensure_text
import json
import datetime import datetime
import os.path import os.path
import re import re
@ -350,6 +351,19 @@ class _Config(object):
"Unable to write config file '{}'".format(fn), "Unable to write config file '{}'".format(fn),
) )
def enumerate_section(self, section):
"""
returns a dict containing all items in a configuration section. an
empty dict is returned if the section doesn't exist.
"""
answer = dict()
try:
for k in self.config.options(section):
answer[k] = self.config.get(section, k)
except configparser.NoSectionError:
pass
return answer
def items(self, section, default=_None): def items(self, section, default=_None):
try: try:
return self.config.items(section) return self.config.items(section)
@ -484,6 +498,12 @@ class _Config(object):
""" """
returns an absolute path inside the 'private' directory with any returns an absolute path inside the 'private' directory with any
extra args join()-ed extra args join()-ed
This exists for historical reasons. New code should ideally
not call this because it makes it harder for e.g. a SQL-based
_Config object to exist. Code that needs to call this method
should probably be a _Config method itself. See
e.g. get_grid_manager_certificates()
""" """
return os.path.join(self._basedir, "private", *args) return os.path.join(self._basedir, "private", *args)
@ -491,6 +511,12 @@ class _Config(object):
""" """
returns an absolute path inside the config directory with any returns an absolute path inside the config directory with any
extra args join()-ed extra args join()-ed
This exists for historical reasons. New code should ideally
not call this because it makes it harder for e.g. a SQL-based
_Config object to exist. Code that needs to call this method
should probably be a _Config method itself. See
e.g. get_grid_manager_certificates()
""" """
# note: we re-expand here (_basedir already went through this # note: we re-expand here (_basedir already went through this
# expanduser function) in case the path we're being asked for # expanduser function) in case the path we're being asked for
@ -499,6 +525,35 @@ class _Config(object):
os.path.join(self._basedir, *args) os.path.join(self._basedir, *args)
) )
def get_grid_manager_certificates(self):
"""
Load all Grid Manager certificates in the config.
:returns: A list of all certificates. An empty list is
returned if there are none.
"""
grid_manager_certificates = []
cert_fnames = list(self.enumerate_section("grid_manager_certificates").values())
for fname in cert_fnames:
fname = self.get_config_path(fname)
if not os.path.exists(fname):
raise ValueError(
"Grid Manager certificate file '{}' doesn't exist".format(
fname
)
)
with open(fname, 'r') as f:
cert = json.load(f)
if set(cert.keys()) != {"certificate", "signature"}:
raise ValueError(
"Unknown key in Grid Manager certificate '{}'".format(
fname
)
)
grid_manager_certificates.append(cert)
return grid_manager_certificates
def get_introducer_configuration(self): def get_introducer_configuration(self):
""" """
Get configuration for introducers. Get configuration for introducers.

View File

@ -89,7 +89,7 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
certificate=cls.tub.myCertificate.original, certificate=cls.tub.myCertificate.original,
) )
http_storage_server = HTTPServer(storage_server, swissnum) http_storage_server = HTTPServer(reactor, storage_server, swissnum)
cls.https_factory = TLSMemoryBIOFactory( cls.https_factory = TLSMemoryBIOFactory(
certificate_options, certificate_options,
False, False,

View File

@ -12,11 +12,6 @@ if PY2:
from six import ensure_binary from six import ensure_binary
try:
from allmydata.scripts.types_ import SubCommands
except ImportError:
pass
from twisted.python import usage from twisted.python import usage
from twisted.python.filepath import ( from twisted.python.filepath import (
FilePath, FilePath,
@ -29,6 +24,14 @@ from allmydata.storage import (
crawler, crawler,
expirer, expirer,
) )
from allmydata.scripts.types_ import SubCommands
from allmydata.client import read_config
from allmydata.grid_manager import (
parse_grid_manager_certificate,
)
from allmydata.scripts.cli import _default_nodedir
from allmydata.util.encodingutil import argv_to_abspath
from allmydata.util import jsonbytes
class GenerateKeypairOptions(BaseOptions): class GenerateKeypairOptions(BaseOptions):
@ -75,6 +78,7 @@ def derive_pubkey(options):
print("public:", str(ed25519.string_from_verifying_key(public_key), "ascii"), file=out) print("public:", str(ed25519.string_from_verifying_key(public_key), "ascii"), file=out)
return 0 return 0
class MigrateCrawlerOptions(BasedirOptions): class MigrateCrawlerOptions(BasedirOptions):
def getSynopsis(self): def getSynopsis(self):
@ -94,6 +98,61 @@ class MigrateCrawlerOptions(BasedirOptions):
return t return t
class AddGridManagerCertOptions(BaseOptions):
"""
Options for add-grid-manager-cert
"""
optParameters = [
['filename', 'f', None, "Filename of the certificate ('-', a dash, for stdin)"],
['name', 'n', None, "Name to give this certificate"],
]
def getSynopsis(self):
return "Usage: tahoe [global-options] admin add-grid-manager-cert [options]"
def postOptions(self) -> None:
if self['name'] is None:
raise usage.UsageError(
"Must provide --name option"
)
if self['filename'] is None:
raise usage.UsageError(
"Must provide --filename option"
)
data: str
if self['filename'] == '-':
print("reading certificate from stdin", file=self.parent.parent.stderr)
data = self.parent.parent.stdin.read()
if len(data) == 0:
raise usage.UsageError(
"Reading certificate from stdin failed"
)
else:
with open(self['filename'], 'r') as f:
data = f.read()
try:
self.certificate_data = parse_grid_manager_certificate(data)
except ValueError as e:
raise usage.UsageError(
"Error parsing certificate: {}".format(e)
)
def getUsage(self, width=None):
t = BaseOptions.getUsage(self, width)
t += (
"Adds a Grid Manager certificate to a Storage Server.\n\n"
"The certificate will be copied into the base-dir and config\n"
"will be added to 'tahoe.cfg', which will be re-written. A\n"
"restart is required for changes to take effect.\n\n"
"The human who operates a Grid Manager would produce such a\n"
"certificate and communicate it securely to you.\n"
)
return t
def migrate_crawler(options): def migrate_crawler(options):
out = options.stdout out = options.stdout
storage = FilePath(options['basedir']).child("storage") storage = FilePath(options['basedir']).child("storage")
@ -116,6 +175,44 @@ def migrate_crawler(options):
print("Not found: '{}'".format(fp.path), file=out) print("Not found: '{}'".format(fp.path), file=out)
def add_grid_manager_cert(options):
"""
Add a new Grid Manager certificate to our config
"""
# XXX is there really not already a function for this?
if options.parent.parent['node-directory']:
nd = argv_to_abspath(options.parent.parent['node-directory'])
else:
nd = _default_nodedir
config = read_config(nd, "portnum")
cert_fname = "{}.cert".format(options['name'])
cert_path = FilePath(config.get_config_path(cert_fname))
cert_bytes = jsonbytes.dumps_bytes(options.certificate_data, indent=4) + b'\n'
cert_name = options['name']
if cert_path.exists():
msg = "Already have certificate for '{}' (at {})".format(
options['name'],
cert_path.path,
)
print(msg, file=options.stderr)
return 1
config.set_config("storage", "grid_management", "True")
config.set_config("grid_manager_certificates", cert_name, cert_fname)
# write all the data out
with cert_path.open("wb") as f:
f.write(cert_bytes)
cert_count = len(config.enumerate_section("grid_manager_certificates"))
print("There are now {} certificates".format(cert_count),
file=options.stderr)
return 0
class AdminCommand(BaseOptions): class AdminCommand(BaseOptions):
subCommands = [ subCommands = [
("generate-keypair", None, GenerateKeypairOptions, ("generate-keypair", None, GenerateKeypairOptions,
@ -124,6 +221,9 @@ class AdminCommand(BaseOptions):
"Derive a public key from a private key."), "Derive a public key from a private key."),
("migrate-crawler", None, MigrateCrawlerOptions, ("migrate-crawler", None, MigrateCrawlerOptions,
"Write the crawler-history data as JSON."), "Write the crawler-history data as JSON."),
("add-grid-manager-cert", None, AddGridManagerCertOptions,
"Add a Grid Manager-provided certificate to a storage "
"server's config."),
] ]
def postOptions(self): def postOptions(self):
if not hasattr(self, 'subOptions'): if not hasattr(self, 'subOptions'):
@ -138,11 +238,14 @@ each subcommand.
""" """
return t return t
subDispatch = { subDispatch = {
"generate-keypair": print_keypair, "generate-keypair": print_keypair,
"derive-pubkey": derive_pubkey, "derive-pubkey": derive_pubkey,
"migrate-crawler": migrate_crawler, "migrate-crawler": migrate_crawler,
} "add-grid-manager-cert": add_grid_manager_cert,
}
def do_admin(options): def do_admin(options):
so = options.subOptions so = options.subOptions
@ -158,4 +261,4 @@ subCommands = [
dispatch = { dispatch = {
"admin": do_admin, "admin": do_admin,
} }

View File

@ -165,6 +165,8 @@ def parse_or_exit(config, argv, stdout, stderr):
:return: ``config``, after using it to parse the argument list. :return: ``config``, after using it to parse the argument list.
""" """
try: try:
config.stdout = stdout
config.stderr = stderr
parse_options(argv[1:], config=config) parse_options(argv[1:], config=config)
except usage.error as e: except usage.error as e:
# `parse_options` may have the side-effect of initializing a # `parse_options` may have the side-effect of initializing a
@ -199,6 +201,7 @@ def dispatch(config,
so.stdout = stdout so.stdout = stdout
so.stderr = stderr so.stderr = stderr
so.stdin = stdin so.stdin = stdin
config.stdin = stdin
if command in create_dispatch: if command in create_dispatch:
f = create_dispatch[command] f = create_dispatch[command]

View File

@ -2,8 +2,6 @@
Type definitions used by modules in this package. Type definitions used by modules in this package.
""" """
# Python 3 only
from typing import List, Tuple, Type, Sequence, Any from typing import List, Tuple, Type, Sequence, Any
from twisted.python.usage import Options from twisted.python.usage import Options

View File

@ -24,6 +24,7 @@ from twisted.internet.interfaces import (
from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
from twisted.internet.interfaces import IReactorFromThreads
from twisted.web.server import Site, Request from twisted.web.server import Site, Request
from twisted.protocols.tls import TLSMemoryBIOFactory from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.filepath import FilePath from twisted.python.filepath import FilePath
@ -56,6 +57,8 @@ from .common import si_a2b
from .immutable import BucketWriter, ConflictingWriteError from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet from ..util.base32 import rfc3548_alphabet
from ..util.deferredutil import async_to_deferred
from ..util.cputhreadpool import defer_to_thread
from allmydata.interfaces import BadWriteEnablerError from allmydata.interfaces import BadWriteEnablerError
@ -486,8 +489,12 @@ class HTTPServer(object):
return str(failure.value).encode("utf-8") return str(failure.value).encode("utf-8")
def __init__( def __init__(
self, storage_server, swissnum self,
): # type: (StorageServer, bytes) -> None reactor: IReactorFromThreads,
storage_server: StorageServer,
swissnum: bytes,
):
self._reactor = reactor
self._storage_server = storage_server self._storage_server = storage_server
self._swissnum = swissnum self._swissnum = swissnum
# Maps storage index to StorageIndexUploads: # Maps storage index to StorageIndexUploads:
@ -529,7 +536,7 @@ class HTTPServer(object):
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861 # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
raise _HTTPError(http.NOT_ACCEPTABLE) raise _HTTPError(http.NOT_ACCEPTABLE)
def _read_encoded( async def _read_encoded(
self, request, schema: Schema, max_size: int = 1024 * 1024 self, request, schema: Schema, max_size: int = 1024 * 1024
) -> Any: ) -> Any:
""" """
@ -542,10 +549,11 @@ class HTTPServer(object):
raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE) raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)
# Make sure it's not too large: # Make sure it's not too large:
request.content.seek(SEEK_END, 0) request.content.seek(0, SEEK_END)
if request.content.tell() > max_size: size = request.content.tell()
if size > max_size:
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE) raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
request.content.seek(SEEK_SET, 0) request.content.seek(0, SEEK_SET)
# We don't want to load the whole message into memory, cause it might # We don't want to load the whole message into memory, cause it might
# be quite large. The CDDL validator takes a read-only bytes-like # be quite large. The CDDL validator takes a read-only bytes-like
@ -562,12 +570,21 @@ class HTTPServer(object):
message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ) message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
else: else:
message = request.content.read() message = request.content.read()
schema.validate_cbor(message)
# Pycddl will release the GIL when validating larger documents, so
# let's take advantage of multiple CPUs:
if size > 10_000:
await defer_to_thread(self._reactor, schema.validate_cbor, message)
else:
schema.validate_cbor(message)
# The CBOR parser will allocate more memory, but at least we can feed # The CBOR parser will allocate more memory, but at least we can feed
# it the file-like object, so that if it's large it won't be make two # it the file-like object, so that if it's large it won't be make two
# copies. # copies.
request.content.seek(SEEK_SET, 0) request.content.seek(SEEK_SET, 0)
# Typically deserialization to Python will not release the GIL, and
# indeed as of Jan 2023 cbor2 didn't have any code to release the GIL
# in the decode path. As such, running it in a different thread has no benefit.
return cbor2.load(request.content) return cbor2.load(request.content)
##### Generic APIs ##### ##### Generic APIs #####
@ -585,10 +602,14 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>", "/storage/v1/immutable/<storage_index:storage_index>",
methods=["POST"], methods=["POST"],
) )
def allocate_buckets(self, request, authorization, storage_index): @async_to_deferred
async def allocate_buckets(self, request, authorization, storage_index):
"""Allocate buckets.""" """Allocate buckets."""
upload_secret = authorization[Secrets.UPLOAD] upload_secret = authorization[Secrets.UPLOAD]
info = self._read_encoded(request, _SCHEMAS["allocate_buckets"]) # It's just a list of up to ~256 shares, shouldn't use many bytes.
info = await self._read_encoded(
request, _SCHEMAS["allocate_buckets"], max_size=8192
)
# We do NOT validate the upload secret for existing bucket uploads. # We do NOT validate the upload secret for existing bucket uploads.
# Another upload may be happening in parallel, with a different upload # Another upload may be happening in parallel, with a different upload
@ -610,7 +631,7 @@ class HTTPServer(object):
storage_index, share_number, upload_secret, bucket storage_index, share_number, upload_secret, bucket
) )
return self._send_encoded( return await self._send_encoded(
request, request,
{"already-have": set(already_got), "allocated": set(sharenum_to_bucket)}, {"already-have": set(already_got), "allocated": set(sharenum_to_bucket)},
) )
@ -745,7 +766,8 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt", "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"], methods=["POST"],
) )
def advise_corrupt_share_immutable( @async_to_deferred
async def advise_corrupt_share_immutable(
self, request, authorization, storage_index, share_number self, request, authorization, storage_index, share_number
): ):
"""Indicate that given share is corrupt, with a text reason.""" """Indicate that given share is corrupt, with a text reason."""
@ -754,7 +776,11 @@ class HTTPServer(object):
except KeyError: except KeyError:
raise _HTTPError(http.NOT_FOUND) raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"]) # The reason can be a string with explanation, so in theory it could be
# longish?
info = await self._read_encoded(
request, _SCHEMAS["advise_corrupt_share"], max_size=32768,
)
bucket.advise_corrupt_share(info["reason"].encode("utf-8")) bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
return b"" return b""
@ -766,9 +792,10 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/read-test-write", "/storage/v1/mutable/<storage_index:storage_index>/read-test-write",
methods=["POST"], methods=["POST"],
) )
def mutable_read_test_write(self, request, authorization, storage_index): @async_to_deferred
async def mutable_read_test_write(self, request, authorization, storage_index):
"""Read/test/write combined operation for mutables.""" """Read/test/write combined operation for mutables."""
rtw_request = self._read_encoded( rtw_request = await self._read_encoded(
request, _SCHEMAS["mutable_read_test_write"], max_size=2**48 request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
) )
secrets = ( secrets = (
@ -795,7 +822,9 @@ class HTTPServer(object):
) )
except BadWriteEnablerError: except BadWriteEnablerError:
raise _HTTPError(http.UNAUTHORIZED) raise _HTTPError(http.UNAUTHORIZED)
return self._send_encoded(request, {"success": success, "data": read_data}) return await self._send_encoded(
request, {"success": success, "data": read_data}
)
@_authorized_route( @_authorized_route(
_app, _app,
@ -840,7 +869,8 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt", "/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"], methods=["POST"],
) )
def advise_corrupt_share_mutable( @async_to_deferred
async def advise_corrupt_share_mutable(
self, request, authorization, storage_index, share_number self, request, authorization, storage_index, share_number
): ):
"""Indicate that given share is corrupt, with a text reason.""" """Indicate that given share is corrupt, with a text reason."""
@ -849,7 +879,11 @@ class HTTPServer(object):
}: }:
raise _HTTPError(http.NOT_FOUND) raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"]) # The reason can be a string with explanation, so in theory it could be
# longish?
info = await self._read_encoded(
request, _SCHEMAS["advise_corrupt_share"], max_size=32768
)
self._storage_server.advise_corrupt_share( self._storage_server.advise_corrupt_share(
b"mutable", storage_index, share_number, info["reason"].encode("utf-8") b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
) )

View File

@ -33,9 +33,13 @@ Ported to Python 3.
from __future__ import annotations from __future__ import annotations
from six import ensure_text from six import ensure_text
from typing import Union
import re, time, hashlib from typing import Union, Any
from os import urandom from os import urandom
import re
import time
import hashlib
from configparser import NoSectionError from configparser import NoSectionError
import attr import attr
@ -67,6 +71,12 @@ from allmydata.interfaces import (
IStorageServer, IStorageServer,
IFoolscapStoragePlugin, IFoolscapStoragePlugin,
) )
from allmydata.grid_manager import (
create_grid_manager_verifier,
)
from allmydata.crypto import (
ed25519,
)
from allmydata.util import log, base32, connection_status from allmydata.util import log, base32, connection_status
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util.observer import ObserverList from allmydata.util.observer import ObserverList
@ -79,6 +89,7 @@ from allmydata.storage.http_client import (
ClientException as HTTPClientException, StorageClientMutables, ClientException as HTTPClientException, StorageClientMutables,
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException
) )
from .node import _Config
ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs" ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs"
@ -112,9 +123,15 @@ class StorageClientConfig(object):
:ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from :ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from
names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
respective configuration. respective configuration.
:ivar list[ed25519.VerifyKey] grid_manager_keys: with no keys in
this list, we'll upload to any storage server. Otherwise, we will
only upload to a storage-server that has a valid certificate
signed by at least one of these keys.
""" """
preferred_peers = attr.ib(default=()) preferred_peers = attr.ib(default=())
storage_plugins = attr.ib(default=attr.Factory(dict)) storage_plugins = attr.ib(default=attr.Factory(dict))
grid_manager_keys = attr.ib(default=attr.Factory(list))
@classmethod @classmethod
def from_node_config(cls, config): def from_node_config(cls, config):
@ -146,9 +163,17 @@ class StorageClientConfig(object):
plugin_config = [] plugin_config = []
storage_plugins[plugin_name] = dict(plugin_config) storage_plugins[plugin_name] = dict(plugin_config)
grid_manager_keys = []
for name, gm_key in config.enumerate_section('grid_managers').items():
grid_manager_keys.append(
ed25519.verifying_key_from_string(gm_key.encode("ascii"))
)
return cls( return cls(
preferred_peers, preferred_peers,
storage_plugins, storage_plugins,
grid_manager_keys,
) )
@ -175,7 +200,7 @@ class StorageFarmBroker(service.MultiService):
self, self,
permute_peers, permute_peers,
tub_maker, tub_maker,
node_config, node_config: _Config,
storage_client_config=None, storage_client_config=None,
): ):
service.MultiService.__init__(self) service.MultiService.__init__(self)
@ -194,9 +219,9 @@ class StorageFarmBroker(service.MultiService):
# own Reconnector, and will give us a RemoteReference when we ask # own Reconnector, and will give us a RemoteReference when we ask
# them for it. # them for it.
self.servers = BytesKeyDict() self.servers = BytesKeyDict()
self._static_server_ids = set() # ignore announcements for these self._static_server_ids : set[bytes] = set() # ignore announcements for these
self.introducer_client = None self.introducer_client = None
self._threshold_listeners = [] # tuples of (threshold, Deferred) self._threshold_listeners : list[tuple[float,defer.Deferred[Any]]]= [] # tuples of (threshold, Deferred)
self._connected_high_water_mark = 0 self._connected_high_water_mark = 0
@log_call(action_type=u"storage-client:broker:set-static-servers") @log_call(action_type=u"storage-client:broker:set-static-servers")
@ -250,6 +275,16 @@ class StorageFarmBroker(service.MultiService):
in self.storage_client_config.storage_plugins.items() in self.storage_client_config.storage_plugins.items()
}) })
@staticmethod
def _should_we_use_http(node_config: _Config, announcement: dict) -> bool:
"""
Given an announcement dictionary and config, return whether we should
connect to storage server over HTTP.
"""
return not node_config.get_config(
"client", "force_foolscap", default=True, boolean=True,
) and len(announcement.get(ANONYMOUS_STORAGE_NURLS, [])) > 0
@log_call( @log_call(
action_type=u"storage-client:broker:make-storage-server", action_type=u"storage-client:broker:make-storage-server",
include_args=["server_id"], include_args=["server_id"],
@ -269,10 +304,21 @@ class StorageFarmBroker(service.MultiService):
by the given announcement. by the given announcement.
""" """
assert isinstance(server_id, bytes) assert isinstance(server_id, bytes)
if len(server["ann"].get(ANONYMOUS_STORAGE_NURLS, [])) > 0: gm_verifier = create_grid_manager_verifier(
s = HTTPNativeStorageServer(server_id, server["ann"]) self.storage_client_config.grid_manager_keys,
server["ann"].get("grid-manager-certificates", []),
"pub-{}".format(str(server_id, "ascii")), # server_id is v0-<key> not pub-v0-key .. for reasons?
)
if self._should_we_use_http(self.node_config, server["ann"]):
s = HTTPNativeStorageServer(
server_id,
server["ann"],
grid_manager_verifier=gm_verifier,
)
s.on_status_changed(lambda _: self._got_connection()) s.on_status_changed(lambda _: self._got_connection())
return s return s
handler_overrides = server.get("connections", {}) handler_overrides = server.get("connections", {})
s = NativeStorageServer( s = NativeStorageServer(
server_id, server_id,
@ -281,6 +327,7 @@ class StorageFarmBroker(service.MultiService):
handler_overrides, handler_overrides,
self.node_config, self.node_config,
self.storage_client_config, self.storage_client_config,
gm_verifier,
) )
s.on_status_changed(lambda _: self._got_connection()) s.on_status_changed(lambda _: self._got_connection())
return s return s
@ -429,11 +476,26 @@ class StorageFarmBroker(service.MultiService):
for dsc in list(self.servers.values()): for dsc in list(self.servers.values()):
dsc.try_to_connect() dsc.try_to_connect()
def get_servers_for_psi(self, peer_selection_index): def get_servers_for_psi(self, peer_selection_index, for_upload=False):
"""
:param for_upload: used to determine if we should include any
servers that are invalid according to Grid Manager
processing. When for_upload is True and we have any Grid
Manager keys configured, any storage servers with invalid or
missing certificates will be excluded.
"""
# return a list of server objects (IServers) # return a list of server objects (IServers)
assert self.permute_peers == True assert self.permute_peers == True
connected_servers = self.get_connected_servers() connected_servers = self.get_connected_servers()
preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers) preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
if for_upload:
# print("upload processing: {}".format([srv.upload_permitted() for srv in connected_servers]))
connected_servers = [
srv
for srv in connected_servers
if srv.upload_permitted()
]
def _permuted(server): def _permuted(server):
seed = server.get_permutation_seed() seed = server.get_permutation_seed()
is_unpreferred = server not in preferred_servers is_unpreferred = server not in preferred_servers
@ -609,9 +671,10 @@ class _FoolscapStorage(object):
{"permutation-seed-base32": "...", {"permutation-seed-base32": "...",
"nickname": "...", "nickname": "...",
"grid-manager-certificates": [..],
} }
*nickname* is optional. *nickname* and *grid-manager-certificates* are optional.
The furl will be a Unicode string on Python 3; on Python 2 it will be The furl will be a Unicode string on Python 3; on Python 2 it will be
either a native (bytes) string or a Unicode string. either a native (bytes) string or a Unicode string.
@ -741,7 +804,8 @@ class NativeStorageServer(service.MultiService):
"application-version": "unknown: no get_version()", "application-version": "unknown: no get_version()",
}) })
def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=StorageClientConfig()): def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=None,
grid_manager_verifier=None):
service.MultiService.__init__(self) service.MultiService.__init__(self)
assert isinstance(server_id, bytes) assert isinstance(server_id, bytes)
self._server_id = server_id self._server_id = server_id
@ -749,6 +813,11 @@ class NativeStorageServer(service.MultiService):
self._tub_maker = tub_maker self._tub_maker = tub_maker
self._handler_overrides = handler_overrides self._handler_overrides = handler_overrides
if config is None:
config = StorageClientConfig()
self._grid_manager_verifier = grid_manager_verifier
self._storage = self._make_storage_system(node_config, config, ann) self._storage = self._make_storage_system(node_config, config, ann)
self.last_connect_time = None self.last_connect_time = None
@ -759,6 +828,21 @@ class NativeStorageServer(service.MultiService):
self._trigger_cb = None self._trigger_cb = None
self._on_status_changed = ObserverList() self._on_status_changed = ObserverList()
def upload_permitted(self):
"""
If our client is configured with Grid Manager public-keys, we will
only upload to storage servers that have a currently-valid
certificate signed by at least one of the Grid Managers we
accept.
:return: True if we should use this server for uploads, False
otherwise.
"""
# if we have no Grid Manager keys configured, choice is easy
if self._grid_manager_verifier is None:
return True
return self._grid_manager_verifier()
def _make_storage_system(self, node_config, config, ann): def _make_storage_system(self, node_config, config, ann):
""" """
:param allmydata.node._Config node_config: The node configuration to pass :param allmydata.node._Config node_config: The node configuration to pass
@ -945,13 +1029,14 @@ class HTTPNativeStorageServer(service.MultiService):
"connected". "connected".
""" """
def __init__(self, server_id: bytes, announcement, reactor=reactor): def __init__(self, server_id: bytes, announcement, reactor=reactor, grid_manager_verifier=None):
service.MultiService.__init__(self) service.MultiService.__init__(self)
assert isinstance(server_id, bytes) assert isinstance(server_id, bytes)
self._server_id = server_id self._server_id = server_id
self.announcement = announcement self.announcement = announcement
self._on_status_changed = ObserverList() self._on_status_changed = ObserverList()
self._reactor = reactor self._reactor = reactor
self._grid_manager_verifier = grid_manager_verifier
furl = announcement["anonymous-storage-FURL"].encode("utf-8") furl = announcement["anonymous-storage-FURL"].encode("utf-8")
( (
self._nickname, self._nickname,
@ -1001,6 +1086,21 @@ class HTTPNativeStorageServer(service.MultiService):
""" """
return self._on_status_changed.subscribe(status_changed) return self._on_status_changed.subscribe(status_changed)
def upload_permitted(self):
"""
If our client is configured with Grid Manager public-keys, we will
only upload to storage servers that have a currently-valid
certificate signed by at least one of the Grid Managers we
accept.
:return: True if we should use this server for uploads, False
otherwise.
"""
# if we have no Grid Manager keys configured, choice is easy
if self._grid_manager_verifier is None:
return True
return self._grid_manager_verifier()
# Special methods used by copy.copy() and copy.deepcopy(). When those are # Special methods used by copy.copy() and copy.deepcopy(). When those are
# used in allmydata.immutable.filenode to copy CheckResults during # used in allmydata.immutable.filenode to copy CheckResults during
# repair, we want it to treat the IServer instances as singletons, and # repair, we want it to treat the IServer instances as singletons, and

View File

@ -10,26 +10,33 @@ from future.utils import PY2
if PY2: if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six.moves import StringIO # We're going to override stdin/stderr, so want to match their behavior on respective Python versions.
from io import StringIO
from twisted.python.usage import (
UsageError,
)
from twisted.python.filepath import (
FilePath,
)
from testtools.matchers import ( from testtools.matchers import (
Contains, Contains,
) )
from twisted.python.filepath import (
FilePath,
)
from allmydata.scripts.admin import ( from allmydata.scripts.admin import (
migrate_crawler, migrate_crawler,
add_grid_manager_cert,
) )
from allmydata.scripts.runner import ( from allmydata.scripts.runner import (
Options, Options,
) )
from allmydata.util import jsonbytes as json
from ..common import ( from ..common import (
SyncTestCase, SyncTestCase,
) )
class AdminMigrateCrawler(SyncTestCase): class AdminMigrateCrawler(SyncTestCase):
""" """
Tests related to 'tahoe admin migrate-crawler' Tests related to 'tahoe admin migrate-crawler'
@ -85,3 +92,162 @@ class AdminMigrateCrawler(SyncTestCase):
str(options), str(options),
Contains("security issues with pickle") Contains("security issues with pickle")
) )
fake_cert = {
"certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":1}",
"signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
}
class AddCertificateOptions(SyncTestCase):
"""
Tests for 'tahoe admin add-grid-manager-cert' option validation
"""
def setUp(self):
self.tahoe = Options()
return super(AddCertificateOptions, self).setUp()
def test_parse_no_data(self):
"""
When no data is passed to stdin an error is produced
"""
self.tahoe.stdin = StringIO("")
self.tahoe.stderr = StringIO() # suppress message
with self.assertRaises(UsageError) as ctx:
self.tahoe.parseOptions(
[
"admin", "add-grid-manager-cert",
"--name", "random-name",
"--filename", "-",
]
)
self.assertIn(
"Reading certificate from stdin failed",
str(ctx.exception)
)
def test_read_cert_file(self):
"""
A certificate can be read from a file
"""
tmp = self.mktemp()
with open(tmp, "wb") as f:
f.write(json.dumps_bytes(fake_cert))
# certificate should be loaded
self.tahoe.parseOptions(
[
"admin", "add-grid-manager-cert",
"--name", "random-name",
"--filename", tmp,
]
)
opts = self.tahoe.subOptions.subOptions
self.assertEqual(
fake_cert,
opts.certificate_data
)
def test_bad_certificate(self):
"""
Unparseable data produces an error
"""
self.tahoe.stdin = StringIO("{}")
self.tahoe.stderr = StringIO() # suppress message
with self.assertRaises(UsageError) as ctx:
self.tahoe.parseOptions(
[
"admin", "add-grid-manager-cert",
"--name", "random-name",
"--filename", "-",
]
)
self.assertIn(
"Grid Manager certificate must contain",
str(ctx.exception)
)
class AddCertificateCommand(SyncTestCase):
"""
Tests for 'tahoe admin add-grid-manager-cert' operation
"""
def setUp(self):
self.tahoe = Options()
self.node_path = FilePath(self.mktemp())
self.node_path.makedirs()
with self.node_path.child("tahoe.cfg").open("w") as f:
f.write(b"# minimal test config\n")
return super(AddCertificateCommand, self).setUp()
def test_add_one(self):
"""
Adding a certificate succeeds
"""
self.tahoe.stdin = StringIO(json.dumps(fake_cert))
self.tahoe.stderr = StringIO()
self.tahoe.parseOptions(
[
"--node-directory", self.node_path.path,
"admin", "add-grid-manager-cert",
"--name", "zero",
"--filename", "-",
]
)
self.tahoe.subOptions.subOptions.stdin = self.tahoe.stdin
self.tahoe.subOptions.subOptions.stderr = self.tahoe.stderr
rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
self.assertEqual(rc, 0)
self.assertEqual(
{"zero.cert", "tahoe.cfg"},
set(self.node_path.listdir())
)
self.assertIn(
"There are now 1 certificates",
self.tahoe.stderr.getvalue()
)
def test_add_two(self):
"""
An error message is produced when adding a certificate with a
duplicate name.
"""
self.tahoe.stdin = StringIO(json.dumps(fake_cert))
self.tahoe.stderr = StringIO()
self.tahoe.parseOptions(
[
"--node-directory", self.node_path.path,
"admin", "add-grid-manager-cert",
"--name", "zero",
"--filename", "-",
]
)
self.tahoe.subOptions.subOptions.stdin = self.tahoe.stdin
self.tahoe.subOptions.subOptions.stderr = self.tahoe.stderr
rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
self.assertEqual(rc, 0)
self.tahoe.stdin = StringIO(json.dumps(fake_cert))
self.tahoe.parseOptions(
[
"--node-directory", self.node_path.path,
"admin", "add-grid-manager-cert",
"--name", "zero",
"--filename", "-",
]
)
self.tahoe.subOptions.subOptions.stdin = self.tahoe.stdin
self.tahoe.subOptions.subOptions.stderr = self.tahoe.stderr
rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
self.assertEqual(rc, 1)
self.assertIn(
"Already have certificate for 'zero'",
self.tahoe.stderr.getvalue()
)

View File

@ -0,0 +1,314 @@
"""
Tests for the grid manager CLI.
"""
import os
from io import (
BytesIO,
)
from unittest import (
skipIf,
)
from twisted.trial.unittest import (
TestCase,
)
from allmydata.cli.grid_manager import (
grid_manager,
)
import click.testing
# these imports support the tests for `tahoe *` subcommands
from ..common_util import (
run_cli,
)
from twisted.internet.defer import (
inlineCallbacks,
)
from twisted.python.filepath import (
FilePath,
)
from twisted.python.runtime import (
platform,
)
from allmydata.util import jsonbytes as json
class GridManagerCommandLine(TestCase):
"""
Test the mechanics of the `grid-manager` command
"""
def setUp(self):
self.runner = click.testing.CliRunner()
super(GridManagerCommandLine, self).setUp()
def invoke_and_check(self, *args, **kwargs):
"""Invoke a command with the runner and ensure it succeeded."""
result = self.runner.invoke(*args, **kwargs)
if result.exception is not None:
raise result.exc_info[1].with_traceback(result.exc_info[2])
self.assertEqual(result.exit_code, 0, result)
return result
def test_create(self):
"""
Create a new grid-manager
"""
with self.runner.isolated_filesystem():
result = self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
self.assertEqual(["foo"], os.listdir("."))
self.assertEqual(["config.json"], os.listdir("./foo"))
result = self.invoke_and_check(grid_manager, ["--config", "foo", "public-identity"])
self.assertTrue(result.output.startswith("pub-v0-"))
def test_load_invalid(self):
"""
An invalid config is reported to the user
"""
with self.runner.isolated_filesystem():
with open("config.json", "wb") as f:
f.write(json.dumps_bytes({"not": "valid"}))
result = self.runner.invoke(grid_manager, ["--config", ".", "public-identity"])
self.assertNotEqual(result.exit_code, 0)
self.assertIn(
"Error loading Grid Manager",
result.output,
)
def test_create_already(self):
"""
It's an error to create a new grid-manager in an existing
directory.
"""
with self.runner.isolated_filesystem():
result = self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
result = self.runner.invoke(grid_manager, ["--config", "foo", "create"])
self.assertEqual(1, result.exit_code)
self.assertIn(
"Can't create",
result.stdout,
)
def test_create_stdout(self):
"""
Create a new grid-manager with no files
"""
with self.runner.isolated_filesystem():
result = self.invoke_and_check(grid_manager, ["--config", "-", "create"])
self.assertEqual([], os.listdir("."))
config = json.loads(result.output)
self.assertEqual(
{"private_key", "grid_manager_config_version"},
set(config.keys()),
)
def test_list_stdout(self):
"""
Load Grid Manager without files (using 'list' subcommand, but any will do)
"""
config = {
"storage_servers": {
"storage0": {
"public_key": "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
}
},
"private_key": "priv-v0-6uinzyaxy3zvscwgsps5pxcfezhrkfb43kvnrbrhhfzyduyqnniq",
"grid_manager_config_version": 0
}
result = self.invoke_and_check(
grid_manager, ["--config", "-", "list"],
input=BytesIO(json.dumps_bytes(config)),
)
self.assertEqual(result.exit_code, 0)
self.assertEqual(
"storage0: pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\n",
result.output,
)
def test_add_and_sign(self):
"""
Add a new storage-server and sign a certificate for it
"""
pubkey = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
with self.runner.isolated_filesystem():
self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey])
result = self.invoke_and_check(grid_manager, ["--config", "foo", "sign", "storage0", "10"])
sigcert = json.loads(result.output)
self.assertEqual({"certificate", "signature"}, set(sigcert.keys()))
cert = json.loads(sigcert['certificate'])
self.assertEqual(cert["public_key"], pubkey)
def test_add_and_sign_second_cert(self):
"""
Add a new storage-server and sign two certificates.
"""
pubkey = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
with self.runner.isolated_filesystem():
self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey])
self.invoke_and_check(grid_manager, ["--config", "foo", "sign", "storage0", "10"])
self.invoke_and_check(grid_manager, ["--config", "foo", "sign", "storage0", "10"])
# we should now have two certificates stored
self.assertEqual(
set(FilePath("foo").listdir()),
{'storage0.cert.1', 'storage0.cert.0', 'config.json'},
)
def test_add_twice(self):
"""
An error is reported trying to add an existing server
"""
pubkey0 = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
pubkey1 = "pub-v0-5ysc55trfvfvg466v46j4zmfyltgus3y2gdejifctv7h4zkuyveq"
with self.runner.isolated_filesystem():
self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey0])
result = self.runner.invoke(grid_manager, ["--config", "foo", "add", "storage0", pubkey1])
self.assertNotEquals(result.exit_code, 0)
self.assertIn(
"A storage-server called 'storage0' already exists",
result.output,
)
def test_add_list_remove(self):
"""
Add a storage server, list it, remove it.
"""
pubkey = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
with self.runner.isolated_filesystem():
self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey])
self.invoke_and_check(grid_manager, ["--config", "foo", "sign", "storage0", "1"])
result = self.invoke_and_check(grid_manager, ["--config", "foo", "list"])
names = [
line.split(':')[0]
for line in result.output.strip().split('\n')
if not line.startswith(" ") # "cert" lines start with whitespace
]
self.assertEqual(names, ["storage0"])
self.invoke_and_check(grid_manager, ["--config", "foo", "remove", "storage0"])
result = self.invoke_and_check(grid_manager, ["--config", "foo", "list"])
self.assertEqual(result.output.strip(), "")
def test_remove_missing(self):
"""
Error reported when removing non-existant server
"""
with self.runner.isolated_filesystem():
self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
result = self.runner.invoke(grid_manager, ["--config", "foo", "remove", "storage0"])
self.assertNotEquals(result.exit_code, 0)
self.assertIn(
"No storage-server called 'storage0' exists",
result.output,
)
def test_sign_missing(self):
"""
Error reported when signing non-existant server
"""
with self.runner.isolated_filesystem():
self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
result = self.runner.invoke(grid_manager, ["--config", "foo", "sign", "storage0", "42"])
self.assertNotEquals(result.exit_code, 0)
self.assertIn(
"No storage-server called 'storage0' exists",
result.output,
)
@skipIf(not platform.isLinux(), "I only know how permissions work on linux")
def test_sign_bad_perms(self):
"""
Error reported if we can't create certificate file
"""
pubkey = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
with self.runner.isolated_filesystem():
self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey])
# make the directory un-writable (so we can't create a new cert)
os.chmod("foo", 0o550)
result = self.runner.invoke(grid_manager, ["--config", "foo", "sign", "storage0", "42"])
self.assertEquals(result.exit_code, 1)
self.assertIn(
"Permission denied",
result.output,
)
class TahoeAddGridManagerCert(TestCase):
"""
Test `tahoe admin add-grid-manager-cert` subcommand
"""
@inlineCallbacks
def test_help(self):
"""
some kind of help is printed
"""
code, out, err = yield run_cli("admin", "add-grid-manager-cert")
self.assertEqual(err, "")
self.assertNotEqual(0, code)
@inlineCallbacks
def test_no_name(self):
"""
error to miss --name option
"""
code, out, err = yield run_cli(
"admin", "add-grid-manager-cert", "--filename", "-",
stdin=b"the cert",
)
self.assertIn(
"Must provide --name",
out
)
@inlineCallbacks
def test_no_filename(self):
"""
error to miss --name option
"""
code, out, err = yield run_cli(
"admin", "add-grid-manager-cert", "--name", "foo",
stdin=b"the cert",
)
self.assertIn(
"Must provide --filename",
out
)
@inlineCallbacks
def test_add_one(self):
"""
we can add a certificate
"""
nodedir = self.mktemp()
fake_cert = b"""{"certificate": "", "signature": ""}"""
code, out, err = yield run_cli(
"--node-directory", nodedir,
"admin", "add-grid-manager-cert", "-f", "-", "--name", "foo",
stdin=fake_cert,
ignore_stderr=True,
)
nodepath = FilePath(nodedir)
with nodepath.child("tahoe.cfg").open("r") as f:
config_data = f.read()
self.assertIn("tahoe.cfg", nodepath.listdir())
self.assertIn(
b"foo = foo.cert",
config_data,
)
self.assertIn("foo.cert", nodepath.listdir())
with nodepath.child("foo.cert").open("r") as f:
self.assertEqual(
json.load(f),
json.loads(fake_cert)
)

View File

@ -34,7 +34,7 @@ from __future__ import annotations
from typing import Iterator, Optional, List, Tuple from typing import Iterator, Optional, List, Tuple
from collections.abc import Awaitable from collections.abc import Awaitable
from inspect import getargspec from inspect import getfullargspec
from itertools import count from itertools import count
from sys import stderr from sys import stderr
@ -141,8 +141,8 @@ def _verify():
""" """
# Poor man's interface verification. # Poor man's interface verification.
a = getargspec(create) a = getfullargspec(create)
b = getargspec(MemoryWormholeServer.create) b = getfullargspec(MemoryWormholeServer.create)
# I know it has a `self` argument at the beginning. That's okay. # I know it has a `self` argument at the beginning. That's okay.
b = b._replace(args=b.args[1:]) b = b._replace(args=b.args[1:])
assert a == b, "{} != {}".format(a, b) assert a == b, "{} != {}".format(a, b)

View File

@ -681,6 +681,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# test code. # test code.
FORCE_FOOLSCAP_FOR_STORAGE : Optional[bool] = None FORCE_FOOLSCAP_FOR_STORAGE : Optional[bool] = None
# If True, reduce the timeout on connections:
REDUCE_HTTP_CLIENT_TIMEOUT : bool = True
def setUp(self): def setUp(self):
self._http_client_pools = [] self._http_client_pools = []
http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool) http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool)
@ -707,7 +710,8 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
d.addTimeout(1, reactor) d.addTimeout(1, reactor)
return d return d
pool.getConnection = getConnectionWithTimeout if self.REDUCE_HTTP_CLIENT_TIMEOUT:
pool.getConnection = getConnectionWithTimeout
def close_idle_http_connections(self): def close_idle_http_connections(self):
"""Close all HTTP client connections that are just hanging around.""" """Close all HTTP client connections that are just hanging around."""
@ -867,6 +871,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,)) setnode("nickname", u"client %d \N{BLACK SMILING FACE}" % (which,))
setconf(config, which, "storage", "force_foolscap", str(force_foolscap)) setconf(config, which, "storage", "force_foolscap", str(force_foolscap))
setconf(config, which, "client", "force_foolscap", str(force_foolscap))
tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor) tub_location_hint, tub_port_endpoint = self.port_assigner.assign(reactor)
setnode("tub.port", tub_port_endpoint) setnode("tub.port", tub_port_endpoint)

View File

@ -134,6 +134,7 @@ def run_cli_native(verb, *args, **kwargs):
stdin = TextIOWrapper(BytesIO(stdin), encoding) stdin = TextIOWrapper(BytesIO(stdin), encoding)
stdout = TextIOWrapper(BytesIO(), encoding) stdout = TextIOWrapper(BytesIO(), encoding)
stderr = TextIOWrapper(BytesIO(), encoding) stderr = TextIOWrapper(BytesIO(), encoding)
options.stdin = stdin
d = defer.succeed(argv) d = defer.succeed(argv)
d.addCallback( d.addCallback(
partial( partial(

View File

@ -20,7 +20,7 @@ from testtools.matchers import (
from twisted.internet import defer from twisted.internet import defer
from allmydata.interfaces import MDMF_VERSION from allmydata.interfaces import MDMF_VERSION
from allmydata.mutable.filenode import MutableFileNode from allmydata.mutable.filenode import MutableFileNode
from allmydata.mutable.publish import MutableData, DEFAULT_MAX_SEGMENT_SIZE from allmydata.mutable.publish import MutableData, DEFAULT_MUTABLE_MAX_SEGMENT_SIZE
from ..no_network import GridTestMixin from ..no_network import GridTestMixin
from .. import common_util as testutil from .. import common_util as testutil
@ -180,7 +180,7 @@ class Update(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin):
# long -- this is 7 segments in the default segment size. So we # long -- this is 7 segments in the default segment size. So we
# need to add 2 segments worth of data to push it over a # need to add 2 segments worth of data to push it over a
# power-of-two boundary. # power-of-two boundary.
segment = b"a" * DEFAULT_MAX_SEGMENT_SIZE segment = b"a" * DEFAULT_MUTABLE_MAX_SEGMENT_SIZE
new_data = self.data + (segment * 2) new_data = self.data + (segment * 2)
d0 = self.do_upload_mdmf() d0 = self.do_upload_mdmf()
def _run(ign): def _run(ign):
@ -232,9 +232,9 @@ class Update(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin):
return d0 return d0
def test_multiple_segment_replace(self): def test_multiple_segment_replace(self):
replace_offset = 2 * DEFAULT_MAX_SEGMENT_SIZE replace_offset = 2 * DEFAULT_MUTABLE_MAX_SEGMENT_SIZE
new_data = self.data[:replace_offset] new_data = self.data[:replace_offset]
new_segment = b"a" * DEFAULT_MAX_SEGMENT_SIZE new_segment = b"a" * DEFAULT_MUTABLE_MAX_SEGMENT_SIZE
new_data += 2 * new_segment new_data += 2 * new_segment
new_data += b"replaced" new_data += b"replaced"
rest_offset = len(new_data) rest_offset = len(new_data)

View File

@ -195,6 +195,10 @@ class NoNetworkServer(object):
return self return self
def __deepcopy__(self, memodict): def __deepcopy__(self, memodict):
return self return self
def upload_permitted(self):
return True
def get_serverid(self): def get_serverid(self):
return self.serverid return self.serverid
def get_permutation_seed(self): def get_permutation_seed(self):
@ -225,7 +229,7 @@ class NoNetworkServer(object):
@implementer(IStorageBroker) @implementer(IStorageBroker)
class NoNetworkStorageBroker(object): # type: ignore # missing many methods class NoNetworkStorageBroker(object): # type: ignore # missing many methods
def get_servers_for_psi(self, peer_selection_index): def get_servers_for_psi(self, peer_selection_index, for_upload=True):
def _permuted(server): def _permuted(server):
seed = server.get_permutation_seed() seed = server.get_permutation_seed()
return permute_server_hash(peer_selection_index, seed) return permute_server_hash(peer_selection_index, seed)

View File

@ -26,6 +26,11 @@ from ..uri import (
MDMFDirectoryURI, MDMFDirectoryURI,
) )
from allmydata.util.base32 import (
b2a,
)
def write_capabilities(): def write_capabilities():
""" """
Build ``IURI`` providers representing all kinds of write capabilities. Build ``IURI`` providers representing all kinds of write capabilities.
@ -121,6 +126,7 @@ def dir2_mdmf_capabilities():
mdmf_capabilities(), mdmf_capabilities(),
) )
def offsets(min_value=0, max_value=2 ** 16): def offsets(min_value=0, max_value=2 ** 16):
""" """
Build ``int`` values that could be used as valid offsets into a sequence Build ``int`` values that could be used as valid offsets into a sequence
@ -134,3 +140,13 @@ def lengths(min_value=1, max_value=2 ** 16):
share data in a share file). share data in a share file).
""" """
return integers(min_value, max_value) return integers(min_value, max_value)
def base32text():
"""
Build text()s that are valid base32
"""
return builds(
lambda b: str(b2a(b), "ascii"),
binary(),
)

View File

@ -10,7 +10,8 @@ from future.utils import PY2
if PY2: if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import os, sys import os
import sys
from functools import ( from functools import (
partial, partial,
) )
@ -46,6 +47,7 @@ from testtools.matchers import (
AfterPreprocessing, AfterPreprocessing,
MatchesListwise, MatchesListwise,
MatchesDict, MatchesDict,
ContainsDict,
Always, Always,
Is, Is,
raises, raises,
@ -72,6 +74,7 @@ from allmydata.util import (
fileutil, fileutil,
encodingutil, encodingutil,
configutil, configutil,
jsonbytes as json,
) )
from allmydata.util.eliotutil import capture_logging from allmydata.util.eliotutil import capture_logging
from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.fileutil import abspath_expanduser_unicode
@ -1508,3 +1511,45 @@ enabled = {storage_enabled}
), ),
), ),
) )
def test_announcement_includes_grid_manager(self):
"""
When Grid Manager is enabled certificates are included in the
announcement
"""
fake_cert = {
"certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":1}",
"signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda",
}
with self.basedir.child("zero.cert").open("w") as f:
f.write(json.dumps_bytes(fake_cert))
with self.basedir.child("gm0.cert").open("w") as f:
f.write(json.dumps_bytes(fake_cert))
config = client.config_from_string(
self.basedir.path,
"tub.port",
self.get_config(
storage_enabled=True,
more_storage="grid_management = True",
more_sections=(
"[grid_managers]\n"
"gm0 = pub-v0-ibpbsexcjfbv3ni7gwlclgn6mldaqnqd5mrtan2fnq2b27xnovca\n"
"[grid_manager_certificates]\n"
"foo = zero.cert\n"
)
),
)
self.assertThat(
client.create_client_from_config(
config,
_introducer_factory=MemoryIntroducerClient,
),
succeeded(AfterPreprocessing(
lambda client: get_published_announcements(client)[0].ann,
ContainsDict({
"grid-manager-certificates": Equals([fake_cert]),
}),
)),
)

View File

@ -0,0 +1,455 @@
"""
Tests for the grid manager.
"""
from datetime import (
timedelta,
)
from twisted.python.filepath import (
FilePath,
)
from hypothesis import given
from allmydata.node import (
config_from_string,
)
from allmydata.client import (
_valid_config as client_valid_config,
)
from allmydata.crypto import (
ed25519,
)
from allmydata.util import (
jsonbytes as json,
)
from allmydata.grid_manager import (
load_grid_manager,
save_grid_manager,
create_grid_manager,
parse_grid_manager_certificate,
create_grid_manager_verifier,
SignedCertificate,
)
from allmydata.test.strategies import (
base32text,
)
from .common import SyncTestCase
class GridManagerUtilities(SyncTestCase):
"""
Confirm operation of utility functions used by GridManager
"""
def test_load_certificates(self):
"""
Grid Manager certificates are deserialized from config properly
"""
cert_path = self.mktemp()
fake_cert = {
"certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":1}",
"signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
}
with open(cert_path, "wb") as f:
f.write(json.dumps_bytes(fake_cert))
config_data = (
"[grid_managers]\n"
"fluffy = pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq\n"
"[grid_manager_certificates]\n"
"ding = {}\n".format(cert_path)
)
config = config_from_string("/foo", "portnum", config_data, client_valid_config())
self.assertEqual(
{"fluffy": "pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq"},
config.enumerate_section("grid_managers")
)
certs = config.get_grid_manager_certificates()
self.assertEqual([fake_cert], certs)
def test_load_certificates_invalid_version(self):
"""
An error is reported loading invalid certificate version
"""
gm_path = FilePath(self.mktemp())
gm_path.makedirs()
config = {
"grid_manager_config_version": 0,
"private_key": "priv-v0-ub7knkkmkptqbsax4tznymwzc4nk5lynskwjsiubmnhcpd7lvlqa",
"storage_servers": {
"radia": {
"public_key": "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
}
}
}
with gm_path.child("config.json").open("wb") as f:
f.write(json.dumps_bytes(config))
fake_cert = {
"certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":22}",
"signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
}
with gm_path.child("radia.cert.0").open("wb") as f:
f.write(json.dumps_bytes(fake_cert))
with self.assertRaises(ValueError) as ctx:
load_grid_manager(gm_path)
self.assertIn(
"22",
str(ctx.exception),
)
def test_load_certificates_unknown_key(self):
"""
An error is reported loading certificates with invalid keys in them
"""
cert_path = self.mktemp()
fake_cert = {
"certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":22}",
"signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda",
"something-else": "not valid in a v0 certificate"
}
with open(cert_path, "wb") as f:
f.write(json.dumps_bytes(fake_cert))
config_data = (
"[grid_manager_certificates]\n"
"ding = {}\n".format(cert_path)
)
config = config_from_string("/foo", "portnum", config_data, client_valid_config())
with self.assertRaises(ValueError) as ctx:
config.get_grid_manager_certificates()
self.assertIn(
"Unknown key in Grid Manager certificate",
str(ctx.exception)
)
def test_load_certificates_missing(self):
"""
An error is reported for missing certificates
"""
cert_path = self.mktemp()
config_data = (
"[grid_managers]\n"
"fluffy = pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq\n"
"[grid_manager_certificates]\n"
"ding = {}\n".format(cert_path)
)
config = config_from_string("/foo", "portnum", config_data, client_valid_config())
with self.assertRaises(ValueError) as ctx:
config.get_grid_manager_certificates()
# we don't reliably know how Windows or MacOS will represent
# the path in the exception, so we don't check for the *exact*
# message with full-path here..
self.assertIn(
"Grid Manager certificate file",
str(ctx.exception)
)
self.assertIn(
" doesn't exist",
str(ctx.exception)
)
class GridManagerVerifier(SyncTestCase):
"""
Tests related to rejecting or accepting Grid Manager certificates.
"""
def setUp(self):
self.gm = create_grid_manager()
return super(GridManagerVerifier, self).setUp()
def test_sign_cert(self):
"""
For a storage server previously added to a grid manager,
_GridManager.sign returns a dict with "certificate" and
"signature" properties where the value of "signature" gives
the ed25519 signature (using the grid manager's private key of
the value) of "certificate".
"""
priv, pub = ed25519.create_signing_keypair()
self.gm.add_storage_server("test", pub)
cert0 = self.gm.sign("test", timedelta(seconds=86400))
cert1 = self.gm.sign("test", timedelta(seconds=3600))
self.assertNotEqual(cert0, cert1)
self.assertIsInstance(cert0, SignedCertificate)
gm_key = ed25519.verifying_key_from_string(self.gm.public_identity())
self.assertEqual(
ed25519.verify_signature(
gm_key,
cert0.signature,
cert0.certificate,
),
None
)
def test_sign_cert_wrong_name(self):
"""
Try to sign a storage-server that doesn't exist
"""
with self.assertRaises(KeyError):
self.gm.sign("doesn't exist", timedelta(seconds=86400))
def test_add_cert(self):
"""
Add a storage-server and serialize it
"""
priv, pub = ed25519.create_signing_keypair()
self.gm.add_storage_server("test", pub)
data = self.gm.marshal()
self.assertEqual(
data["storage_servers"],
{
"test": {
"public_key": ed25519.string_from_verifying_key(pub),
}
}
)
def test_remove(self):
"""
Add then remove a storage-server
"""
priv, pub = ed25519.create_signing_keypair()
self.gm.add_storage_server("test", pub)
self.gm.remove_storage_server("test")
self.assertEqual(len(self.gm.storage_servers), 0)
def test_serialize(self):
"""
Write and then read a Grid Manager config
"""
priv0, pub0 = ed25519.create_signing_keypair()
priv1, pub1 = ed25519.create_signing_keypair()
self.gm.add_storage_server("test0", pub0)
self.gm.add_storage_server("test1", pub1)
tempdir = self.mktemp()
fp = FilePath(tempdir)
save_grid_manager(fp, self.gm)
gm2 = load_grid_manager(fp)
self.assertEqual(
self.gm.public_identity(),
gm2.public_identity(),
)
self.assertEqual(
len(self.gm.storage_servers),
len(gm2.storage_servers),
)
for name, ss0 in list(self.gm.storage_servers.items()):
ss1 = gm2.storage_servers[name]
self.assertEqual(ss0.name, ss1.name)
self.assertEqual(ss0.public_key_string(), ss1.public_key_string())
self.assertEqual(self.gm.marshal(), gm2.marshal())
def test_invalid_no_version(self):
"""
Invalid Grid Manager config with no version
"""
tempdir = self.mktemp()
fp = FilePath(tempdir)
bad_config = {
"private_key": "at least we have one",
}
fp.makedirs()
with fp.child("config.json").open("w") as f:
f.write(json.dumps_bytes(bad_config))
with self.assertRaises(ValueError) as ctx:
load_grid_manager(fp)
self.assertIn(
"unknown version",
str(ctx.exception),
)
def test_invalid_certificate_bad_version(self):
"""
Invalid Grid Manager config containing a certificate with an
illegal version
"""
tempdir = self.mktemp()
fp = FilePath(tempdir)
config = {
"grid_manager_config_version": 0,
"private_key": "priv-v0-ub7knkkmkptqbsax4tznymwzc4nk5lynskwjsiubmnhcpd7lvlqa",
"storage_servers": {
"alice": {
"public_key": "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
}
}
}
bad_cert = {
"certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":0}",
"signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
}
fp.makedirs()
with fp.child("config.json").open("w") as f:
f.write(json.dumps_bytes(config))
with fp.child("alice.cert.0").open("w") as f:
f.write(json.dumps_bytes(bad_cert))
with self.assertRaises(ValueError) as ctx:
load_grid_manager(fp)
self.assertIn(
"Unknown certificate version",
str(ctx.exception),
)
def test_invalid_no_private_key(self):
"""
Invalid Grid Manager config with no private key
"""
tempdir = self.mktemp()
fp = FilePath(tempdir)
bad_config = {
"grid_manager_config_version": 0,
}
fp.makedirs()
with fp.child("config.json").open("w") as f:
f.write(json.dumps_bytes(bad_config))
with self.assertRaises(ValueError) as ctx:
load_grid_manager(fp)
self.assertIn(
"'private_key' required",
str(ctx.exception),
)
def test_invalid_bad_private_key(self):
"""
Invalid Grid Manager config with bad private-key
"""
tempdir = self.mktemp()
fp = FilePath(tempdir)
bad_config = {
"grid_manager_config_version": 0,
"private_key": "not actually encoded key",
}
fp.makedirs()
with fp.child("config.json").open("w") as f:
f.write(json.dumps_bytes(bad_config))
with self.assertRaises(ValueError) as ctx:
load_grid_manager(fp)
self.assertIn(
"Invalid Grid Manager private_key",
str(ctx.exception),
)
def test_invalid_storage_server(self):
"""
Invalid Grid Manager config with missing public-key for
storage-server
"""
tempdir = self.mktemp()
fp = FilePath(tempdir)
bad_config = {
"grid_manager_config_version": 0,
"private_key": "priv-v0-ub7knkkmkptqbsax4tznymwzc4nk5lynskwjsiubmnhcpd7lvlqa",
"storage_servers": {
"bad": {}
}
}
fp.makedirs()
with fp.child("config.json").open("w") as f:
f.write(json.dumps_bytes(bad_config))
with self.assertRaises(ValueError) as ctx:
load_grid_manager(fp)
self.assertIn(
"No 'public_key' for storage server",
str(ctx.exception),
)
def test_parse_cert(self):
"""
Parse an ostensibly valid storage certificate
"""
js = parse_grid_manager_certificate('{"certificate": "", "signature": ""}')
self.assertEqual(
set(js.keys()),
{"certificate", "signature"}
)
# the signature isn't *valid*, but that's checked in a
# different function
def test_parse_cert_not_dict(self):
"""
Certificate data not even a dict
"""
with self.assertRaises(ValueError) as ctx:
parse_grid_manager_certificate("[]")
self.assertIn(
"must be a dict",
str(ctx.exception),
)
def test_parse_cert_missing_signature(self):
"""
Missing the signature
"""
with self.assertRaises(ValueError) as ctx:
parse_grid_manager_certificate('{"certificate": ""}')
self.assertIn(
"must contain",
str(ctx.exception),
)
def test_validate_cert(self):
"""
Validate a correctly-signed certificate
"""
priv0, pub0 = ed25519.create_signing_keypair()
self.gm.add_storage_server("test0", pub0)
cert0 = self.gm.sign("test0", timedelta(seconds=86400))
verify = create_grid_manager_verifier(
[self.gm._public_key],
[cert0],
ed25519.string_from_verifying_key(pub0),
)
self.assertTrue(verify())
class GridManagerInvalidVerifier(SyncTestCase):
"""
Invalid certificate rejection tests
"""
def setUp(self):
self.gm = create_grid_manager()
self.priv0, self.pub0 = ed25519.create_signing_keypair()
self.gm.add_storage_server("test0", self.pub0)
self.cert0 = self.gm.sign("test0", timedelta(seconds=86400))
return super(GridManagerInvalidVerifier, self).setUp()
@given(
base32text(),
)
def test_validate_cert_invalid(self, invalid_signature):
"""
An incorrect signature is rejected
"""
# make signature invalid
invalid_cert = SignedCertificate(
self.cert0.certificate,
invalid_signature.encode("ascii"),
)
verify = create_grid_manager_verifier(
[self.gm._public_key],
[invalid_cert],
ed25519.string_from_verifying_key(self.pub0),
bad_cert = lambda key, cert: None,
)
self.assertFalse(verify())

View File

@ -261,6 +261,20 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
with self.assertRaises(MissingConfigEntry): with self.assertRaises(MissingConfigEntry):
config.get_config("node", "log_gatherer.furl") config.get_config("node", "log_gatherer.furl")
def test_missing_config_section(self):
"""
Enumerating a missing section returns empty dict
"""
basedir = self.mktemp()
fileutil.make_dirs(basedir)
with open(os.path.join(basedir, 'tahoe.cfg'), 'w'):
pass
config = read_config(basedir, "")
self.assertEquals(
config.enumerate_section("not-a-section"),
{}
)
def test_config_required(self): def test_config_required(self):
""" """
Asking for missing (but required) configuration is an error Asking for missing (but required) configuration is an error

File diff suppressed because it is too large Load Diff

View File

@ -94,10 +94,13 @@ from allmydata.storage_client import (
StorageFarmBroker, StorageFarmBroker,
_FoolscapStorage, _FoolscapStorage,
_NullStorage, _NullStorage,
ANONYMOUS_STORAGE_NURLS
) )
from ..storage.server import ( from ..storage.server import (
StorageServer, StorageServer,
) )
from ..client import config_from_string
from allmydata.interfaces import ( from allmydata.interfaces import (
IConnectionStatus, IConnectionStatus,
IStorageServer, IStorageServer,
@ -739,3 +742,42 @@ storage:
yield done yield done
self.assertTrue(done.called) self.assertTrue(done.called)
def test_should_we_use_http_default(self):
"""Default is to not use HTTP; this will change eventually"""
basedir = self.mktemp()
node_config = config_from_string(basedir, "", "")
announcement = {ANONYMOUS_STORAGE_NURLS: ["pb://..."]}
self.assertFalse(
StorageFarmBroker._should_we_use_http(node_config, announcement)
)
self.assertFalse(
StorageFarmBroker._should_we_use_http(node_config, {})
)
def test_should_we_use_http(self):
"""
If HTTP is allowed, it will only be used if the announcement includes
some NURLs.
"""
basedir = self.mktemp()
no_nurls = {}
empty_nurls = {ANONYMOUS_STORAGE_NURLS: []}
has_nurls = {ANONYMOUS_STORAGE_NURLS: ["pb://.."]}
for force_foolscap, announcement, expected_http_usage in [
("false", no_nurls, False),
("false", empty_nurls, False),
("false", has_nurls, True),
("true", empty_nurls, False),
("true", no_nurls, False),
("true", has_nurls, False),
]:
node_config = config_from_string(
basedir, "", f"[client]\nforce_foolscap = {force_foolscap}"
)
self.assertEqual(
StorageFarmBroker._should_we_use_http(node_config, announcement),
expected_http_usage
)

View File

@ -18,10 +18,12 @@ sadly, an internal implementation detail of Twisted being leaked to tests...
For definitely synchronous calls, you can just use ``result_of()``. For definitely synchronous calls, you can just use ``result_of()``.
""" """
import time
from base64 import b64encode from base64 import b64encode
from contextlib import contextmanager from contextlib import contextmanager
from os import urandom from os import urandom
from typing import Union, Callable, Tuple, Iterable from typing import Union, Callable, Tuple, Iterable
from queue import Queue
from cbor2 import dumps from cbor2 import dumps
from pycddl import ValidationError as CDDLValidationError from pycddl import ValidationError as CDDLValidationError
from hypothesis import assume, given, strategies as st from hypothesis import assume, given, strategies as st
@ -31,13 +33,14 @@ from klein import Klein
from hyperlink import DecodedURL from hyperlink import DecodedURL
from collections_extended import RangeMap from collections_extended import RangeMap
from twisted.internet.task import Clock, Cooperator from twisted.internet.task import Clock, Cooperator
from twisted.internet.interfaces import IReactorTime from twisted.internet.interfaces import IReactorTime, IReactorFromThreads
from twisted.internet.defer import CancelledError, Deferred from twisted.internet.defer import CancelledError, Deferred
from twisted.web import http from twisted.web import http
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
from werkzeug import routing from werkzeug import routing
from werkzeug.exceptions import NotFound as WNotFound from werkzeug.exceptions import NotFound as WNotFound
from testtools.matchers import Equals from testtools.matchers import Equals
from zope.interface import implementer
from .common import SyncTestCase from .common import SyncTestCase
from ..storage.http_common import get_content_type, CBOR_MIME_TYPE from ..storage.http_common import get_content_type, CBOR_MIME_TYPE
@ -449,6 +452,27 @@ class CustomHTTPServerTests(SyncTestCase):
self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0) self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0)
@implementer(IReactorFromThreads)
class Reactor(Clock):
"""
Fake reactor that supports time APIs and callFromThread.
Advancing the clock also runs any callbacks scheduled via callFromThread.
"""
def __init__(self):
Clock.__init__(self)
self._queue = Queue()
def callFromThread(self, f, *args, **kwargs):
self._queue.put((f, args, kwargs))
def advance(self, *args, **kwargs):
Clock.advance(self, *args, **kwargs)
while not self._queue.empty():
f, args, kwargs = self._queue.get()
f(*args, **kwargs)
class HttpTestFixture(Fixture): class HttpTestFixture(Fixture):
""" """
Setup HTTP tests' infrastructure, the storage server and corresponding Setup HTTP tests' infrastructure, the storage server and corresponding
@ -460,7 +484,7 @@ class HttpTestFixture(Fixture):
lambda pool: self.addCleanup(pool.closeCachedConnections) lambda pool: self.addCleanup(pool.closeCachedConnections)
) )
self.addCleanup(StorageClient.stop_test_mode) self.addCleanup(StorageClient.stop_test_mode)
self.clock = Clock() self.clock = Reactor()
self.tempdir = self.useFixture(TempDir()) self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in # The global Cooperator used by Twisted (a) used by pull producers in
# twisted.web, (b) is driven by a real reactor. We want to push time # twisted.web, (b) is driven by a real reactor. We want to push time
@ -475,7 +499,7 @@ class HttpTestFixture(Fixture):
self.storage_server = StorageServer( self.storage_server = StorageServer(
self.tempdir.path, b"\x00" * 20, clock=self.clock self.tempdir.path, b"\x00" * 20, clock=self.clock
) )
self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST) self.http_server = HTTPServer(self.clock, self.storage_server, SWISSNUM_FOR_TEST)
self.treq = StubTreq(self.http_server.get_resource()) self.treq = StubTreq(self.http_server.get_resource())
self.client = StorageClient( self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"), DecodedURL.from_text("http://127.0.0.1"),
@ -501,13 +525,25 @@ class HttpTestFixture(Fixture):
# OK, no result yet, probably async HTTP endpoint handler, so advance # OK, no result yet, probably async HTTP endpoint handler, so advance
# time, flush treq, and try again: # time, flush treq, and try again:
for i in range(100): for i in range(10_000):
self.clock.advance(0.001) self.clock.advance(0.001)
self.treq.flush() self.treq.flush()
if result:
break
# By putting the sleep at the end, tests that are completely
# synchronous and don't use threads will have already broken out of
# the loop, and so will finish without any sleeps. This allows them
# to run as quickly as possible.
#
# However, some tests do talk to APIs that use a thread pool on the
# backend, so we need to allow actual time to pass for those.
time.sleep(0.001)
if result: if result:
return result[0] return result[0]
if error: if error:
error[0].raiseException() error[0].raiseException()
raise RuntimeError( raise RuntimeError(
"We expected given Deferred to have result already, but it wasn't. " "We expected given Deferred to have result already, but it wasn't. "
+ "This is probably a test design issue." + "This is probably a test design issue."

View File

@ -33,6 +33,7 @@ from allmydata.util import log, base32
from allmydata.util.encodingutil import quote_output, unicode_to_argv from allmydata.util.encodingutil import quote_output, unicode_to_argv
from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.util.deferredutil import async_to_deferred
from allmydata.interfaces import IDirectoryNode, IFileNode, \ from allmydata.interfaces import IDirectoryNode, IFileNode, \
NoSuchChildError, NoSharesError, SDMF_VERSION, MDMF_VERSION NoSuchChildError, NoSharesError, SDMF_VERSION, MDMF_VERSION
from allmydata.monitor import Monitor from allmydata.monitor import Monitor
@ -657,7 +658,25 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
self.failUnlessEqual(res, NEWERDATA) self.failUnlessEqual(res, NEWERDATA)
d.addCallback(_check_download_5) d.addCallback(_check_download_5)
def _corrupt_shares(res): # The previous checks upload a complete replacement. This uses a
# different API that is supposed to do a partial write at an offset.
@async_to_deferred
async def _check_write_at_offset(newnode):
log.msg("writing at offset")
start = b"abcdef"
expected = b"abXYef"
uri = self._mutable_node_1.get_uri()
newnode = self.clients[0].create_node_from_uri(uri)
await newnode.overwrite(MutableData(start))
version = await newnode.get_mutable_version()
await version.update(MutableData(b"XY"), 2)
result = await newnode.download_best_version()
self.assertEqual(result, expected)
# Revert to previous version
await newnode.overwrite(MutableData(NEWERDATA))
d.addCallback(_check_write_at_offset)
def _corrupt_shares(_res):
# run around and flip bits in all but k of the shares, to test # run around and flip bits in all but k of the shares, to test
# the hash checks # the hash checks
shares = self._find_all_shares(self.basedir) shares = self._find_all_shares(self.basedir)

View File

@ -15,8 +15,10 @@ import six
import os, time, sys import os, time, sys
import yaml import yaml
import json import json
from threading import current_thread
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import reactor
from foolscap.api import Violation, RemoteException from foolscap.api import Violation, RemoteException
from allmydata.util import idlib, mathutil from allmydata.util import idlib, mathutil
@ -26,6 +28,7 @@ from allmydata.util import pollmixin
from allmydata.util import yamlutil from allmydata.util import yamlutil
from allmydata.util import rrefutil from allmydata.util import rrefutil
from allmydata.util.fileutil import EncryptedTemporaryFile from allmydata.util.fileutil import EncryptedTemporaryFile
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata.test.common_util import ReallyEqualMixin from allmydata.test.common_util import ReallyEqualMixin
from .no_network import fireNow, LocalWrapper from .no_network import fireNow, LocalWrapper
@ -553,6 +556,17 @@ class JSONBytes(unittest.TestCase):
o, cls=jsonbytes.AnyBytesJSONEncoder)), o, cls=jsonbytes.AnyBytesJSONEncoder)),
expected, expected,
) )
self.assertEqual(
json.loads(jsonbytes.dumps(o, any_bytes=True)),
expected
)
def test_dumps_bytes_unicode_separators(self):
"""Unicode separators don't prevent the result from being bytes."""
result = jsonbytes.dumps_bytes([1, 2], separators=(u',', u':'))
self.assertIsInstance(result, bytes)
self.assertEqual(result, b"[1,2]")
class FakeGetVersion(object): class FakeGetVersion(object):
@ -588,3 +602,31 @@ class RrefUtilTests(unittest.TestCase):
) )
self.assertEqual(result.version, "Default") self.assertEqual(result.version, "Default")
self.assertIdentical(result, rref) self.assertIdentical(result, rref)
class CPUThreadPool(unittest.TestCase):
"""Tests for cputhreadpool."""
async def test_runs_in_thread(self):
"""The given function runs in a thread."""
def f(*args, **kwargs):
return current_thread(), args, kwargs
this_thread = current_thread().ident
result = defer_to_thread(reactor, f, 1, 3, key=4, value=5)
# Callbacks run in the correct thread:
callback_thread_ident = []
def passthrough(result):
callback_thread_ident.append(current_thread().ident)
return result
result.addCallback(passthrough)
# The task ran in a different thread:
thread, args, kwargs = await result
self.assertEqual(callback_thread_ident[0], this_thread)
self.assertNotEqual(thread.ident, this_thread)
self.assertEqual(args, (1, 3))
self.assertEqual(kwargs, {"key": 4, "value": 5})

View File

@ -73,13 +73,15 @@ def write_config(tahoe_cfg: FilePath, config: ConfigParser) -> None:
""" """
Write a configuration to a file. Write a configuration to a file.
:param FilePath tahoe_cfg: The path to which to write the config. :param FilePath tahoe_cfg: The path to which to write the
config. The directories are created if they do not already exist.
:param ConfigParser config: The configuration to write. :param ConfigParser config: The configuration to write.
:return: ``None`` :return: ``None``
""" """
tmp = tahoe_cfg.temporarySibling() tmp = tahoe_cfg.temporarySibling()
tahoe_cfg.parent().makedirs(ignoreExistingDirectory=True)
# FilePath.open can only open files in binary mode which does not work # FilePath.open can only open files in binary mode which does not work
# with ConfigParser.write. # with ConfigParser.write.
with open(tmp.path, "wt") as fp: with open(tmp.path, "wt") as fp:
@ -87,7 +89,10 @@ def write_config(tahoe_cfg: FilePath, config: ConfigParser) -> None:
# Windows doesn't have atomic overwrite semantics for moveTo. Thus we end # Windows doesn't have atomic overwrite semantics for moveTo. Thus we end
# up slightly less than atomic. # up slightly less than atomic.
if platform.isWindows(): if platform.isWindows():
tahoe_cfg.remove() try:
tahoe_cfg.remove()
except FileNotFoundError:
pass
tmp.moveTo(tahoe_cfg) tmp.moveTo(tahoe_cfg)
def validate_config(fname: str, cfg: ConfigParser, valid_config: ValidConfiguration) -> None: def validate_config(fname: str, cfg: ConfigParser, valid_config: ValidConfiguration) -> None:
@ -169,7 +174,7 @@ class ValidConfiguration(object):
def is_valid_item(self, section_name: str, item_name: str) -> bool: def is_valid_item(self, section_name: str, item_name: str) -> bool:
""" """
:return: True if the given section name, ite name pair is valid, False :return: True if the given section name, item_name pair is valid, False
otherwise. otherwise.
""" """
return ( return (

View File

@ -0,0 +1,59 @@
"""
A global thread pool for CPU-intensive tasks.
Motivation:
* Certain tasks are blocking on CPU, and so should be run in a thread.
* The Twisted thread pool is used for operations that don't necessarily block
on CPU, like DNS lookups. CPU processing should not block DNS lookups!
* The number of threads should be fixed, and tied to the number of available
CPUs.
As a first pass, this uses ``os.cpu_count()`` to determine the max number of
threads. This may create too many threads, as it doesn't cover things like
scheduler affinity or cgroups, but that's not the end of the world.
"""
import os
from typing import TypeVar, Callable, cast
from functools import partial
import threading
from typing_extensions import ParamSpec
from twisted.python.threadpool import ThreadPool
from twisted.internet.defer import Deferred
from twisted.internet.threads import deferToThreadPool
from twisted.internet.interfaces import IReactorFromThreads
_CPU_THREAD_POOL = ThreadPool(minthreads=0, maxthreads=os.cpu_count(), name="TahoeCPU")
if hasattr(threading, "_register_atexit"):
# This is a private API present in Python 3.8 or later, specifically
# designed for thread pool shutdown. Since it's private, it might go away
# at any point, so if it doesn't exist we still have a solution.
threading._register_atexit(_CPU_THREAD_POOL.stop) # type: ignore
else:
# Daemon threads allow shutdown to happen without any explicit stopping of
# threads. There are some bugs in old Python versions related to daemon
# threads (fixed in subsequent CPython patch releases), but Python's own
# thread pools use daemon threads in those versions so we're no worse off.
_CPU_THREAD_POOL.threadFactory = partial( # type: ignore
_CPU_THREAD_POOL.threadFactory, daemon=True
)
_CPU_THREAD_POOL.start()
P = ParamSpec("P")
R = TypeVar("R")
def defer_to_thread(
reactor: IReactorFromThreads, f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
) -> Deferred[R]:
"""Run the function in a thread, return the result as a ``Deferred``."""
# deferToThreadPool has no type annotations...
result = deferToThreadPool(reactor, _CPU_THREAD_POOL, f, *args, **kwargs)
return cast(Deferred[R], result)
__all__ = ["defer_to_thread"]

View File

@ -9,8 +9,7 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
from future.utils import PY2, PY3 from future.utils import PY2
if PY2: if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
@ -110,14 +109,12 @@ def dumps_bytes(obj: Any, *args: Any, **kwargs: Any) -> bytes:
UTF-8 encoded Unicode strings. If True, non-UTF-8 bytes are quoted for UTF-8 encoded Unicode strings. If True, non-UTF-8 bytes are quoted for
human consumption. human consumption.
""" """
result: str = dumps(obj, *args, **kwargs) return dumps(obj, *args, **kwargs).encode("utf-8")
if PY3:
resultbytes = result.encode("utf-8")
return resultbytes
# To make this module drop-in compatible with json module: # To make this module drop-in compatible with json module:
loads = json.loads loads = json.loads
load = json.load
__all__ = ["dumps", "loads"] __all__ = ["dumps", "loads", "load"]