Merge branch 'master' into 3828.key-length

This commit is contained in:
meejah 2022-03-24 11:40:52 -06:00
commit cc682d0413
35 changed files with 1403 additions and 628 deletions

View File

@ -1,7 +1,7 @@
ARG TAG
FROM debian:${TAG}
ARG PYTHON_VERSION
ENV DEBIAN_FRONTEND noninteractive
ENV WHEELHOUSE_PATH /tmp/wheelhouse
ENV VIRTUALENV_PATH /tmp/venv
# This will get updated by the CircleCI checkout step.

View File

@ -1,5 +1,5 @@
ARG TAG
FROM centos:${TAG}
FROM oraclelinux:${TAG}
ARG PYTHON_VERSION
ENV WHEELHOUSE_PATH /tmp/wheelhouse
@ -13,7 +13,6 @@ RUN yum install --assumeyes \
sudo \
make automake gcc gcc-c++ \
python${PYTHON_VERSION} \
python${PYTHON_VERSION}-devel \
libffi-devel \
openssl-devel \
libyaml \

View File

@ -1,7 +1,7 @@
ARG TAG
FROM ubuntu:${TAG}
ARG PYTHON_VERSION
ENV DEBIAN_FRONTEND noninteractive
ENV WHEELHOUSE_PATH /tmp/wheelhouse
ENV VIRTUALENV_PATH /tmp/venv
# This will get updated by the CircleCI checkout step.

View File

@ -15,28 +15,20 @@ workflows:
ci:
jobs:
# Start with jobs testing various platforms.
- "debian-9":
{}
- "debian-10":
{}
- "debian-11":
requires:
- "debian-9"
- "debian-10"
- "ubuntu-20-04":
{}
- "ubuntu-18-04":
requires:
- "ubuntu-20-04"
- "ubuntu-16-04":
requires:
- "ubuntu-20-04"
- "fedora-29":
{}
- "fedora-28":
requires:
- "fedora-29"
- "centos-8":
# Equivalent to RHEL 8; CentOS 8 is dead.
- "oraclelinux-8":
{}
- "nixos":
@ -47,18 +39,12 @@ workflows:
name: "NixOS 21.11"
nixpkgs: "21.11"
# Test against PyPy 2.7
- "pypy27-buster":
{}
# Test against Python 3:
- "python37":
{}
# Eventually, test against PyPy 3.8
#- "pypy27-buster":
# {}
# Other assorted tasks and configurations
- "lint":
{}
- "codechecks3":
- "codechecks":
{}
- "pyinstaller":
{}
@ -74,7 +60,7 @@ workflows:
requires:
# If the unit test suite doesn't pass, don't bother running the
# integration tests.
- "debian-9"
- "debian-10"
- "typechecks":
{}
@ -104,24 +90,19 @@ workflows:
# https://app.circleci.com/settings/organization/github/tahoe-lafs/contexts
- "build-image-debian-10": &DOCKERHUB_CONTEXT
context: "dockerhub-auth"
- "build-image-debian-9":
<<: *DOCKERHUB_CONTEXT
- "build-image-ubuntu-16-04":
- "build-image-debian-11":
<<: *DOCKERHUB_CONTEXT
- "build-image-ubuntu-18-04":
<<: *DOCKERHUB_CONTEXT
- "build-image-ubuntu-20-04":
<<: *DOCKERHUB_CONTEXT
- "build-image-fedora-28":
- "build-image-fedora-35":
<<: *DOCKERHUB_CONTEXT
- "build-image-fedora-29":
<<: *DOCKERHUB_CONTEXT
- "build-image-centos-8":
<<: *DOCKERHUB_CONTEXT
- "build-image-pypy27-buster":
<<: *DOCKERHUB_CONTEXT
- "build-image-python37-ubuntu":
- "build-image-oraclelinux-8":
<<: *DOCKERHUB_CONTEXT
# Restore later as PyPy38
#- "build-image-pypy27-buster":
# <<: *DOCKERHUB_CONTEXT
jobs:
@ -147,10 +128,10 @@ jobs:
# Since this job is never scheduled this step is never run so the
# actual value here is irrelevant.
lint:
codechecks:
docker:
- <<: *DOCKERHUB_AUTH
image: "circleci/python:2"
image: "cimg/python:3.9"
steps:
- "checkout"
@ -165,28 +146,10 @@ jobs:
command: |
~/.local/bin/tox -e codechecks
codechecks3:
docker:
- <<: *DOCKERHUB_AUTH
image: "circleci/python:3"
steps:
- "checkout"
- run:
name: "Install tox"
command: |
pip install --user tox
- run:
name: "Static-ish code checks"
command: |
~/.local/bin/tox -e codechecks3
pyinstaller:
docker:
- <<: *DOCKERHUB_AUTH
image: "circleci/python:2"
image: "cimg/python:3.9"
steps:
- "checkout"
@ -209,10 +172,10 @@ jobs:
command: |
dist/Tahoe-LAFS/tahoe --version
debian-9: &DEBIAN
debian-10: &DEBIAN
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/debian:9-py2.7"
image: "tahoelafsci/debian:10-py3.7"
user: "nobody"
environment: &UTF_8_ENVIRONMENT
@ -226,7 +189,7 @@ jobs:
# filenames and argv).
LANG: "en_US.UTF-8"
# Select a tox environment to run for this job.
TAHOE_LAFS_TOX_ENVIRONMENT: "py27"
TAHOE_LAFS_TOX_ENVIRONMENT: "py37"
# Additional arguments to pass to tox.
TAHOE_LAFS_TOX_ARGS: ""
# The path in which test artifacts will be placed.
@ -294,29 +257,29 @@ jobs:
/tmp/venv/bin/codecov
fi
debian-10:
debian-11:
<<: *DEBIAN
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/debian:10-py2.7"
image: "tahoelafsci/debian:11-py3.9"
user: "nobody"
pypy27-buster:
<<: *DEBIAN
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/pypy:buster-py2"
user: "nobody"
environment:
<<: *UTF_8_ENVIRONMENT
# We don't do coverage since it makes PyPy far too slow:
TAHOE_LAFS_TOX_ENVIRONMENT: "pypy27"
# Since we didn't collect it, don't upload it.
UPLOAD_COVERAGE: ""
TAHOE_LAFS_TOX_ENVIRONMENT: "py39"
# Restore later using PyPy3.8
# pypy27-buster:
# <<: *DEBIAN
# docker:
# - <<: *DOCKERHUB_AUTH
# image: "tahoelafsci/pypy:buster-py2"
# user: "nobody"
# environment:
# <<: *UTF_8_ENVIRONMENT
# # We don't do coverage since it makes PyPy far too slow:
# TAHOE_LAFS_TOX_ENVIRONMENT: "pypy27"
# # Since we didn't collect it, don't upload it.
# UPLOAD_COVERAGE: ""
c-locale:
<<: *DEBIAN
@ -364,25 +327,8 @@ jobs:
- run: *SETUP_VIRTUALENV
- run: *RUN_TESTS
ubuntu-16-04:
<<: *DEBIAN
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/ubuntu:16.04-py2.7"
user: "nobody"
ubuntu-18-04: &UBUNTU_18_04
<<: *DEBIAN
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/ubuntu:18.04-py2.7"
user: "nobody"
python37:
<<: *UBUNTU_18_04
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/ubuntu:18.04-py3.7"
@ -401,17 +347,21 @@ jobs:
<<: *DEBIAN
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/ubuntu:20.04"
image: "tahoelafsci/ubuntu:20.04-py3.9"
user: "nobody"
environment:
<<: *UTF_8_ENVIRONMENT
TAHOE_LAFS_TOX_ENVIRONMENT: "py39"
centos-8: &RHEL_DERIV
oraclelinux-8: &RHEL_DERIV
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/centos:8-py2"
image: "tahoelafsci/oraclelinux:8-py3.8"
user: "nobody"
environment: *UTF_8_ENVIRONMENT
environment:
<<: *UTF_8_ENVIRONMENT
TAHOE_LAFS_TOX_ENVIRONMENT: "py38"
# pip cannot install packages if the working directory is not readable.
# We want to run a lot of steps as nobody instead of as root.
@ -427,20 +377,11 @@ jobs:
- store_artifacts: *STORE_OTHER_ARTIFACTS
- run: *SUBMIT_COVERAGE
fedora-28:
fedora-35:
<<: *RHEL_DERIV
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/fedora:28-py"
user: "nobody"
fedora-29:
<<: *RHEL_DERIV
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/fedora:29-py"
image: "tahoelafsci/fedora:35-py3"
user: "nobody"
nixos:
@ -554,7 +495,7 @@ jobs:
typechecks:
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/ubuntu:18.04-py3"
image: "tahoelafsci/ubuntu:18.04-py3.7"
steps:
- "checkout"
@ -566,7 +507,7 @@ jobs:
docs:
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/ubuntu:18.04-py3"
image: "tahoelafsci/ubuntu:18.04-py3.7"
steps:
- "checkout"
@ -589,13 +530,14 @@ jobs:
image: "cimg/base:2022.01"
environment:
DISTRO: "tahoelafsci/<DISTRO>:foo-py2"
TAG: "tahoelafsci/distro:<TAG>-py2"
DISTRO: "tahoelafsci/<DISTRO>:foo-py3.9"
TAG: "tahoelafsci/distro:<TAG>-py3.9"
PYTHON_VERSION: "tahoelafsci/distro:tag-py<PYTHON_VERSION}"
steps:
- "checkout"
- "setup_remote_docker"
- setup_remote_docker:
version: "20.10.11"
- run:
name: "Log in to Dockerhub"
command: |
@ -622,39 +564,20 @@ jobs:
environment:
DISTRO: "debian"
TAG: "10"
PYTHON_VERSION: "2.7"
PYTHON_VERSION: "3.7"
build-image-debian-9:
build-image-debian-11:
<<: *BUILD_IMAGE
environment:
DISTRO: "debian"
TAG: "9"
PYTHON_VERSION: "2.7"
build-image-ubuntu-16-04:
<<: *BUILD_IMAGE
environment:
DISTRO: "ubuntu"
TAG: "16.04"
PYTHON_VERSION: "2.7"
TAG: "11"
PYTHON_VERSION: "3.9"
build-image-ubuntu-18-04:
<<: *BUILD_IMAGE
environment:
DISTRO: "ubuntu"
TAG: "18.04"
PYTHON_VERSION: "2.7"
build-image-python37-ubuntu:
<<: *BUILD_IMAGE
environment:
DISTRO: "ubuntu"
TAG: "18.04"
@ -667,43 +590,32 @@ jobs:
environment:
DISTRO: "ubuntu"
TAG: "20.04"
PYTHON_VERSION: "2.7"
PYTHON_VERSION: "3.9"
build-image-centos-8:
build-image-oraclelinux-8:
<<: *BUILD_IMAGE
environment:
DISTRO: "centos"
DISTRO: "oraclelinux"
TAG: "8"
PYTHON_VERSION: "2"
PYTHON_VERSION: "3.8"
build-image-fedora-28:
build-image-fedora-35:
<<: *BUILD_IMAGE
environment:
DISTRO: "fedora"
TAG: "28"
# The default on Fedora (this version anyway) is still Python 2.
PYTHON_VERSION: ""
TAG: "35"
PYTHON_VERSION: "3"
# build-image-pypy27-buster:
# <<: *BUILD_IMAGE
build-image-fedora-29:
<<: *BUILD_IMAGE
environment:
DISTRO: "fedora"
TAG: "29"
build-image-pypy27-buster:
<<: *BUILD_IMAGE
environment:
DISTRO: "pypy"
TAG: "buster"
# We only have Python 2 for PyPy right now so there's no support for
# setting up PyPy 3 in the image building toolchain. This value is just
# for constructing the right Docker image tag.
PYTHON_VERSION: "2"
# environment:
# DISTRO: "pypy"
# TAG: "buster"
# # We only have Python 2 for PyPy right now so there's no support for
# # setting up PyPy 3 in the image building toolchain. This value is just
# # for constructing the right Docker image tag.
# PYTHON_VERSION: "2"

View File

@ -38,17 +38,22 @@ jobs:
- windows-latest
- ubuntu-latest
python-version:
- 2.7
- 3.7
- 3.8
- 3.9
- "3.7"
- "3.8"
- "3.9"
- "3.10"
include:
# On macOS don't bother with 3.7-3.8, just to get faster builds.
- os: macos-10.15
python-version: 2.7
- os: macos-latest
python-version: 3.9
python-version: "3.9"
- os: macos-latest
python-version: "3.10"
# We only support PyPy on Linux at the moment.
- os: ubuntu-latest
python-version: "pypy-3.7"
- os: ubuntu-latest
python-version: "pypy-3.8"
steps:
# See https://github.com/actions/checkout. A fetch-depth of 0
# fetches all tags and branches.
@ -108,25 +113,6 @@ jobs:
# Action for this, as of Jan 2021 it does not support Python coverage
# files - only lcov files. Therefore, we use coveralls-python, the
# coveralls.io-supplied Python reporter, for this.
#
# It is coveralls-python 1.x that has maintained compatibility
# with Python 2, while coveralls-python 3.x is compatible with
# Python 3. Sadly we can't use them both in the same workflow.
#
# The two versions of coveralls-python are somewhat mutually
# incompatible. Mixing these two different versions when
# reporting coverage to coveralls.io will lead to grief, since
# they get job IDs in different fashion. If we use both
# versions of coveralls in the same workflow, the finalizing
# step will be able to mark only part of the jobs as done, and
# the other part will be left hanging, never marked as done: it
# does not matter if we make an API call or `coveralls --finish`
# to indicate that CI has finished running.
#
# So we try to use the newer coveralls-python that is available
# via Python 3 (which is present in GitHub Actions tool cache,
# even when we're running Python 2.7 tests) throughout this
# workflow.
- name: "Report Coverage to Coveralls"
run: |
pip3 install --upgrade coveralls==3.0.1
@ -179,13 +165,10 @@ jobs:
- windows-latest
- ubuntu-latest
python-version:
- 2.7
- 3.7
- 3.9
include:
# On macOS don't bother with 3.7, just to get faster builds.
- os: macos-10.15
python-version: 2.7
- os: macos-latest
python-version: 3.9
@ -201,9 +184,7 @@ jobs:
- name: Install Tor [macOS, ${{ matrix.python-version }} ]
if: ${{ contains(matrix.os, 'macos') }}
run: |
brew extract --version 0.4.5.8 tor homebrew/cask
brew install tor@0.4.5.8
brew link --overwrite tor@0.4.5.8
brew install tor
- name: Install Tor [Windows]
if: matrix.os == 'windows-latest'
@ -242,13 +223,13 @@ jobs:
- name: Display tool versions
run: python misc/build_helpers/show-tool-versions.py
- name: Run "Python 2 integration tests"
if: ${{ matrix.python-version == '2.7' }}
run: tox -e integration
- name: Run "Python 3 integration tests"
if: ${{ matrix.python-version != '2.7' }}
run: tox -e integration3
env:
# On macOS this is necessary to ensure unix socket paths for tor
# aren't too long. On Windows tox won't pass it through so it has no
# effect. On Linux it doesn't make a difference one way or another.
TMPDIR: "/tmp"
run: tox -e integration
- name: Upload eliot.log in case of failure
uses: actions/upload-artifact@v1
@ -267,7 +248,7 @@ jobs:
- windows-latest
- ubuntu-latest
python-version:
- 2.7
- 3.9
steps:

View File

@ -17,7 +17,7 @@ PYTHON=python
export PYTHON
PYFLAKES=flake8
export PYFLAKES
VIRTUAL_ENV=./.tox/py27
VIRTUAL_ENV=./.tox/py37
SOURCES=src/allmydata static misc setup.py
APPNAME=tahoe-lafs
TEST_SUITE=allmydata
@ -35,7 +35,7 @@ test: .tox/create-venvs.log
# Run codechecks first since it takes the least time to report issues early.
tox --develop -e codechecks
# Run all the test environments in parallel to reduce run-time
tox --develop -p auto -e 'py27,py37,pypy27'
tox --develop -p auto -e 'py37'
.PHONY: test-venv-coverage
## Run all tests with coverage collection and reporting.
test-venv-coverage:
@ -136,7 +136,7 @@ count-lines:
# Here is a list of testing tools that can be run with 'python' from a
# virtualenv in which Tahoe has been installed. There used to be Makefile
# targets for each, but the exact path to a suitable python is now up to the
# developer. But as a hint, after running 'tox', ./.tox/py27/bin/python will
# developer. But as a hint, after running 'tox', ./.tox/py37/bin/python will
# probably work.
# src/allmydata/test/bench_dirnode.py

View File

@ -53,12 +53,11 @@ For more detailed instructions, read `Installing Tahoe-LAFS <docs/Installation/i
Once ``tahoe --version`` works, see `How to Run Tahoe-LAFS <docs/running.rst>`__ to learn how to set up your first Tahoe-LAFS node.
🐍 Python 3 Support
--------------------
🐍 Python 2
-----------
Python 3 support has been introduced starting with Tahoe-LAFS 1.16.0, alongside Python 2.
System administrators are advised to start running Tahoe on Python 3 and should expect Python 2 support to be dropped in a future version.
Please, feel free to file issues if you run into bugs while running Tahoe on Python 3.
Python 3.7 or later is now required.
If you are still using Python 2.7, use Tahoe-LAFS version 1.17.1.
🤖 Issues

View File

@ -493,8 +493,8 @@ Handling repeat calls:
* If the same API call is repeated with the same upload secret, the response is the same and no change is made to server state.
This is necessary to ensure retries work in the face of lost responses from the server.
* If the API call is made with a different upload secret, this implies a new client, perhaps because the old client died.
In order to prevent storage servers from being able to mess with each other, this API call will fail, because the secret doesn't match.
The use case of restarting upload from scratch if the client dies can be implemented by having the client persist the upload secret.
Or it may happen because the client wants to upload a different share number than a previous client.
New shares will be created, existing shares will be unchanged, regardless of whether the upload secret matches or not.
Discussion
``````````
@ -540,7 +540,7 @@ Rejected designs for upload secrets:
Write data for the indicated share.
The share number must belong to the storage index.
The request body is the raw share data (i.e., ``application/octet-stream``).
*Content-Range* requests are encouraged for large transfers to allow partially complete uploads to be resumed.
*Content-Range* requests are required; for large transfers this allows partially complete uploads to be resumed.
For example,
a 1MiB share can be divided into eight separate 128KiB chunks.
Each chunk can be uploaded in a separate request.
@ -614,16 +614,19 @@ From RFC 7231::
``POST /v1/immutable/:storage_index/:share_number/corrupt``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Advise the server the data read from the indicated share was corrupt.
The request body includes an human-meaningful string with details about the corruption.
It also includes potentially important details about the share.
Advise the server the data read from the indicated share was corrupt. The
request body includes a human-meaningful text string with details about the
corruption. It also includes potentially important details about the share.
For example::
{"reason": "expected hash abcd, got hash efgh"}
{"reason": u"expected hash abcd, got hash efgh"}
.. share-type, storage-index, and share-number are inferred from the URL
The response code is OK (200) by default, or NOT FOUND (404) if the share
couldn't be found.
Reading
~~~~~~~
@ -644,7 +647,7 @@ Read a contiguous sequence of bytes from one share in one bucket.
The response body is the raw share data (i.e., ``application/octet-stream``).
The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content).
Interpretation and response behavior is as specified in RFC 7233 § 4.1.
Multiple ranges in a single request are *not* supported.
Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported.
Discussion
``````````

View File

@ -122,7 +122,7 @@ they will need to evaluate which contributors' signatures they trust.
- these should all pass:
- tox -e py27,codechecks,docs,integration
- tox -e py37,codechecks,docs,integration
- these can fail (ideally they should not of course):

View File

@ -462,10 +462,8 @@ def chutney(reactor, temp_dir):
)
pytest_twisted.blockon(proto.done)
# XXX: Here we reset Chutney to the last revision known to work
# with Python 2, as a workaround for Chutney moving to Python 3.
# When this is no longer necessary, we will have to drop this and
# add '--depth=1' back to the above 'git clone' subprocess.
# XXX: Here we reset Chutney to a specific revision known to work,
# since there are no stability guarantees or releases yet.
proto = _DumpOutputProtocol(None)
reactor.spawnProcess(
proto,
@ -473,7 +471,7 @@ def chutney(reactor, temp_dir):
(
'git', '-C', chutney_dir,
'reset', '--hard',
'99bd06c7554b9113af8c0877b6eca4ceb95dcbaa'
'c825cba0bcd813c644c6ac069deeb7347d3200ee'
),
env=environ,
)

View File

@ -26,10 +26,10 @@ python run-deprecations.py [--warnings=STDERRFILE] [--package=PYTHONPACKAGE ] CO
class RunPP(protocol.ProcessProtocol):
def outReceived(self, data):
self.stdout.write(data)
sys.stdout.write(data)
sys.stdout.write(str(data, sys.stdout.encoding))
def errReceived(self, data):
self.stderr.write(data)
sys.stderr.write(data)
sys.stderr.write(str(data, sys.stdout.encoding))
def processEnded(self, reason):
signal = reason.value.signal
rc = reason.value.exitCode
@ -100,17 +100,19 @@ def run_command(main):
pp.stdout.seek(0)
for line in pp.stdout.readlines():
line = str(line, sys.stdout.encoding)
if match(line):
add(line) # includes newline
pp.stderr.seek(0)
for line in pp.stderr.readlines():
line = str(line, sys.stdout.encoding)
if match(line):
add(line)
if warnings:
if config["warnings"]:
with open(config["warnings"], "wb") as f:
with open(config["warnings"], "w") as f:
print("".join(warnings), file=f)
print("ERROR: %d deprecation warnings found" % len(warnings))
sys.exit(1)

0
newsfragments/3327.minor Normal file
View File

1
newsfragments/3697.minor Normal file
View File

@ -0,0 +1 @@
Added support for Python 3.10. Added support for PyPy3 (3.7 and 3.8, on Linux only).

0
newsfragments/3860.minor Normal file
View File

View File

@ -0,0 +1 @@
Python 3.7 or later is now required; Python 2 is no longer supported.

0
newsfragments/3876.minor Normal file
View File

0
newsfragments/3877.minor Normal file
View File

View File

@ -0,0 +1 @@
Share corruption reports stored on disk are now always encoded in UTF-8.

0
newsfragments/3881.minor Normal file
View File

0
newsfragments/3882.minor Normal file
View File

0
newsfragments/3883.minor Normal file
View File

View File

@ -11,7 +11,10 @@ import struct
import sys
if not hasattr(sys, 'real_prefix'):
try:
import allmydata
del allmydata
except ImportError:
sys.exit("Please run inside a virtualenv with Tahoe-LAFS installed.")

View File

@ -55,8 +55,7 @@ install_requires = [
# * foolscap >= 0.12.6 has an i2p.sam_endpoint() that takes kwargs
# * foolscap 0.13.2 drops i2p support completely
# * foolscap >= 21.7 is necessary for Python 3 with i2p support.
"foolscap == 0.13.1 ; python_version < '3.0'",
"foolscap >= 21.7.0 ; python_version > '3.0'",
"foolscap >= 21.7.0",
# * cryptography 2.6 introduced some ed25519 APIs we rely on. Note that
# Twisted[conch] also depends on cryptography and Twisted[tls]
@ -106,16 +105,10 @@ install_requires = [
# for 'tahoe invite' and 'tahoe join'
"magic-wormhole >= 0.10.2",
# Eliot is contemplating dropping Python 2 support. Stick to a version we
# know works on Python 2.7.
"eliot ~= 1.7 ; python_version < '3.0'",
# On Python 3, we want a new enough version to support custom JSON encoders.
"eliot >= 1.13.0 ; python_version > '3.0'",
# We want a new enough version to support custom JSON encoders.
"eliot >= 1.13.0",
# Pyrsistent 0.17.0 (which we use by way of Eliot) has dropped
# Python 2 entirely; stick to the version known to work for us.
"pyrsistent < 0.17.0 ; python_version < '3.0'",
"pyrsistent ; python_version > '3.0'",
"pyrsistent",
# A great way to define types of values.
"attrs >= 18.2.0",
@ -135,14 +128,8 @@ install_requires = [
# Linux distribution detection:
"distro >= 1.4.0",
# Backported configparser for Python 2:
"configparser ; python_version < '3.0'",
# For the RangeMap datastructure. Need 2.0.2 at least for bugfixes. Python
# 2 doesn't actually need this, since HTTP storage protocol isn't supported
# there, so we just pick whatever version so that code imports.
"collections-extended >= 2.0.2 ; python_version > '3.0'",
"collections-extended ; python_version < '3.0'",
# For the RangeMap datastructure. Need 2.0.2 at least for bugfixes.
"collections-extended >= 2.0.2",
# HTTP server and client
"klein",
@ -201,8 +188,7 @@ trove_classifiers=[
"Natural Language :: English",
"Programming Language :: C",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Topic :: Utilities",
"Topic :: System :: Systems Administration",
"Topic :: System :: Filesystems",
@ -229,7 +215,7 @@ def run_command(args, cwd=None):
use_shell = sys.platform == "win32"
try:
p = subprocess.Popen(args, stdout=subprocess.PIPE, cwd=cwd, shell=use_shell)
except EnvironmentError as e: # if this gives a SyntaxError, note that Tahoe-LAFS requires Python 2.7+
except EnvironmentError as e: # if this gives a SyntaxError, note that Tahoe-LAFS requires Python 3.7+
print("Warning: unable to run %r." % (" ".join(args),))
print(e)
return None
@ -380,8 +366,8 @@ setup(name="tahoe-lafs", # also set in __init__.py
package_dir = {'':'src'},
packages=find_packages('src') + ['allmydata.test.plugins'],
classifiers=trove_classifiers,
# We support Python 2.7, and Python 3.7 or later.
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, !=3.6.*",
# We support Python 3.7 or later. 3.11 is not supported yet.
python_requires=">=3.7, <3.11",
install_requires=install_requires,
extras_require={
# Duplicate the Twisted pywin32 dependency here. See
@ -400,10 +386,6 @@ setup(name="tahoe-lafs", # also set in __init__.py
"tox",
"pytest",
"pytest-twisted",
# XXX: decorator isn't a direct dependency, but pytest-twisted
# depends on decorator, and decorator 5.x isn't compatible with
# Python 2.7.
"decorator < 5",
"hypothesis >= 3.6.1",
"towncrier",
"testtools",

View File

@ -1,54 +1,39 @@
"""
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from time import clock as process_time
else:
from time import process_time
from collections import deque
from time import process_time
import time
from typing import Deque, Tuple
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implementer
from foolscap.api import eventually
from allmydata.util import log, dictutil
from allmydata.interfaces import IStatsProducer
@implementer(IStatsProducer)
class CPUUsageMonitor(service.MultiService):
HISTORY_LENGTH = 15
POLL_INTERVAL = 60 # type: float
HISTORY_LENGTH: int = 15
POLL_INTERVAL: float = 60
initial_cpu: float = 0.0
def __init__(self):
service.MultiService.__init__(self)
# we don't use process_time() here, because the constructor is run by
# the twistd parent process (as it loads the .tac file), whereas the
# rest of the program will be run by the child process, after twistd
# forks. Instead, set self.initial_cpu as soon as the reactor starts
# up.
self.initial_cpu = 0.0 # just in case
eventually(self._set_initial_cpu)
self.samples = []
self.samples: Deque[Tuple[float, float]] = deque([], self.HISTORY_LENGTH + 1)
# we provide 1min, 5min, and 15min moving averages
TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)
def _set_initial_cpu(self):
def startService(self):
self.initial_cpu = process_time()
return super().startService()
def check(self):
now_wall = time.time()
now_cpu = process_time()
self.samples.append( (now_wall, now_cpu) )
while len(self.samples) > self.HISTORY_LENGTH+1:
self.samples.pop(0)
def _average_N_minutes(self, size):
if len(self.samples) < size+1:

View File

@ -2,27 +2,8 @@
HTTP client that talks to the HTTP storage server.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
from collections import defaultdict
Optional = Set = defaultdict(
lambda: None
) # some garbage to just make this module import
else:
# typing module not available in Python 2, and we only do type checking in
# Python 3 anyway.
from typing import Union, Set, Optional
from treq.testing import StubTreq
from typing import Union, Set, Optional
from treq.testing import StubTreq
from base64 import b64encode
@ -38,7 +19,7 @@ from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred
from hyperlink import DecodedURL
import treq
from .http_common import swissnum_auth_header, Secrets
from .http_common import swissnum_auth_header, Secrets, get_content_type, CBOR_MIME_TYPE
from .common import si_b2a
@ -48,14 +29,25 @@ def _encode_si(si): # type: (bytes) -> str
class ClientException(Exception):
"""An unexpected error."""
"""An unexpected response code from the server."""
def __init__(self, code, *additional_args):
Exception.__init__(self, code, *additional_args)
self.code = code
def _decode_cbor(response):
"""Given HTTP response, return decoded CBOR body."""
if response.code > 199 and response.code < 300:
return treq.content(response).addCallback(loads)
return fail(ClientException(response.code, response.phrase))
content_type = get_content_type(response.headers)
if content_type == CBOR_MIME_TYPE:
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
return treq.content(response).addCallback(loads)
else:
raise ClientException(-1, "Server didn't send CBOR")
else:
return fail(ClientException(response.code, response.phrase))
@attr.s
@ -68,7 +60,7 @@ class ImmutableCreateResult(object):
class StorageClient(object):
"""
HTTP client that talks to the HTTP storage server.
Low-level HTTP client that talks to the HTTP storage server.
"""
def __init__(
@ -78,7 +70,7 @@ class StorageClient(object):
self._swissnum = swissnum
self._treq = treq
def _url(self, path):
def relative_url(self, path):
"""Get a URL relative to the base URL."""
return self._base_url.click(path)
@ -92,7 +84,7 @@ class StorageClient(object):
)
return headers
def _request(
def request(
self,
method,
url,
@ -100,13 +92,19 @@ class StorageClient(object):
lease_cancel_secret=None,
upload_secret=None,
headers=None,
message_to_serialize=None,
**kwargs
):
"""
Like ``treq.request()``, but with optional secrets that get translated
into corresponding HTTP headers.
If ``message_to_serialize`` is set, it will be serialized (by default
with CBOR) and set as the request body.
"""
headers = self._get_headers(headers)
# Add secrets:
for secret, value in [
(Secrets.LEASE_RENEW, lease_renew_secret),
(Secrets.LEASE_CANCEL, lease_cancel_secret),
@ -118,15 +116,39 @@ class StorageClient(object):
"X-Tahoe-Authorization",
b"%s %s" % (secret.value.encode("ascii"), b64encode(value).strip()),
)
# Note we can accept CBOR:
headers.addRawHeader("Accept", CBOR_MIME_TYPE)
# If there's a request message, serialize it and set the Content-Type
# header:
if message_to_serialize is not None:
if "data" in kwargs:
raise TypeError(
"Can't use both `message_to_serialize` and `data` "
"as keyword arguments at the same time"
)
kwargs["data"] = dumps(message_to_serialize)
headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)
return self._treq.request(method, url, headers=headers, **kwargs)
class StorageClientGeneral(object):
"""
High-level HTTP APIs that aren't immutable- or mutable-specific.
"""
def __init__(self, client): # type: (StorageClient) -> None
self._client = client
@inlineCallbacks
def get_version(self):
"""
Return the version metadata for the server.
"""
url = self._url("/v1/version")
response = yield self._request("GET", url)
url = self._client.relative_url("/v1/version")
response = yield self._client.request("GET", url)
decoded_response = yield _decode_cbor(response)
returnValue(decoded_response)
@ -148,7 +170,7 @@ class StorageClientImmutables(object):
APIs for interacting with immutables.
"""
def __init__(self, client): # type: (StorageClient) -> None
def __init__(self, client: StorageClient):
self._client = client
@inlineCallbacks
@ -174,18 +196,16 @@ class StorageClientImmutables(object):
Result fires when creating the storage index succeeded; if creating the
storage index failed, the result will fire with an exception.
"""
url = self._client._url("/v1/immutable/" + _encode_si(storage_index))
message = dumps(
{"share-numbers": share_numbers, "allocated-size": allocated_size}
)
response = yield self._client._request(
url = self._client.relative_url("/v1/immutable/" + _encode_si(storage_index))
message = {"share-numbers": share_numbers, "allocated-size": allocated_size}
response = yield self._client.request(
"POST",
url,
lease_renew_secret=lease_renew_secret,
lease_cancel_secret=lease_cancel_secret,
upload_secret=upload_secret,
data=message,
headers=Headers({"content-type": ["application/cbor"]}),
message_to_serialize=message,
)
decoded_response = yield _decode_cbor(response)
returnValue(
@ -195,6 +215,27 @@ class StorageClientImmutables(object):
)
)
@inlineCallbacks
def abort_upload(
self, storage_index: bytes, share_number: int, upload_secret: bytes
) -> Deferred[None]:
"""Abort the upload."""
url = self._client.relative_url(
"/v1/immutable/{}/{}/abort".format(_encode_si(storage_index), share_number)
)
response = yield self._client.request(
"PUT",
url,
upload_secret=upload_secret,
)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,
)
@inlineCallbacks
def write_share_chunk(
self, storage_index, share_number, upload_secret, offset, data
@ -211,10 +252,10 @@ class StorageClientImmutables(object):
whether the _complete_ share (i.e. all chunks, not just this one) has
been uploaded.
"""
url = self._client._url(
url = self._client.relative_url(
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
)
response = yield self._client._request(
response = yield self._client.request(
"PATCH",
url,
upload_secret=upload_secret,
@ -262,10 +303,10 @@ class StorageClientImmutables(object):
the HTTP protocol will be simplified, see
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3777
"""
url = self._client._url(
url = self._client.relative_url(
"/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
)
response = yield self._client._request(
response = yield self._client.request(
"GET",
url,
headers=Headers(
@ -276,25 +317,69 @@ class StorageClientImmutables(object):
body = yield response.content()
returnValue(body)
else:
raise ClientException(
response.code,
)
raise ClientException(response.code)
@inlineCallbacks
def list_shares(self, storage_index): # type: (bytes,) -> Deferred[Set[int]]
"""
Return the set of shares for a given storage index.
"""
url = self._client._url(
url = self._client.relative_url(
"/v1/immutable/{}/shares".format(_encode_si(storage_index))
)
response = yield self._client._request(
response = yield self._client.request(
"GET",
url,
)
if response.code == http.OK:
body = yield _decode_cbor(response)
returnValue(set(body))
else:
raise ClientException(response.code)
@inlineCallbacks
def add_or_renew_lease(
    self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
):
    """
    Add a lease for the given storage index, or renew an existing one.

    A lease whose renewal secret matches the one supplied is renewed;
    otherwise a new lease is added.  The request is a ``PUT`` to
    ``/v1/lease/<storage index>``; any response code other than
    ``NO_CONTENT`` raises ``ClientException`` with that code.
    """
    path = "/v1/lease/{}".format(_encode_si(storage_index))
    url = self._client.relative_url(path)
    response = yield self._client.request(
        "PUT",
        url,
        lease_renew_secret=renew_secret,
        lease_cancel_secret=cancel_secret,
    )
    if response.code != http.NO_CONTENT:
        raise ClientException(response.code)
@inlineCallbacks
def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
"""Indicate a share has been corrupted, with a human-readable message."""
assert isinstance(reason, str)
url = self._client.relative_url(
"/v1/immutable/{}/{}/corrupt".format(
_encode_si(storage_index), share_number
)
)
message = {"reason": reason}
response = yield self._client.request("POST", url, message_to_serialize=message)
if response.code == http.OK:
return
else:
raise ClientException(
response.code,

View File

@ -1,15 +1,26 @@
"""
Common HTTP infrastructure for the storage server.
"""
from future.utils import PY2
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
from enum import Enum
from base64 import b64encode
from typing import Optional
from werkzeug.http import parse_options_header
from twisted.web.http_headers import Headers
CBOR_MIME_TYPE = "application/cbor"
def get_content_type(headers: Headers) -> Optional[str]:
    """
    Extract the media type from the HTTP ``Content-Type`` header.

    Only the first ``Content-Type`` value is consulted, and only its media
    type is returned; header options (anything after ``;``) are dropped.

    Returns ``None`` if no content-type was set.
    """
    raw_values = headers.getRawHeaders("content-type")
    # A missing (or empty) header is handed to the parser as ``None``.
    first_value = raw_values[0] if raw_values else None
    media_type = parse_options_header(first_value)[0]
    return media_type if media_type else None
def swissnum_auth_header(swissnum): # type: (bytes) -> bytes

View File

@ -2,36 +2,32 @@
HTTP server for storage.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
else:
from typing import Dict, List, Set
from typing import Dict, List, Set, Tuple, Any
from functools import wraps
from base64 import b64decode
import binascii
from klein import Klein
from twisted.web import http
import attr
from werkzeug.http import parse_range_header, parse_content_range_header
from werkzeug.http import (
parse_range_header,
parse_content_range_header,
parse_accept_header,
)
from werkzeug.routing import BaseConverter, ValidationError
from werkzeug.datastructures import ContentRange
# TODO Make sure to use pure Python versions?
from cbor2 import dumps, loads
from .server import StorageServer
from .http_common import swissnum_auth_header, Secrets
from .http_common import swissnum_auth_header, Secrets, get_content_type, CBOR_MIME_TYPE
from .common import si_a2b
from .immutable import BucketWriter
from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet
class ClientSecretsException(Exception):
@ -128,10 +124,95 @@ class StorageIndexUploads(object):
"""
# Map share number to BucketWriter
shares = attr.ib() # type: Dict[int,BucketWriter]
shares = attr.ib(factory=dict) # type: Dict[int,BucketWriter]
# The upload key.
upload_secret = attr.ib() # type: bytes
# Map share number to the upload secret (different shares might have
# different upload secrets).
upload_secrets = attr.ib(factory=dict) # type: Dict[int,bytes]
@attr.s
class UploadsInProgress(object):
    """
    Keep track of uploads for storage indexes.

    Two indexes are maintained: one from storage index to the per-share
    writers and upload secrets, and a reverse one from each ``BucketWriter``
    back to its (storage index, share number), so that a writer can be
    forgotten when all the caller has is the writer object.
    """

    # Map storage index to corresponding uploads-in-progress
    _uploads = attr.ib(type=Dict[bytes, StorageIndexUploads], factory=dict)

    # Map BucketWriter to (storage index, share number)
    _bucketwriters = attr.ib(type=Dict[BucketWriter, Tuple[bytes, int]], factory=dict)

    def add_write_bucket(
        self,
        storage_index: bytes,
        share_number: int,
        upload_secret: bytes,
        bucket: BucketWriter,
    ):
        """Add a new ``BucketWriter`` to be tracked."""
        si_uploads = self._uploads.setdefault(storage_index, StorageIndexUploads())
        si_uploads.shares[share_number] = bucket
        si_uploads.upload_secrets[share_number] = upload_secret
        self._bucketwriters[bucket] = (storage_index, share_number)

    def get_write_bucket(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ) -> BucketWriter:
        """
        Get the given in-progress immutable share upload.

        Raises ``_HTTPError`` with ``UNAUTHORIZED`` if the upload exists but
        the secret does not match, and with ``NOT_FOUND`` if no such upload
        is being tracked.
        """
        self.validate_upload_secret(storage_index, share_number, upload_secret)
        try:
            return self._uploads[storage_index].shares[share_number]
        except (KeyError, IndexError):
            raise _HTTPError(http.NOT_FOUND)

    def remove_write_bucket(self, bucket: BucketWriter):
        """
        Stop tracking the given ``BucketWriter``.

        A writer that was never registered through ``add_write_bucket()`` is
        ignored rather than treated as an error.
        """
        try:
            storage_index, share_number = self._bucketwriters.pop(bucket)
        except KeyError:
            # NOTE(review): this method is installed as a close handler on
            # the storage server, which presumably reports every writer it
            # closes, including ones created outside this HTTP layer.  There
            # is nothing to clean up for those.
            return
        uploads_index = self._uploads[storage_index]
        uploads_index.shares.pop(share_number)
        uploads_index.upload_secrets.pop(share_number)
        # Drop the per-storage-index entry once its last share is gone.
        if not uploads_index.shares:
            self._uploads.pop(storage_index)

    def validate_upload_secret(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ):
        """
        Raise an unauthorized-HTTP-response exception if the given
        storage_index+share_number have a different upload secret than the
        given one.

        If the given upload doesn't exist at all, nothing happens.
        """
        if storage_index in self._uploads:
            in_progress = self._uploads[storage_index]
            # For pre-existing upload, make sure password matches.
            if share_number in in_progress.upload_secrets and not timing_safe_compare(
                in_progress.upload_secrets[share_number], upload_secret
            ):
                raise _HTTPError(http.UNAUTHORIZED)
class StorageIndexConverter(BaseConverter):
    """
    Werkzeug URL converter for the storage index path segment.

    The segment must be exactly 26 characters drawn from
    ``rfc3548_alphabet``; it is then decoded to bytes with ``si_a2b``.
    """

    regex = "[" + str(rfc3548_alphabet, "ascii") + "]{26}"

    def to_python(self, value):
        """Decode the matched segment, rejecting anything ``si_a2b`` refuses."""
        encoded = value.encode("ascii")
        try:
            decoded = si_a2b(encoded)
        except (AssertionError, binascii.Error, ValueError):
            raise ValidationError("Invalid storage index")
        return decoded
class _HTTPError(Exception):
"""
Raise from ``HTTPServer`` endpoint to return the given HTTP response code.
"""
def __init__(self, code: int):
self.code = code
class HTTPServer(object):
@ -140,6 +221,13 @@ class HTTPServer(object):
"""
_app = Klein()
_app.url_map.converters["storage_index"] = StorageIndexConverter
@_app.handle_errors(_HTTPError)
def _http_error(self, request, failure):
    """
    Turn an ``_HTTPError`` raised by an endpoint into an HTTP response: the
    exception's ``code`` becomes the status and the body is empty.
    """
    status_code = failure.value.code
    request.setResponseCode(status_code)
    return b""
def __init__(
self, storage_server, swissnum
@ -147,102 +235,157 @@ class HTTPServer(object):
self._storage_server = storage_server
self._swissnum = swissnum
# Maps storage index to StorageIndexUploads:
self._uploads = {} # type: Dict[bytes,StorageIndexUploads]
self._uploads = UploadsInProgress()
# When an upload finishes successfully, gets aborted, or times out,
# make sure it gets removed from our tracking datastructure:
self._storage_server.register_bucket_writer_close_handler(
self._uploads.remove_write_bucket
)
def get_resource(self):
    """Build and return the twisted.web ``Resource`` serving this object's routes."""
    resource = self._app.resource()
    return resource
def _cbor(self, request, data):
"""Return CBOR-encoded data."""
# TODO Might want to optionally send JSON someday, based on Accept
# headers, see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
request.setHeader("Content-Type", "application/cbor")
# TODO if data is big, maybe want to use a temporary file eventually...
return dumps(data)
def _send_encoded(self, request, data):
    """
    Return encoded data suitable for writing as the HTTP body response, by
    default using CBOR.

    Also sets the appropriate ``Content-Type`` header on the response.

    The first ``Accept`` header of the request is negotiated against the
    one format we can produce (CBOR).  A missing header is treated as
    accepting CBOR.  Wildcards such as ``*/*`` or ``application/*`` and
    lists in which CBOR is merely one of several acceptable types are
    honoured.  If CBOR is not acceptable, ``_HTTPError`` with
    ``NOT_ACCEPTABLE`` is raised.
    """
    # Imported locally so this method does not depend on a change to the
    # module's import block.
    from werkzeug.datastructures import MIMEAccept

    accept_headers = request.requestHeaders.getRawHeaders("accept") or [
        CBOR_MIME_TYPE
    ]
    # ``MIMEAccept`` understands media-type wildcards; comparing only the
    # client's single most-preferred entry would reject ``Accept: */*``.
    accept = parse_accept_header(accept_headers[0], MIMEAccept)
    if accept.best_match([CBOR_MIME_TYPE]) == CBOR_MIME_TYPE:
        request.setHeader("Content-Type", CBOR_MIME_TYPE)
        # TODO if data is big, maybe want to use a temporary file eventually...
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
        return dumps(data)
    else:
        # TODO Might want to optionally send JSON someday:
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
        raise _HTTPError(http.NOT_ACCEPTABLE)
def _read_encoded(self, request) -> Any:
    """
    Decode and return the request body.

    Only CBOR bodies are understood: when the request's content type is not
    ``CBOR_MIME_TYPE``, ``_HTTPError`` with ``UNSUPPORTED_MEDIA_TYPE`` is
    raised and the body is not read.
    """
    if get_content_type(request.requestHeaders) != CBOR_MIME_TYPE:
        raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)
    # TODO limit memory usage, client could send arbitrarily large data...
    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
    body = request.content.read()
    return loads(body)
##### Generic APIs #####
@_authorized_route(_app, set(), "/v1/version", methods=["GET"])
def version(self, request, authorization):
"""Return version information."""
return self._cbor(request, self._storage_server.get_version())
return self._send_encoded(request, self._storage_server.get_version())
##### Immutable APIs #####
@_authorized_route(
_app,
{Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD},
"/v1/immutable/<string:storage_index>",
"/v1/immutable/<storage_index:storage_index>",
methods=["POST"],
)
def allocate_buckets(self, request, authorization, storage_index):
"""Allocate buckets."""
storage_index = si_a2b(storage_index.encode("ascii"))
info = loads(request.content.read())
upload_secret = authorization[Secrets.UPLOAD]
info = self._read_encoded(request)
if storage_index in self._uploads:
# Pre-existing upload.
in_progress = self._uploads[storage_index]
if timing_safe_compare(in_progress.upload_secret, upload_secret):
# Same session.
# TODO add BucketWriters only for new shares that don't already have buckets; see the HTTP spec for details.
# The backend code may already implement this logic.
pass
else:
# TODO Fail, since the secret doesnt match.
pass
else:
# New upload.
already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
storage_index,
renew_secret=authorization[Secrets.LEASE_RENEW],
cancel_secret=authorization[Secrets.LEASE_CANCEL],
sharenums=info["share-numbers"],
allocated_size=info["allocated-size"],
)
self._uploads[storage_index] = StorageIndexUploads(
shares=sharenum_to_bucket, upload_secret=authorization[Secrets.UPLOAD]
)
return self._cbor(
request,
{
"already-have": set(already_got),
"allocated": set(sharenum_to_bucket),
},
# We do NOT validate the upload secret for existing bucket uploads.
# Another upload may be happening in parallel, with a different upload
# key. That's fine! If a client tries to _write_ to that upload, they
# need to have an upload key. That does mean we leak the existence of
# these parallel uploads, but if you know storage index you can
# download them once upload finishes, so it's not a big deal to leak
# that information.
already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
storage_index,
renew_secret=authorization[Secrets.LEASE_RENEW],
cancel_secret=authorization[Secrets.LEASE_CANCEL],
sharenums=info["share-numbers"],
allocated_size=info["allocated-size"],
)
for share_number, bucket in sharenum_to_bucket.items():
self._uploads.add_write_bucket(
storage_index, share_number, upload_secret, bucket
)
return self._send_encoded(
request,
{
"already-have": set(already_got),
"allocated": set(sharenum_to_bucket),
},
)
@_authorized_route(
_app,
{Secrets.UPLOAD},
"/v1/immutable/<string:storage_index>/<int:share_number>",
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort",
methods=["PUT"],
)
def abort_share_upload(self, request, authorization, storage_index, share_number):
    """
    Abort an in-progress immutable share upload.

    Responds with an empty body on success.  When no matching in-progress
    upload is tracked, the response is NOT FOUND, unless the storage server
    already lists that share for the storage index, in which case it is
    NOT ALLOWED.  Any other ``_HTTPError`` from the lookup propagates as-is.
    """
    upload_secret = authorization[Secrets.UPLOAD]
    try:
        bucket = self._uploads.get_write_bucket(
            storage_index, share_number, upload_secret
        )
    except _HTTPError as error:
        if error.code != http.NOT_FOUND:
            raise
        # It may be we've already uploaded this, in which case error
        # should be method not allowed (405).
        try:
            self._storage_server.get_buckets(storage_index)[share_number]
        except KeyError:
            # Not uploaded either; fall through and re-raise the 404.
            pass
        else:
            # Already uploaded, so we can't abort.
            raise _HTTPError(http.NOT_ALLOWED)
        raise

    # Abort the upload; this should close it which will eventually result
    # in self._uploads.remove_write_bucket() being called.
    bucket.abort()

    return b""
@_authorized_route(
_app,
{Secrets.UPLOAD},
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
methods=["PATCH"],
)
def write_share_data(self, request, authorization, storage_index, share_number):
"""Write data to an in-progress immutable upload."""
storage_index = si_a2b(storage_index.encode("ascii"))
content_range = parse_content_range_header(request.getHeader("content-range"))
# TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
# 1. Malformed header should result in error 416
# 2. Non-bytes unit should result in error 416
# 3. Missing header means full upload in one request
# 4. Impossible range should result in error 416
if content_range is None or content_range.units != "bytes":
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
return b""
offset = content_range.start
# TODO basic checks on validity of start, offset, and content-range in general. also of share_number.
# TODO basic check that body isn't infinite. require content-length? or maybe we should require content-range (it's optional now)? if so, needs to be reflected in protocol spec.
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = request.content.read(content_range.stop - content_range.start + 1)
bucket = self._uploads.get_write_bucket(
storage_index, share_number, authorization[Secrets.UPLOAD]
)
data = request.content.read()
try:
bucket = self._uploads[storage_index].shares[share_number]
except (KeyError, IndexError):
# TODO return 404
raise
finished = bucket.write(offset, data)
# TODO if raises ConflictingWriteError, return HTTP CONFLICT code.
finished = bucket.write(offset, data)
except ConflictingWriteError:
request.setResponseCode(http.CONFLICT)
return b""
if finished:
bucket.close()
@ -253,51 +396,108 @@ class HTTPServer(object):
required = []
for start, end, _ in bucket.required_ranges().ranges():
required.append({"begin": start, "end": end})
return self._cbor(request, {"required": required})
return self._send_encoded(request, {"required": required})
@_authorized_route(
_app,
set(),
"/v1/immutable/<string:storage_index>/shares",
"/v1/immutable/<storage_index:storage_index>/shares",
methods=["GET"],
)
def list_shares(self, request, authorization, storage_index):
"""
List shares for the given storage index.
"""
storage_index = si_a2b(storage_index.encode("ascii"))
share_numbers = list(self._storage_server.get_buckets(storage_index).keys())
return self._cbor(request, share_numbers)
return self._send_encoded(request, share_numbers)
@_authorized_route(
_app,
set(),
"/v1/immutable/<string:storage_index>/<int:share_number>",
"/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
methods=["GET"],
)
def read_share_chunk(self, request, authorization, storage_index, share_number):
"""Read a chunk for an already uploaded immutable."""
# TODO in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
# 1. basic checks on validity on storage index, share number
# 2. missing range header should have response code 200 and return whole thing
# 3. malformed range header should result in error? or return everything?
# 4. non-bytes range results in error
# 5. ranges make sense semantically (positive, etc.)
# 6. multiple ranges fails with error
# 7. missing end of range means "to the end of share"
storage_index = si_a2b(storage_index.encode("ascii"))
range_header = parse_range_header(request.getHeader("range"))
offset, end = range_header.ranges[0]
assert end != None # TODO support this case
try:
bucket = self._storage_server.get_buckets(storage_index)[share_number]
except KeyError:
request.setResponseCode(http.NOT_FOUND)
return b""
# TODO if not found, 404
bucket = self._storage_server.get_buckets(storage_index)[share_number]
if request.getHeader("range") is None:
# Return the whole thing.
start = 0
while True:
# TODO should probably yield to event loop occasionally...
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = bucket.read(start, start + 65536)
if not data:
request.finish()
return
request.write(data)
start += len(data)
range_header = parse_range_header(request.getHeader("range"))
if (
range_header is None
or range_header.units != "bytes"
or len(range_header.ranges) > 1 # more than one range
or range_header.ranges[0][1] is None # range without end
):
request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
return b""
offset, end = range_header.ranges[0]
# TODO limit memory usage
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
data = bucket.read(offset, end - offset)
request.setResponseCode(http.PARTIAL_CONTENT)
# TODO set content-range on response. We need to expand the
# BucketReader interface to return share's length.
#
# request.setHeader(
# "content-range", range_header.make_content_range(share_length).to_header()
# )
if len(data):
# For empty bodies the content-range header makes no sense since
# the end of the range is inclusive.
request.setHeader(
"content-range",
ContentRange("bytes", offset, offset + len(data)).to_header(),
)
return data
@_authorized_route(
    _app,
    {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL},
    "/v1/lease/<storage_index:storage_index>",
    methods=["PUT"],
)
def add_or_renew_lease(self, request, authorization, storage_index):
    """
    Update the lease for an immutable share.

    Responds NOT FOUND when the storage server reports no buckets for the
    storage index; otherwise forwards both lease secrets to the storage
    server and responds NO CONTENT with an empty body.
    """
    existing_buckets = self._storage_server.get_buckets(storage_index)
    if not existing_buckets:
        raise _HTTPError(http.NOT_FOUND)

    renew_secret = authorization[Secrets.LEASE_RENEW]
    cancel_secret = authorization[Secrets.LEASE_CANCEL]
    # Checking of the renewal secret is done by the backend.
    self._storage_server.add_lease(storage_index, renew_secret, cancel_secret)

    request.setResponseCode(http.NO_CONTENT)
    return b""
@_authorized_route(
    _app,
    set(),
    "/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
    methods=["POST"],
)
def advise_corrupt_share(self, request, authorization, storage_index, share_number):
    """
    Record a client's report that the given share is corrupt.

    Responds NOT FOUND when the share is not among the storage server's
    buckets for this storage index.  Otherwise the decoded request body's
    ``"reason"`` text is UTF-8 encoded and passed to the bucket, and an
    empty body is returned.
    """
    try:
        bucket = self._storage_server.get_buckets(storage_index)[share_number]
    except KeyError:
        raise _HTTPError(http.NOT_FOUND)

    # The share is looked up first, so an unknown share is reported before
    # the request body is read or decoded.
    reason_text = self._read_encoded(request)["reason"]
    bucket.advise_corrupt_share(reason_text.encode("utf-8"))
    return b""

View File

@ -743,8 +743,9 @@ class StorageServer(service.MultiService):
def advise_corrupt_share(self, share_type, storage_index, shnum,
reason):
# This is a remote API, I believe, so this has to be bytes for legacy
# protocol backwards compatibility reasons.
# Previously this had to be bytes for legacy protocol backwards
# compatibility reasons. Now that Foolscap layer has been abstracted
# out, we can probably refactor this to be unicode...
assert isinstance(share_type, bytes)
assert isinstance(reason, bytes), "%r is not bytes" % (reason,)
@ -777,7 +778,7 @@ class StorageServer(service.MultiService):
si_s,
shnum,
)
with open(report_path, "w") as f:
with open(report_path, "w", encoding="utf-8") as f:
f.write(report)
return None

View File

@ -58,7 +58,7 @@ from twisted.plugin import (
from eliot import (
log_call,
)
from foolscap.api import eventually
from foolscap.api import eventually, RemoteException
from foolscap.reconnector import (
ReconnectionInfo,
)
@ -75,7 +75,10 @@ from allmydata.util.observer import ObserverList
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
from allmydata.storage.http_client import StorageClient, StorageClientImmutables
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException
)
# who is responsible for de-duplication?
@ -1035,8 +1038,13 @@ class _FakeRemoteReference(object):
"""
local_object = attr.ib(type=object)
@defer.inlineCallbacks
def callRemote(self, action, *args, **kwargs):
return getattr(self.local_object, action)(*args, **kwargs)
try:
result = yield getattr(self.local_object, action)(*args, **kwargs)
defer.returnValue(result)
except HTTPClientException as e:
raise RemoteException(e.args)
@attr.s
@ -1051,7 +1059,8 @@ class _HTTPBucketWriter(object):
finished = attr.ib(type=bool, default=False)
def abort(self):
pass # TODO in later ticket
return self.client.abort_upload(self.storage_index, self.share_number,
self.upload_secret)
@defer.inlineCallbacks
def write(self, offset, data):
@ -1085,7 +1094,10 @@ class _HTTPBucketReader(object):
)
def advise_corrupt_share(self, reason):
pass # TODO in later ticket
return self.client.advise_corrupt_share(
self.storage_index, self.share_number,
str(reason, "utf-8", errors="backslashreplace")
)
# WORK IN PROGRESS, for now it doesn't actually implement whole thing.
@ -1105,7 +1117,7 @@ class _HTTPStorageServer(object):
return _HTTPStorageServer(http_client=http_client)
def get_version(self):
return self._http_client.get_version()
return StorageClientGeneral(self._http_client).get_version()
@defer.inlineCallbacks
def allocate_buckets(
@ -1115,7 +1127,7 @@ class _HTTPStorageServer(object):
cancel_secret,
sharenums,
allocated_size,
canary,
canary
):
upload_secret = urandom(20)
immutable_client = StorageClientImmutables(self._http_client)
@ -1139,7 +1151,7 @@ class _HTTPStorageServer(object):
@defer.inlineCallbacks
def get_buckets(
self,
storage_index,
storage_index
):
immutable_client = StorageClientImmutables(self._http_client)
share_numbers = yield immutable_client.list_shares(
@ -1151,3 +1163,29 @@ class _HTTPStorageServer(object):
))
for share_num in share_numbers
})
def add_lease(self, storage_index, renew_secret, cancel_secret):
    """
    Add or renew a lease on the given storage index over HTTP.

    Returns whatever ``StorageClientImmutables.add_or_renew_lease()``
    returns for these arguments.
    """
    client = StorageClientImmutables(self._http_client)
    return client.add_or_renew_lease(storage_index, renew_secret, cancel_secret)
def advise_corrupt_share(self, share_type, storage_index, shnum, reason: bytes):
    """
    Report a corrupt share to the server over HTTP.

    Only ``b"immutable"`` shares are handled; any other ``share_type``
    raises ``NotImplementedError``.  ``reason`` is decoded as UTF-8, with
    undecodable bytes rendered as backslash escapes, before being sent.
    """
    if share_type != b"immutable":
        raise NotImplementedError()  # future tickets

    imm_client = StorageClientImmutables(self._http_client)
    reason_text = str(reason, "utf-8", errors="backslashreplace")
    return imm_client.advise_corrupt_share(storage_index, shnum, reason_text)

View File

@ -176,8 +176,9 @@ class IStorageServerImmutableAPIsTestsMixin(object):
canary=Referenceable(),
)
# Bucket 1 is fully written in one go.
yield allocated[0].callRemote("write", 0, b"1" * 1024)
# Bucket 1 get some data written (but not all, or HTTP implicitly
# finishes the upload)
yield allocated[0].callRemote("write", 0, b"1" * 1023)
# Disconnect or abort, depending on the test:
yield abort_or_disconnect(allocated[0])
@ -193,20 +194,6 @@ class IStorageServerImmutableAPIsTestsMixin(object):
)
yield allocated[0].callRemote("write", 0, b"2" * 1024)
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
HTTP protocol should skip this test, since disconnection is meaningless
concept; this is more about testing implicit contract the Foolscap
implementation depends on doesn't change as we refactor things.
"""
return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
@inlineCallbacks
def test_written_shares_are_allocated(self):
"""
@ -1061,13 +1048,6 @@ class _SharedMixin(SystemTestMixin):
AsyncTestCase.tearDown(self)
yield SystemTestMixin.tearDown(self)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
raise NotImplementedError("implement in subclass")
class _FoolscapMixin(_SharedMixin):
"""Run tests on Foolscap version of ``IStorageServer``."""
@ -1080,16 +1060,6 @@ class _FoolscapMixin(_SharedMixin):
self.assertTrue(IStorageServer.providedBy(client))
return succeed(client)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
current = self.storage_client
yield self.bounce_client(0)
self.storage_client = self._get_native_server().get_storage_server()
assert self.storage_client is not current
class _HTTPMixin(_SharedMixin):
"""Run tests on the HTTP version of ``IStorageServer``."""
@ -1148,27 +1118,37 @@ class FoolscapImmutableAPIsTests(
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
HTTP protocol doesn't need this test, since disconnection is a
meaningless concept; this is more about testing the implicit contract
the Foolscap implementation depends on doesn't change as we refactor
things.
"""
return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
@inlineCallbacks
def disconnect(self):
    """
    Bounce client 0, then switch ``self.storage_client`` to the storage
    server obtained from the native server afterwards, which must be a
    different object from the one used before.
    """
    previous = self.storage_client
    yield self.bounce_client(0)
    native_server = self._get_native_server()
    self.storage_client = native_server.get_storage_server()
    assert self.storage_client is not previous
class HTTPImmutableAPIsTests(
_HTTPMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""HTTP-specific tests for immutable ``IStorageServer`` APIs."""
# These will start passing in future PRs as HTTP protocol is implemented.
SKIP_TESTS = {
"test_abort",
"test_add_lease_renewal",
"test_add_new_lease",
"test_advise_corrupt_share",
"test_allocate_buckets_repeat",
"test_bucket_advise_corrupt_share",
"test_disconnection",
"test_get_buckets_skips_unfinished_buckets",
"test_matching_overlapping_writes",
"test_non_matching_overlapping_writes",
"test_written_shares_are_allocated",
}
class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase

View File

@ -203,10 +203,10 @@ class BinTahoe(common_util.SignalMixin, unittest.TestCase):
# but on Windows we parse the whole command line string ourselves so
# we have to have our own implementation of skipping these options.
# -t is a harmless option that warns about tabs so we can add it
# -B is a harmless option that prevents writing bytecode so we can add it
# without impacting other behavior noticeably.
out, err, returncode = run_bintahoe([u"--version"], python_options=[u"-t"])
self.assertEqual(returncode, 0)
out, err, returncode = run_bintahoe([u"--version"], python_options=[u"-B"])
self.assertEqual(returncode, 0, f"Out:\n{out}\nErr:\n{err}")
self.assertTrue(out.startswith(allmydata.__appname__ + '/'))
def test_help_eliot_destinations(self):

View File

@ -17,7 +17,7 @@ from allmydata.util import pollmixin
import allmydata.test.common_util as testutil
class FasterMonitor(CPUUsageMonitor):
POLL_INTERVAL = 0.1
POLL_INTERVAL = 0.01
class CPUUsage(unittest.TestCase, pollmixin.PollMixin, testutil.StallMixin):
@ -36,9 +36,9 @@ class CPUUsage(unittest.TestCase, pollmixin.PollMixin, testutil.StallMixin):
def _poller():
return bool(len(m.samples) == m.HISTORY_LENGTH+1)
d = self.poll(_poller)
# pause one more second, to make sure that the history-trimming code
# is exercised
d.addCallback(self.stall, 1.0)
# pause a couple more intervals, to make sure that the history-trimming
# code is exercised
d.addCallback(self.stall, FasterMonitor.POLL_INTERVAL * 2)
def _check(res):
s = m.get_stats()
self.failUnless("cpu_monitor.1min_avg" in s)

View File

@ -15,6 +15,7 @@ if PY2:
# fmt: on
from base64 import b64encode
from contextlib import contextmanager
from os import urandom
from hypothesis import assume, given, strategies as st
@ -24,6 +25,10 @@ from klein import Klein
from hyperlink import DecodedURL
from collections_extended import RangeMap
from twisted.internet.task import Clock
from twisted.web import http
from twisted.web.http_headers import Headers
from werkzeug import routing
from werkzeug.exceptions import NotFound as WNotFound
from .common import SyncTestCase
from ..storage.server import StorageServer
@ -33,6 +38,7 @@ from ..storage.http_server import (
Secrets,
ClientSecretsException,
_authorized_route,
StorageIndexConverter,
)
from ..storage.http_client import (
StorageClient,
@ -40,7 +46,30 @@ from ..storage.http_client import (
StorageClientImmutables,
ImmutableCreateResult,
UploadProgress,
StorageClientGeneral,
_encode_si,
)
from ..storage.http_common import get_content_type
from ..storage.common import si_b2a
class HTTPUtilities(SyncTestCase):
    """Tests for HTTP common utilities."""

    def test_get_content_type(self):
        """``get_content_type()`` extracts the content-type from the header."""
        # (raw ``Content-Type`` header values, expected result)
        cases = [
            (["text/html"], "text/html"),
            ([], None),
            (["text/plain", "application/json"], "text/plain"),
            (["text/html;encoding=utf-8"], "text/html"),
        ]
        for raw_values, expected_content_type in cases:
            headers = Headers()
            # An empty list models a request with no Content-Type at all.
            if raw_values:
                headers.setRawHeaders("Content-Type", raw_values)
            self.assertEqual(get_content_type(headers), expected_content_type)
def _post_process(params):
@ -147,6 +176,52 @@ class ExtractSecretsTests(SyncTestCase):
_extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW})
class RouteConverterTests(SyncTestCase):
    """Tests for custom werkzeug path segment converters."""

    # A one-rule URL map whose only path segment uses the converter under test.
    adapter = routing.Map(
        [
            routing.Rule(
                "/<storage_index:storage_index>/", endpoint="si", methods=["GET"]
            )
        ],
        converters={"storage_index": StorageIndexConverter},
    ).bind("example.com", "/")

    def _assert_not_routed(self, segment):
        """Assert that ``/<segment>/`` does not match the storage index rule."""
        with self.assertRaises(WNotFound):
            self.adapter.match("/{}/".format(segment), method="GET")

    @given(storage_index=st.binary(min_size=16, max_size=16))
    def test_good_storage_index_is_parsed(self, storage_index):
        """
        A valid storage index is accepted and parsed back out by
        StorageIndexConverter.
        """
        encoded = str(si_b2a(storage_index), "ascii")
        matched = self.adapter.match("/{}/".format(encoded), method="GET")
        self.assertEqual(matched, ("si", {"storage_index": storage_index}))

    def test_long_storage_index_is_not_parsed(self):
        """An overly long storage_index string is not parsed."""
        self._assert_not_routed("a" * 27)

    def test_short_storage_index_is_not_parsed(self):
        """An overly short storage_index string is not parsed."""
        self._assert_not_routed("a" * 25)

    def test_bad_characters_storage_index_is_not_parsed(self):
        """A storage_index string with bad characters is not parsed."""
        self._assert_not_routed("a" * 25 + "_")

    def test_invalid_storage_index_is_not_parsed(self):
        """An invalid storage_index string is not parsed."""
        self._assert_not_routed("nomd2a65ylxjbqzsw7gcfh4ivr")
# TODO should be actual swissnum
SWISSNUM_FOR_TEST = b"abcd"
@ -207,7 +282,7 @@ class RoutingTests(SyncTestCase):
"""
# Without secret, get a 400 error.
response = result_of(
self.client._request(
self.client.request(
"GET",
"http://127.0.0.1/upload_secret",
)
@ -216,7 +291,7 @@ class RoutingTests(SyncTestCase):
# With secret, we're good.
response = result_of(
self.client._request(
self.client.request(
"GET", "http://127.0.0.1/upload_secret", upload_secret=b"MAGIC"
)
)
@ -244,6 +319,38 @@ class HttpTestFixture(Fixture):
)
class StorageClientWithHeadersOverride(object):
    """
    Wrap ``StorageClient`` and override sent headers.

    Every attribute that is not defined on the wrapper itself is looked up on
    the wrapped client, so only ``request`` behaves differently.
    """

    def __init__(self, storage_client, add_headers):
        """
        :param storage_client: The client whose ``request`` calls receive the
            overriding headers.
        :param add_headers: Mapping of header name to the single value that
            is set on every request, replacing any existing value.
        """
        self.storage_client = storage_client
        self.add_headers = add_headers

    def __getattr__(self, attr):
        # Only reached when normal lookup on the wrapper fails; delegate to
        # the wrapped client.
        return getattr(self.storage_client, attr)

    def request(self, *args, headers=None, **kwargs):
        """
        Forward the request to the wrapped client with the overriding headers
        applied.

        :param headers: Optional headers supplied by the caller.  They are
            copied before being modified, so the caller's object is left
            untouched.
        :return: Whatever the wrapped client's ``request`` returns.
        """
        # Work on a private copy so the caller's headers are never mutated.
        headers = Headers() if headers is None else headers.copy()
        for key, value in self.add_headers.items():
            headers.setRawHeaders(key, [value])
        return self.storage_client.request(*args, headers=headers, **kwargs)
@contextmanager
def assert_fails_with_http_code(test_case: SyncTestCase, code: int):
    """
    Context manager that asserts the wrapped code fails with the given HTTP
    response code.

    :param test_case: Test case whose assertion methods are used to report
        failures.
    :param code: The value the raised ``ClientException``'s ``code``
        attribute must equal.
    """
    # An exception raised in the caller's block is thrown in at the ``yield``
    # and propagates directly into ``assertRaises``; no try/finally is needed.
    with test_case.assertRaises(ClientException) as e:
        yield
    test_case.assertEqual(e.exception.code, code)
class GenericHTTPAPITests(SyncTestCase):
"""
Tests of HTTP client talking to the HTTP server, for generic HTTP API
@ -261,14 +368,26 @@ class GenericHTTPAPITests(SyncTestCase):
If the wrong swissnum is used, an ``Unauthorized`` response code is
returned.
"""
client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"something wrong",
treq=StubTreq(self.http.http_server.get_resource()),
client = StorageClientGeneral(
StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"something wrong",
treq=StubTreq(self.http.http_server.get_resource()),
)
)
with self.assertRaises(ClientException) as e:
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
result_of(client.get_version())
def test_unsupported_mime_type(self):
    """
    Requesting a mime type other than CBOR that the server does not support
    produces a NOT ACCEPTABLE (406) error.
    """
    # Force every request to ask for GIF responses only.
    gif_only_client = StorageClientWithHeadersOverride(
        self.http.client, {"accept": "image/gif"}
    )
    general_client = StorageClientGeneral(gif_only_client)
    with assert_fails_with_http_code(self, http.NOT_ACCEPTABLE):
        result_of(general_client.get_version())
self.assertEqual(e.exception.args[0], 401)
def test_version(self):
"""
@ -277,7 +396,8 @@ class GenericHTTPAPITests(SyncTestCase):
We ignore available disk space and max immutable share size, since that
might change across calls.
"""
version = result_of(self.http.client.get_version())
client = StorageClientGeneral(self.http.client)
version = result_of(client.get_version())
version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
b"available-space"
)
@ -304,6 +424,28 @@ class ImmutableHTTPAPITests(SyncTestCase):
self.skipTest("Not going to bother supporting Python 2")
super(ImmutableHTTPAPITests, self).setUp()
self.http = self.useFixture(HttpTestFixture())
self.imm_client = StorageClientImmutables(self.http.client)
def create_upload(self, share_numbers, length):
    """
    Allocate a write bucket on the server using freshly generated random
    secrets and a random storage index.

    Returns the tuple
    ``(upload_secret, lease_secret, storage_index, result)``, where
    ``result`` is what the immutable client's ``create()`` call produced.
    """
    upload_secret, lease_secret = urandom(32), urandom(32)
    storage_index = urandom(16)
    # The same lease secret is passed for both trailing secret arguments of
    # ``create()``.
    pending = self.imm_client.create(
        storage_index,
        share_numbers,
        length,
        upload_secret,
        lease_secret,
        lease_secret,
    )
    created = result_of(pending)
    return (upload_secret, lease_secret, storage_index, created)
def test_upload_can_be_downloaded(self):
"""
@ -315,19 +457,10 @@ class ImmutableHTTPAPITests(SyncTestCase):
that's already done in test_storage.py.
"""
length = 100
expected_data = b"".join(bytes([i]) for i in range(100))
im_client = StorageClientImmutables(self.http.client)
expected_data = bytes(range(100))
# Create an upload:
upload_secret = urandom(32)
lease_secret = urandom(32)
storage_index = b"".join(bytes([i]) for i in range(16))
created = result_of(
im_client.create(
storage_index, {1}, 100, upload_secret, lease_secret, lease_secret
)
)
(upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
self.assertEqual(
created, ImmutableCreateResult(already_have=set(), allocated={1})
)
@ -338,7 +471,7 @@ class ImmutableHTTPAPITests(SyncTestCase):
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
def write(offset, length):
remaining.empty(offset, offset + length)
return im_client.write_share_chunk(
return self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
@ -382,31 +515,111 @@ class ImmutableHTTPAPITests(SyncTestCase):
# We can now read:
for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
downloaded = result_of(
im_client.read_share_chunk(storage_index, 1, offset, length)
self.imm_client.read_share_chunk(storage_index, 1, offset, length)
)
self.assertEqual(downloaded, expected_data[offset : offset + length])
def test_write_with_wrong_upload_key(self):
    """
    Writing a chunk with an upload secret that differs from the one used to
    create the upload is rejected as unauthorized.
    """
    (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
    wrong_secret = upload_secret + b"X"
    with assert_fails_with_http_code(self, http.UNAUTHORIZED):
        pending = self.imm_client.write_share_chunk(
            storage_index, 1, wrong_secret, 0, b"123"
        )
        result_of(pending)
def test_allocate_buckets_second_time_different_shares(self):
    """
    If the allocate-buckets endpoint is called a second time with a
    different upload secret and a partially overlapping set of shares, only
    the shares not already being uploaded are allocated, and the in-progress
    upload is left intact.
    """
    # Create an upload for shares 1, 2 and 3:
    (upload_secret, lease_secret, storage_index, created) = self.create_upload(
        {1, 2, 3}, 100
    )

    # Write the first half of share 1.
    result_of(
        self.imm_client.write_share_chunk(
            storage_index,
            1,
            upload_secret,
            0,
            b"a" * 50,
        )
    )

    # Allocate again with a different upload secret.  Share 1 overlaps with
    # the existing upload; this call must not overwrite that
    # work-in-progress, so only shares 4 and 6 are newly allocated.
    upload_secret2 = b"x" * 2
    created2 = result_of(
        self.imm_client.create(
            storage_index,
            {1, 4, 6},
            100,
            upload_secret2,
            lease_secret,
            lease_secret,
        )
    )
    self.assertEqual(created2.allocated, {4, 6})

    # Write the second half of share 1 with the *original* upload secret;
    # this completes the share.
    self.assertTrue(
        result_of(
            self.imm_client.write_share_chunk(
                storage_index,
                1,
                upload_secret,
                50,
                b"b" * 50,
            )
        ).finished
    )

    # The upload of share 1 succeeded, demonstrating that the second
    # create() call didn't overwrite work-in-progress.
    downloaded = result_of(
        self.imm_client.read_share_chunk(storage_index, 1, 0, 100)
    )
    self.assertEqual(downloaded, b"a" * 50 + b"b" * 50)

    # We can successfully upload the shares created with the second upload
    # secret.
    self.assertTrue(
        result_of(
            self.imm_client.write_share_chunk(
                storage_index,
                4,
                upload_secret2,
                0,
                b"x" * 100,
            )
        ).finished
    )
def test_list_shares(self):
"""
Once a share is finished uploading, it's possible to list it.
"""
im_client = StorageClientImmutables(self.http.client)
upload_secret = urandom(32)
lease_secret = urandom(32)
storage_index = b"".join(bytes([i]) for i in range(16))
result_of(
im_client.create(
storage_index, {1, 2, 3}, 10, upload_secret, lease_secret, lease_secret
)
)
(upload_secret, _, storage_index, created) = self.create_upload({1, 2, 3}, 10)
# Initially there are no shares:
self.assertEqual(result_of(im_client.list_shares(storage_index)), set())
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set())
# Upload shares 1 and 3:
for share_number in [1, 3]:
progress = result_of(
im_client.write_share_chunk(
self.imm_client.write_share_chunk(
storage_index,
share_number,
upload_secret,
@ -417,87 +630,491 @@ class ImmutableHTTPAPITests(SyncTestCase):
self.assertTrue(progress.finished)
# Now shares 1 and 3 exist:
self.assertEqual(result_of(im_client.list_shares(storage_index)), {1, 3})
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), {1, 3})
def test_upload_bad_content_range(self):
    """
    The immutable upload endpoint answers malformed or invalid
    Content-Range headers with a 416 error.
    """
    (upload_secret, _, storage_index, _) = self.create_upload({1}, 10)

    bad_content_range_values = [
        "not a valid content-range header at all",
        "bytes -1-9/10",
        "bytes 0--9/10",
        "teapots 0-9/10",
    ]
    for bad_content_range_value in bad_content_range_values:
        # Replace the Content-Range header on every request this client
        # sends with the bad value.
        overriding_client = StorageClientWithHeadersOverride(
            self.http.client, {"content-range": bad_content_range_value}
        )
        client = StorageClientImmutables(overriding_client)
        with assert_fails_with_http_code(
            self, http.REQUESTED_RANGE_NOT_SATISFIABLE
        ):
            pending = client.write_share_chunk(
                storage_index, 1, upload_secret, 0, b"0123456789"
            )
            result_of(pending)
def test_list_shares_unknown_storage_index(self):
"""
Listing unknown storage index's shares results in empty list of shares.
"""
im_client = StorageClientImmutables(self.http.client)
storage_index = b"".join(bytes([i]) for i in range(16))
self.assertEqual(result_of(im_client.list_shares(storage_index)), set())
storage_index = bytes(range(16))
self.assertEqual(result_of(self.imm_client.list_shares(storage_index)), set())
def test_upload_non_existent_storage_index(self):
    """
    Uploading to a non-existent storage index or share number results in
    404.
    """
    (upload_secret, _, storage_index, _) = self.create_upload({1}, 10)

    def unknown_check(storage_index, share_number):
        """Assert that writing to the given share fails with NOT FOUND."""
        with assert_fails_with_http_code(self, http.NOT_FOUND):
            result_of(
                self.imm_client.write_share_chunk(
                    storage_index,
                    share_number,
                    upload_secret,
                    0,
                    b"0123456789",
                )
            )

    # Wrong share number (the storage index is the allocated one):
    unknown_check(storage_index, 7)
    # Wrong storage index.  Share number 1 is the one that was allocated
    # above, so in this check only the storage index is unknown:
    unknown_check(b"X" * 16, 1)
    # Wrong storage index and wrong share number together:
    unknown_check(b"X" * 16, 7)
def test_multiple_shares_uploaded_to_different_place(self):
"""
If a storage index has multiple shares, uploads to different shares are
stored separately and can be downloaded separately.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
def test_bucket_allocated_with_new_shares(self):
"""
If some shares already exist, allocating shares indicates only the new
ones were created.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
def test_bucket_allocation_new_upload_secret(self):
"""
If a bucket was allocated with one upload secret, and a different upload
key is used to allocate the bucket again, the second allocation fails.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
def test_upload_with_wrong_upload_secret_fails(self):
"""
Uploading with a key that doesn't match the one used to allocate the
bucket will fail.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
def test_upload_offset_cannot_be_negative(self):
"""
A negative upload offset will be rejected.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
(upload_secret, _, storage_index, _) = self.create_upload({1, 2}, 10)
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"1" * 10,
)
)
result_of(
self.imm_client.write_share_chunk(
storage_index,
2,
upload_secret,
0,
b"2" * 10,
)
)
self.assertEqual(
result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)),
b"1" * 10,
)
self.assertEqual(
result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)),
b"2" * 10,
)
def test_mismatching_upload_fails(self):
"""
If an uploaded chunk conflicts with an already uploaded chunk, a
CONFLICT error is returned.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
(upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
# Write:
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"0" * 10,
)
)
# Conflicting write:
with assert_fails_with_http_code(self, http.CONFLICT):
result_of(
self.imm_client.write_share_chunk(
storage_index,
1,
upload_secret,
0,
b"0123456789",
)
)
def upload(self, share_number, data_length=26):
    """
    Allocate a single share and write ``data_length`` bytes to it in one
    chunk starting at offset 0.

    The payload is the lowercase ASCII alphabet, repeated and truncated to
    ``data_length`` bytes.  Returns ``(storage_index, uploaded_data)``.
    """
    alphabet = b"abcdefghijklmnopqrstuvwxyz"
    # One repetition more than strictly needed, then cut to the exact size.
    repetitions = (data_length // 26) + 1
    uploaded_data = (alphabet * repetitions)[:data_length]

    upload_secret, _, storage_index, _ = self.create_upload(
        {share_number}, data_length
    )
    pending = self.imm_client.write_share_chunk(
        storage_index, share_number, upload_secret, 0, uploaded_data
    )
    result_of(pending)
    return storage_index, uploaded_data
def test_read_of_wrong_storage_index_fails(self):
"""
Reading from unknown storage index results in 404.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.read_share_chunk(
b"1" * 16,
1,
0,
10,
)
)
def test_read_of_wrong_share_number_fails(self):
"""
Reading from an unknown share number results in 404.
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
"""
storage_index, _ = self.upload(1)
with assert_fails_with_http_code(self, http.NOT_FOUND):
result_of(
self.imm_client.read_share_chunk(
storage_index,
7, # different share number
0,
10,
)
)
def test_read_with_negative_offset_fails(self):
"""
The offset for reads cannot be negative.
Malformed or unsupported Range headers result in 416 (requested range
not satisfiable) error.
"""
storage_index, _ = self.upload(1)
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
def check_bad_range(bad_range_value):
client = StorageClientImmutables(
StorageClientWithHeadersOverride(
self.http.client, {"range": bad_range_value}
)
)
with assert_fails_with_http_code(
self, http.REQUESTED_RANGE_NOT_SATISFIABLE
):
result_of(
client.read_share_chunk(
storage_index,
1,
0,
10,
)
)
# Bad unit
check_bad_range("molluscs=0-9")
# Negative offsets
check_bad_range("bytes=-2-9")
check_bad_range("bytes=0--10")
# Negative offset no endpoint
check_bad_range("bytes=-300-")
check_bad_range("bytes=")
# Multiple ranges are currently unsupported, even if they're
# semantically valid under HTTP:
check_bad_range("bytes=0-5, 6-7")
# Ranges without an end are currently unsupported, even if they're
# semantically valid under HTTP.
check_bad_range("bytes=0-")
@given(data_length=st.integers(min_value=1, max_value=300000))
def test_read_with_no_range(self, data_length):
    """
    A read with no range returns the whole immutable.

    Hypothesis supplies ``data_length`` (1 to 300000 bytes); a share of that
    size is uploaded and then fetched with a plain GET that carries no
    ``Range`` header.
    """
    storage_index, uploaded_data = self.upload(1, data_length)
    # Issue the request through the low-level client rather than
    # ``read_share_chunk`` so that no range is specified.
    response = result_of(
        self.http.client.request(
            "GET",
            self.http.client.relative_url(
                "/v1/immutable/{}/1".format(_encode_si(storage_index))
            ),
        )
    )
    self.assertEqual(response.code, http.OK)
    self.assertEqual(result_of(response.content()), uploaded_data)
def test_validate_content_range_response_to_read(self):
    """
    The server responds to ranged reads with an appropriate Content-Range
    header.
    """
    # A 26-byte share, so valid byte offsets are 0 through 25.
    storage_index, _ = self.upload(1, 26)

    def check_range(requested_range, expected_response):
        """
        GET share 1 with the given ``Range`` header value and assert that
        the response's ``Content-Range`` header is ``expected_response``.
        """
        headers = Headers()
        headers.setRawHeaders("range", [requested_range])
        response = result_of(
            self.http.client.request(
                "GET",
                self.http.client.relative_url(
                    "/v1/immutable/{}/1".format(_encode_si(storage_index))
                ),
                headers=headers,
            )
        )
        self.assertEqual(
            response.headers.getRawHeaders("content-range"), [expected_response]
        )

    check_range("bytes=0-10", "bytes 0-10/*")
    # Can't go beyond the end of the immutable!  The requested end (100) is
    # expected to be reported as the last byte of the share (25).
    check_range("bytes=10-100", "bytes 10-25/*")
def test_timed_out_upload_allows_reupload(self):
    """
    An in-progress upload that times out is cancelled altogether, so a new
    upload to the same storage index can take place.
    """

    def let_upload_time_out(**kwargs):
        # The keyword arguments identifying the upload are not needed:
        # advancing the clock by 30 minutes and one second is presumably
        # just past the server's upload timeout.
        return self.http.clock.advance(30 * 60 + 1)

    self._test_abort_or_timed_out_upload_to_existing_storage_index(
        let_upload_time_out
    )
def test_abort_upload_allows_reupload(self):
"""
If an in-progress upload is aborted, it is cancelled altogether,
allowing a new upload to occur.
"""
def test_read_with_negative_length_fails(self):
"""
The length for reads cannot be negative.
def abort(storage_index, share_number, upload_secret):
return result_of(
self.imm_client.abort_upload(storage_index, share_number, upload_secret)
)
TBD in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3860
self._test_abort_or_timed_out_upload_to_existing_storage_index(abort)
def _test_abort_or_timed_out_upload_to_existing_storage_index(self, cancel_upload):
    """Start uploading to an existing storage index that then times out or aborts.

    Re-uploading should work.

    ``cancel_upload`` is called with the keyword arguments
    ``storage_index``, ``upload_secret`` and ``share_number`` and must
    cause the in-progress upload to be cancelled.
    """
    # Start an upload:
    (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
    result_of(
        self.imm_client.write_share_chunk(
            storage_index,
            1,
            upload_secret,
            0,
            b"123",
        )
    )

    # Now, the upload is cancelled somehow:
    cancel_upload(
        storage_index=storage_index, upload_secret=upload_secret, share_number=1
    )

    # Now we can create a new share with the same storage index without
    # complaint.  Fresh secrets are used, so this is a new upload rather
    # than a continuation of the cancelled one:
    upload_secret = urandom(32)
    lease_secret = urandom(32)
    created = result_of(
        self.imm_client.create(
            storage_index,
            {1},
            100,
            upload_secret,
            lease_secret,
            lease_secret,
        )
    )
    # Share 1 is reported as newly allocated.
    self.assertEqual(created.allocated, {1})

    # And write to it, too:
    result_of(
        self.imm_client.write_share_chunk(
            storage_index,
            1,
            upload_secret,
            0,
            b"ABC",
        )
    )
def test_unknown_aborts(self):
    """
    Aborting uploads with an unknown storage index or share number will
    result in a 404 HTTP response code.
    """
    (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
    # First pair: known storage index, unallocated share number.
    # Second pair: unknown storage index.
    for si, num in [(storage_index, 3), (b"x" * 16, 1)]:
        with assert_fails_with_http_code(self, http.NOT_FOUND):
            result_of(self.imm_client.abort_upload(si, num, upload_secret))
def test_unauthorized_abort(self):
    """
    An abort with the wrong key will return an unauthorized error, and will
    not abort the upload.
    """
    (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)

    # Failed to abort because of the wrong upload secret:
    with assert_fails_with_http_code(self, http.UNAUTHORIZED):
        result_of(
            self.imm_client.abort_upload(storage_index, 1, upload_secret + b"X")
        )

    # We can still write to it, which shows the upload was not aborted:
    result_of(
        self.imm_client.write_share_chunk(
            storage_index,
            1,
            upload_secret,
            0,
            b"ABC",
        )
    )
def test_too_late_abort(self):
    """
    Aborting an immutable that has already been fully uploaded fails with a
    405 error and leaves the stored data readable.
    """
    uploaded_data = b"123"
    (upload_secret, _, storage_index, _) = self.create_upload({0}, 3)
    # A single 3-byte write fills the 3-byte share.
    pending_write = self.imm_client.write_share_chunk(
        storage_index, 0, upload_secret, 0, uploaded_data
    )
    result_of(pending_write)

    # The upload is finished, so aborting it is no longer allowed:
    with assert_fails_with_http_code(self, http.NOT_ALLOWED):
        pending_abort = self.imm_client.abort_upload(storage_index, 0, upload_secret)
        result_of(pending_abort)

    # The failed abort did not affect the stored share:
    downloaded = result_of(
        self.imm_client.read_share_chunk(storage_index, 0, 0, 3)
    )
    self.assertEqual(uploaded_data, downloaded)
def test_lease_renew_and_add(self):
    """
    It's possible to renew the lease on an uploaded immutable by using the
    same renewal secret, or to add a new lease by choosing a different
    renewal secret.
    """
    # Create immutable:
    (upload_secret, lease_secret, storage_index, _) = self.create_upload({0}, 100)
    result_of(
        self.imm_client.write_share_chunk(
            storage_index,
            0,
            upload_secret,
            0,
            b"A" * 100,
        )
    )

    # Exactly one lease exists after the upload; remember when it expires.
    [lease] = self.http.storage_server.get_leases(storage_index)
    initial_expiration_time = lease.get_expiration_time()

    # Time passes:
    self.http.clock.advance(167)

    # We renew the lease:
    result_of(
        self.imm_client.add_or_renew_lease(
            storage_index, lease_secret, lease_secret
        )
    )

    # More time passes:
    self.http.clock.advance(10)

    # We create a new lease:
    lease_secret2 = urandom(32)
    result_of(
        self.imm_client.add_or_renew_lease(
            storage_index, lease_secret2, lease_secret2
        )
    )

    # Two leases now exist.  Each expiration moved forward by the clock time
    # that had elapsed when it was renewed (167) or added (167 + 10 = 177).
    [lease1, lease2] = self.http.storage_server.get_leases(storage_index)
    self.assertEqual(lease1.get_expiration_time(), initial_expiration_time + 167)
    self.assertEqual(lease2.get_expiration_time(), initial_expiration_time + 177)
def test_lease_on_unknown_storage_index(self):
    """
    Renewing a lease for a storage index that was never uploaded fails with
    HTTP 404.
    """
    unknown_storage_index = urandom(16)
    secret = b"A" * 32
    with assert_fails_with_http_code(self, http.NOT_FOUND):
        pending = self.imm_client.add_or_renew_lease(
            unknown_storage_index, secret, secret
        )
        result_of(pending)
def test_advise_corrupt_share(self):
    """
    Advising share was corrupted succeeds from HTTP client's perspective,
    and calls appropriate method on server.
    """
    # Replace the server's method with a recorder that captures the
    # positional arguments of every call.
    corrupted = []
    self.http.storage_server.advise_corrupt_share = lambda *args: corrupted.append(
        args
    )

    storage_index, _ = self.upload(13)
    # Non-ASCII text, so the UTF-8 encoding of the reason is exercised.
    reason = "OHNO \u1235"
    result_of(self.imm_client.advise_corrupt_share(storage_index, 13, reason))

    # The server method was called once, with the reason encoded as UTF-8.
    self.assertEqual(
        corrupted, [(b"immutable", storage_index, 13, reason.encode("utf-8"))]
    )
def test_advise_corrupt_share_unknown(self):
    """
    Reporting corruption for a share the server does not know about fails
    with HTTP 404.
    """
    storage_index, _ = self.upload(13)
    reason = "OHNO \u1235"
    # Reporting the share that really exists works:
    result_of(self.imm_client.advise_corrupt_share(storage_index, 13, reason))

    # An unknown share number on the known storage index, then the known
    # share number on a random storage index.
    unknown_targets = [(storage_index, 11), (urandom(16), 13)]
    for si, share_number in unknown_targets:
        with assert_fails_with_http_code(self, http.NOT_FOUND):
            pending = self.imm_client.advise_corrupt_share(si, share_number, reason)
            result_of(pending)

View File

@ -73,6 +73,7 @@ let
in
# Make a derivation that runs the unit test suite.
pkgs.runCommand "tahoe-lafs-tests" { } ''
export TAHOE_LAFS_HYPOTHESIS_PROFILE=ci
${python-env}/bin/python -m twisted.trial -j $NIX_BUILD_CORES allmydata
# It's not cool to put the whole _trial_temp into $out because it has weird

53
tox.ini
View File

@ -7,17 +7,19 @@
# the tox-gh-actions package.
[gh-actions]
python =
2.7: py27-coverage,codechecks
3.7: py37-coverage,typechecks,codechecks3
3.7: py37-coverage,typechecks,codechecks
3.8: py38-coverage
3.9: py39-coverage
pypy-3.7: pypy3
3.10: py310-coverage
pypy-3.7: pypy37
pypy-3.8: pypy38
pypy-3.9: pypy39
[pytest]
twisted = 1
[tox]
envlist = typechecks,codechecks,codechecks3,py{27,37,38,39}-{coverage},pypy27,pypy3,integration,integration3
envlist = typechecks,codechecks,py{37,38,39,310}-{coverage},pypy27,pypy37,pypy38,pypy39,integration
minversion = 2.4
[testenv]
@ -35,12 +37,10 @@ deps =
# happening at the time. The versions selected here are just the current
# versions at the time. Bumping them to keep up with future releases is
# fine as long as those releases are known to actually work.
#
# For now these are versions that support Python 2.
pip==20.3.4
setuptools==44.1.1
wheel==0.36.2
subunitreporter==19.3.2
pip==22.0.3
setuptools==60.9.1
wheel==0.37.1
subunitreporter==22.2.0
# As an exception, we don't pin certifi because it contains CA
# certificates which necessarily change over time. Pinning this is
# guaranteed to cause things to break eventually as old certificates
@ -89,40 +89,20 @@ commands =
coverage: coverage report
[testenv:integration]
setenv =
COVERAGE_PROCESS_START=.coveragerc
commands =
# NOTE: 'run with "py.test --keep-tempdir -s -v integration/" to debug failures'
py.test --timeout=1800 --coverage -v {posargs:integration}
coverage combine
coverage report
[testenv:integration3]
basepython = python3
platform = mylinux: linux
mymacos: darwin
mywindows: win32
setenv =
COVERAGE_PROCESS_START=.coveragerc
commands =
python --version
# NOTE: 'run with "py.test --keep-tempdir -s -v integration/" to debug failures'
py.test --timeout=1800 --coverage -v {posargs:integration}
coverage combine
coverage report
# Once 2.7 is dropped, this can be removed. It just does flake8 with Python 2
# since that can give different results than flake8 on Python 3.
[testenv:codechecks]
basepython = python2.7
setenv =
# If no positional arguments are given, try to run the checks on the
# entire codebase, including various pieces of supporting code.
DEFAULT_FILES=src integration static misc setup.py
commands =
flake8 {posargs:{env:DEFAULT_FILES}}
[testenv:codechecks3]
basepython = python3
deps =
# Newer versions of PyLint have buggy configuration
@ -224,16 +204,11 @@ commands =
sphinx-build -W -b html -d {toxinidir}/docs/_build/doctrees {toxinidir}/docs {toxinidir}/docs/_build/html
[testenv:pyinstaller]
# We override this to pass --no-use-pep517 because pyinstaller (3.4, at least)
# is broken when this feature is enabled.
install_command = python -m pip install --no-use-pep517 {opts} {packages}
extras =
deps =
{[testenv]deps}
packaging
# PyInstaller 4.0 drops Python 2 support. When we finish porting to
# Python 3 we can reconsider this constraint.
pyinstaller < 4.0
pyinstaller
pefile ; platform_system == "Windows"
# Setting PYTHONHASHSEED to a known value assists with reproducible builds.
# See https://pyinstaller.readthedocs.io/en/stable/advanced-topics.html#creating-a-reproducible-build