remove integration tests/refactoring

This commit is contained in:
meejah 2020-11-06 18:07:30 -07:00
parent f923144478
commit 30b7be6f1d
7 changed files with 161 additions and 832 deletions

View File

@ -36,11 +36,6 @@ from util import (
await_client_ready,
TahoeProcess,
)
from grid import (
create_port_allocator,
create_flog_gatherer,
create_grid,
)
# pytest customization hooks
@ -77,12 +72,6 @@ def reactor():
return _reactor
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:port_allocator", include_result=False)
def port_allocator(reactor):
return create_port_allocator(start_port=45000)
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:temp_dir", include_args=[])
def temp_dir(request):
@ -117,30 +106,127 @@ def flog_binary():
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:flog_gatherer", include_args=[])
def flog_gatherer(reactor, temp_dir, flog_binary, request):
fg = pytest_twisted.blockon(
create_flog_gatherer(reactor, request, temp_dir, flog_binary)
out_protocol = _CollectOutputProtocol()
gather_dir = join(temp_dir, 'flog_gather')
reactor.spawnProcess(
out_protocol,
flog_binary,
(
'flogtool', 'create-gatherer',
'--location', 'tcp:localhost:3117',
'--port', '3117',
gather_dir,
)
return fg
)
pytest_twisted.blockon(out_protocol.done)
twistd_protocol = _MagicTextProtocol("Gatherer waiting at")
twistd_process = reactor.spawnProcess(
twistd_protocol,
which('twistd')[0],
(
'twistd', '--nodaemon', '--python',
join(gather_dir, 'gatherer.tac'),
),
path=gather_dir,
)
pytest_twisted.blockon(twistd_protocol.magic_seen)
def cleanup():
_cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
flog_file = mktemp('.flog_dump')
flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
flog_dir = join(temp_dir, 'flog_gather')
flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
reactor.spawnProcess(
flog_protocol,
flog_binary,
(
'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0])
),
)
print("Waiting for flogtool to complete")
try:
pytest_twisted.blockon(flog_protocol.done)
except ProcessTerminated as e:
print("flogtool exited unexpectedly: {}".format(str(e)))
print("Flogtool completed")
request.addfinalizer(cleanup)
with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
furl = f.read().strip()
return furl
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:grid", include_args=[])
def grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
g = pytest_twisted.blockon(
create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
@log_call(
action_type=u"integration:introducer",
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def introducer(reactor, temp_dir, flog_gatherer, request):
config = '''
[node]
nickname = introducer0
web.port = 4560
log_gatherer.furl = {log_furl}
'''.format(log_furl=flog_gatherer)
intro_dir = join(temp_dir, 'introducer')
print("making introducer", intro_dir)
if not exists(intro_dir):
mkdir(intro_dir)
done_proto = _ProcessExitedProtocol()
_tahoe_runner_optional_coverage(
done_proto,
reactor,
request,
(
'create-introducer',
'--listen=tcp',
'--hostname=localhost',
intro_dir,
),
)
return g
pytest_twisted.blockon(done_proto.done)
# over-write the config file with our stuff
with open(join(intro_dir, 'tahoe.cfg'), 'w') as f:
f.write(config)
@pytest.fixture(scope='session')
def introducer(grid):
return grid.introducer
# on windows, "tahoe start" means: run forever in the foreground,
# but on linux it means daemonize. "tahoe run" is consistent
# between platforms.
protocol = _MagicTextProtocol('introducer running')
transport = _tahoe_runner_optional_coverage(
protocol,
reactor,
request,
(
'run',
intro_dir,
),
)
request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited))
pytest_twisted.blockon(protocol.magic_seen)
return TahoeProcess(transport, intro_dir)
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"])
def introducer_furl(introducer, temp_dir):
return introducer.furl
furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl')
while not exists(furl_fname):
print("Don't see {} yet".format(furl_fname))
sleep(.1)
furl = open(furl_fname, 'r').read()
return furl
@pytest.fixture(scope='session')
@ -219,20 +305,28 @@ def tor_introducer_furl(tor_introducer, temp_dir):
@pytest.fixture(scope='session')
@log_call(
action_type=u"integration:storage_nodes",
include_args=["grid"],
include_args=["temp_dir", "introducer_furl", "flog_gatherer"],
include_result=False,
)
def storage_nodes(grid):
def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request):
nodes_d = []
# start all 5 nodes in parallel
for x in range(5):
#nodes_d.append(grid.add_storage_node())
pytest_twisted.blockon(grid.add_storage_node())
name = 'node{}'.format(x)
web_port= 9990 + x
nodes_d.append(
_create_node(
reactor, request, temp_dir, introducer_furl, flog_gatherer, name,
web_port="tcp:{}:interface=localhost".format(web_port),
storage=True,
)
)
nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
for ok, value in nodes_status:
assert ok, "Storage node creation failed: {}".format(value)
return grid.storage_servers
nodes = []
for ok, process in nodes_status:
assert ok, "Storage node creation failed: {}".format(process)
nodes.append(process)
return nodes
@pytest.fixture(scope='session')

View File

@ -1,480 +0,0 @@
"""
Classes which directly represent various kinds of Tahoe processes
that co-operate to for "a Grid".
These methods and objects are used by conftest.py fixtures but may
also be used as direct helpers for tests that don't want to (or can't)
rely on 'the' global grid as provided by fixtures like 'alice' or
'storage_servers'.
"""
from os import mkdir, listdir, environ
from os.path import join, exists
from tempfile import mkdtemp, mktemp
from eliot import (
log_call,
)
from twisted.python.procutils import which
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
maybeDeferred,
)
from twisted.internet.task import (
deferLater,
)
from twisted.internet.interfaces import (
IProcessTransport,
IProcessProtocol,
IProtocol,
)
from twisted.internet.endpoints import (
TCP4ServerEndpoint,
)
from twisted.internet.protocol import (
Factory,
Protocol,
)
from util import (
_CollectOutputProtocol,
_MagicTextProtocol,
_DumpOutputProtocol,
_ProcessExitedProtocol,
_create_node,
_run_node,
_cleanup_tahoe_process,
_tahoe_runner_optional_coverage,
TahoeProcess,
await_client_ready,
)
import attr
import pytest_twisted
# further directions:
# - "Grid" is unused, basically -- tie into the rest?
# - could make a Grid instance mandatory for create_* calls
# - could instead make create_* calls methods of Grid
# - Bring more 'util' or 'conftest' code into here
# - stop()/start()/restart() methods on StorageServer etc
# - more-complex stuff like config changes (which imply a restart too)?
@attr.s
class FlogGatherer(object):
"""
Flog Gatherer process.
"""
process = attr.ib(
validator=attr.validators.provides(IProcessTransport)
)
protocol = attr.ib(
validator=attr.validators.provides(IProcessProtocol)
)
furl = attr.ib()
@inlineCallbacks
def create_flog_gatherer(reactor, request, temp_dir, flog_binary):
out_protocol = _CollectOutputProtocol()
gather_dir = join(temp_dir, 'flog_gather')
reactor.spawnProcess(
out_protocol,
flog_binary,
(
'flogtool', 'create-gatherer',
'--location', 'tcp:localhost:3117',
'--port', '3117',
gather_dir,
)
)
yield out_protocol.done
twistd_protocol = _MagicTextProtocol("Gatherer waiting at")
twistd_process = reactor.spawnProcess(
twistd_protocol,
which('twistd')[0],
(
'twistd', '--nodaemon', '--python',
join(gather_dir, 'gatherer.tac'),
),
path=gather_dir,
)
yield twistd_protocol.magic_seen
def cleanup():
_cleanup_tahoe_process(twistd_process, twistd_protocol.exited)
flog_file = mktemp('.flog_dump')
flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
flog_dir = join(temp_dir, 'flog_gather')
flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]
print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
reactor.spawnProcess(
flog_protocol,
flog_binary,
(
'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0])
),
)
print("Waiting for flogtool to complete")
try:
pytest_twisted.blockon(flog_protocol.done)
except ProcessTerminated as e:
print("flogtool exited unexpectedly: {}".format(str(e)))
print("Flogtool completed")
request.addfinalizer(cleanup)
with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
furl = f.read().strip()
returnValue(
FlogGatherer(
protocol=twistd_protocol,
process=twistd_process,
furl=furl,
)
)
@attr.s
class StorageServer(object):
"""
Represents a Tahoe Storage Server
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=attr.validators.provides(IProcessProtocol)
)
@inlineCallbacks
def restart(self, reactor, request):
"""
re-start our underlying process by issuing a TERM, waiting and
then running again. await_client_ready() will be done as well
Note that self.process and self.protocol will be new instances
after this.
"""
self.process.transport.signalProcess('TERM')
yield self.protocol.exited
self.process = yield _run_node(
reactor, self.process.node_dir, request, None,
)
self.protocol = self.process.transport._protocol
@inlineCallbacks
def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
needed=2, happy=3, total=4):
"""
Create a new storage server
"""
from util import _create_node
node_process = yield _create_node(
reactor, request, temp_dir, introducer.furl, flog_gatherer,
name, web_port, storage=True, needed=needed, happy=happy, total=total,
)
storage = StorageServer(
process=node_process,
protocol=node_process.transport._protocol,
)
returnValue(storage)
@attr.s
class Client(object):
"""
Represents a Tahoe client
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=attr.validators.provides(IProcessProtocol)
)
@inlineCallbacks
def restart(self, reactor, request, servers=1):
"""
re-start our underlying process by issuing a TERM, waiting and
then running again.
:param int servers: number of server connections we will wait
for before being 'ready'
Note that self.process and self.protocol will be new instances
after this.
"""
self.process.transport.signalProcess('TERM')
yield self.protocol.exited
process = yield _run_node(
reactor, self.process.node_dir, request, None,
)
self.process = process
self.protocol = self.process.transport._protocol
# XXX add stop / start / restart
# ...maybe "reconfig" of some kind?
@inlineCallbacks
def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
needed=2, happy=3, total=4):
"""
Create a new storage server
"""
from util import _create_node
node_process = yield _create_node(
reactor, request, temp_dir, introducer.furl, flog_gatherer,
name, web_port, storage=False, needed=needed, happy=happy, total=total,
)
returnValue(
Client(
process=node_process,
protocol=node_process.transport._protocol,
)
)
@attr.s
class Introducer(object):
"""
Reprsents a running introducer
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=attr.validators.provides(IProcessProtocol)
)
furl = attr.ib()
@inlineCallbacks
@log_call(
action_type=u"integration:introducer",
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def create_introducer(reactor, request, temp_dir, flog_gatherer, port):
"""
Run a new Introducer and return an Introducer instance.
"""
config = (
'[node]\n'
'nickname = introducer{port}\n'
'web.port = {port}\n'
'log_gatherer.furl = {log_furl}\n'
).format(
port=port,
log_furl=flog_gatherer.furl,
)
intro_dir = join(temp_dir, 'introducer{}'.format(port))
if not exists(intro_dir):
mkdir(intro_dir)
done_proto = _ProcessExitedProtocol()
_tahoe_runner_optional_coverage(
done_proto,
reactor,
request,
(
'create-introducer',
'--listen=tcp',
'--hostname=localhost',
intro_dir,
),
)
yield done_proto.done
# over-write the config file with our stuff
with open(join(intro_dir, 'tahoe.cfg'), 'w') as f:
f.write(config)
# on windows, "tahoe start" means: run forever in the foreground,
# but on linux it means daemonize. "tahoe run" is consistent
# between platforms.
protocol = _MagicTextProtocol('introducer running')
transport = _tahoe_runner_optional_coverage(
protocol,
reactor,
request,
(
'run',
intro_dir,
),
)
def clean():
return _cleanup_tahoe_process(transport, protocol.exited)
request.addfinalizer(clean)
yield protocol.magic_seen
furl_fname = join(intro_dir, 'private', 'introducer.furl')
while not exists(furl_fname):
print("Don't see {} yet".format(furl_fname))
yield deferLater(reactor, .1, lambda: None)
furl = open(furl_fname, 'r').read()
returnValue(
Introducer(
process=TahoeProcess(transport, intro_dir),
protocol=protocol,
furl=furl,
)
)
@attr.s
class Grid(object):
"""
Represents an entire Tahoe Grid setup
A Grid includes an Introducer, Flog Gatherer and some number of
Storage Servers.
"""
_reactor = attr.ib()
_request = attr.ib()
_temp_dir = attr.ib()
_port_allocator = attr.ib()
introducer = attr.ib()
flog_gatherer = attr.ib()
storage_servers = attr.ib(factory=list)
clients = attr.ib(factory=dict)
@storage_servers.validator
def check(self, attribute, value):
for server in value:
if not isinstance(server, StorageServer):
raise ValueError(
"storage_servers must be StorageServer"
)
@inlineCallbacks
def add_storage_node(self):
"""
Creates a new storage node, returns a StorageServer instance
(which will already be added to our .storage_servers list)
"""
port = yield self._port_allocator()
print("make {}".format(port))
name = 'node{}'.format(port)
web_port = 'tcp:{}:interface=localhost'.format(port)
server = yield create_storage_server(
self._reactor,
self._request,
self._temp_dir,
self.introducer,
self.flog_gatherer,
name,
web_port,
)
self.storage_servers.append(server)
returnValue(server)
@inlineCallbacks
def add_client(self, name, needed=2, happy=3, total=4):
"""
Create a new client node
"""
port = yield self._port_allocator()
web_port = 'tcp:{}:interface=localhost'.format(port)
client = yield create_client(
self._reactor,
self._request,
self._temp_dir,
self.introducer,
self.flog_gatherer,
name,
web_port,
needed=needed,
happy=happy,
total=total,
)
self.clients[name] = client
yield await_client_ready(client.process)
returnValue(client)
# XXX THINK can we tie a whole *grid* to a single request? (I think
# that's all that makes sense)
@inlineCallbacks
def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
"""
intro_port = yield port_allocator()
introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port)
grid = Grid(
reactor,
request,
temp_dir,
port_allocator,
introducer,
flog_gatherer,
)
returnValue(grid)
def create_port_allocator(start_port):
"""
Returns a new port-allocator .. which is a zero-argument function
that returns Deferreds that fire with new, sequential ports
starting at `start_port` skipping any that already appear to have
a listener.
There can still be a race against other processes allocating ports
-- between the time when we check the status of the port and when
our subprocess starts up. This *could* be mitigated by instructing
the OS to not randomly-allocate ports in some range, and then
using that range here (explicitly, ourselves).
NB once we're Python3-only this could be an async-generator
"""
port = [start_port - 1]
# import stays here to not interfere with reactor selection -- but
# maybe this function should be arranged to be called once from a
# fixture (with the reactor)?
from twisted.internet import reactor
class NothingProtocol(Protocol):
"""
I do nothing.
"""
def port_generator():
print("Checking port {}".format(port))
port[0] += 1
ep = TCP4ServerEndpoint(reactor, port[0], interface="localhost")
d = ep.listen(Factory.forProtocol(NothingProtocol))
def good(listening_port):
unlisten_d = maybeDeferred(listening_port.stopListening)
def return_port(_):
return port[0]
unlisten_d.addBoth(return_port)
return unlisten_d
def try_again(fail):
return port_generator()
d.addCallbacks(good, try_again)
return d
return port_generator

View File

@ -1,245 +0,0 @@
import sys
import time
import json
import shutil
from os import mkdir, unlink, listdir, utime
from os.path import join, exists, getmtime
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
)
from allmydata.crypto import ed25519
from allmydata.util import base32
from allmydata.util import configutil
import util
from grid import (
create_grid,
)
import pytest_twisted
@pytest_twisted.inlineCallbacks
def test_create_certificate(reactor, request):
"""
The Grid Manager produces a valid, correctly-signed certificate.
"""
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "create",
)
privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes)
# Note that zara + her key here are arbitrary and don't match any
# "actual" clients in the test-grid; we're just checking that the
# Grid Manager signs this properly.
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdin=gm_config,
)
zara_cert_bytes = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "sign", "zara", "1",
stdin=gm_config,
)
zara_cert = json.loads(zara_cert_bytes)
# confirm that zara's certificate is made by the Grid Manager
# (.verify returns None on success, raises exception on error)
pubkey.verify(
base32.a2b(zara_cert['signature'].encode('ascii')),
zara_cert['certificate'].encode('ascii'),
)
@pytest_twisted.inlineCallbacks
def test_remove_client(reactor, request):
"""
A Grid Manager can add and successfully remove a client
"""
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "create",
)
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdin=gm_config,
)
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "add",
"yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
stdin=gm_config,
)
assert "zara" in json.loads(gm_config)['storage_servers']
assert "yakov" in json.loads(gm_config)['storage_servers']
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "remove",
"zara",
stdin=gm_config,
)
assert "zara" not in json.loads(gm_config)['storage_servers']
assert "yakov" in json.loads(gm_config)['storage_servers']
@pytest_twisted.inlineCallbacks
def test_remove_last_client(reactor, request):
"""
A Grid Manager can remove all clients
"""
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "create",
)
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
stdin=gm_config,
)
assert "zara" in json.loads(gm_config)['storage_servers']
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "remove",
"zara",
stdin=gm_config,
)
# there are no storage servers left at all now
assert "storage_servers" not in json.loads(gm_config)
@pytest_twisted.inlineCallbacks
def test_add_remove_client_file(reactor, request, temp_dir):
"""
A Grid Manager can add and successfully remove a client (when
keeping data on disk)
"""
gmconfig = join(temp_dir, "gmtest")
gmconfig_file = join(temp_dir, "gmtest", "config.json")
yield util.run_tahoe(
reactor, request, "grid-manager", "--config", gmconfig, "create",
)
yield util.run_tahoe(
reactor, request, "grid-manager", "--config", gmconfig, "add",
"zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga",
)
yield util.run_tahoe(
reactor, request, "grid-manager", "--config", gmconfig, "add",
"yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq",
)
assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers']
assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
yield util.run_tahoe(
reactor, request, "grid-manager", "--config", gmconfig, "remove",
"zara",
)
assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers']
assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers']
@pytest_twisted.inlineCallbacks
def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
A client with happines=2 fails to upload to a Grid when it is
using Grid Manager and there is only 1 storage server with a valid
certificate.
"""
grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
storage0 = yield grid.add_storage_node()
storage1 = yield grid.add_storage_node()
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "create",
)
gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii')
gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes)
# create certificate for the first storage-server
pubkey_fname = join(storage0.process.node_dir, "node.pubkey")
with open(pubkey_fname, 'r') as f:
pubkey_str = f.read().strip()
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "add",
"storage0", pubkey_str,
stdin=gm_config,
)
assert json.loads(gm_config)['storage_servers'].keys() == ['storage0']
print("inserting certificate")
cert = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "sign", "storage0", "1",
stdin=gm_config,
)
yield util.run_tahoe(
reactor, request, "--node-directory", storage0.process.node_dir,
"admin", "add-grid-manager-cert",
"--name", "default",
"--filename", "-",
stdin=cert,
)
# re-start this storage server
yield storage0.restart(reactor, request)
# now only one storage-server has the certificate .. configure
# diana to have the grid-manager certificate
diana = yield grid.add_client("diana", needed=2, happy=2, total=2)
config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg"))
config.add_section("grid_managers")
config.set("grid_managers", "test", ed25519.string_from_verifying_key(gm_pubkey))
with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f:
config.write(f)
yield diana.restart(reactor, request, servers=2)
# try to put something into the grid, which should fail (because
# diana has happy=2 but should only find storage0 to be acceptable
# to upload to)
try:
yield util.run_tahoe(
reactor, request, "--node-directory", diana.process.node_dir,
"put", "-",
stdin="some content\n" * 200,
)
assert False, "Should get a failure"
except util.ProcessFailed as e:
assert 'UploadUnhappinessError' in e.output
@pytest_twisted.inlineCallbacks
def test_identity(reactor, request, temp_dir):
"""
Dump public key to CLI
"""
gm_config = join(temp_dir, "test_identity")
yield util.run_tahoe(
reactor, request, "grid-manager", "--config", gm_config, "create",
)
# ask the CLI for the grid-manager pubkey
pubkey = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", gm_config, "public-identity",
)
alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip())
# load the grid-manager pubkey "ourselves"
with open(join(gm_config, "config.json"), "r") as f:
real_config = json.load(f)
real_privkey, real_pubkey = ed25519.signing_keypair_from_string(
real_config["private_key"].encode("ascii"),
)
# confirm the CLI told us the correct thing
alleged_bytes = alleged_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
real_bytes = real_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw)
assert alleged_bytes == real_bytes, "Keys don't match"

View File

@ -39,7 +39,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto
try:
yield proto.done
assert False, "should raise exception"
except util.ProcessFailed as e:
assert "UploadUnhappinessError" in e.output
except Exception as e:
assert isinstance(e, ProcessTerminated)
assert "shares could be placed on only" in proto.output.getvalue()
output = proto.output.getvalue()
assert "shares could be placed on only" in output

View File

@ -76,10 +76,7 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
node_dir = join(temp_dir, name)
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
if exists(node_dir):
raise RuntimeError(
"A node already exists in '{}'".format(node_dir)
)
if True:
print("creating", node_dir)
mkdir(node_dir)
proto = util._DumpOutputProtocol(None)

View File

@ -96,7 +96,7 @@ def test_helper_status(storage_nodes):
successfully GET the /helper_status page
"""
url = util.node_url(storage_nodes[0].process.node_dir, "helper_status")
url = util.node_url(storage_nodes[0].node_dir, "helper_status")
resp = requests.get(url)
assert resp.status_code >= 200 and resp.status_code < 300
dom = BeautifulSoup(resp.content, "html5lib")
@ -416,7 +416,7 @@ def test_storage_info(storage_nodes):
storage0 = storage_nodes[0]
requests.get(
util.node_url(storage0.process.node_dir, u"storage"),
util.node_url(storage0.node_dir, u"storage"),
)
@ -427,7 +427,7 @@ def test_storage_info_json(storage_nodes):
storage0 = storage_nodes[0]
resp = requests.get(
util.node_url(storage0.process.node_dir, u"storage"),
util.node_url(storage0.node_dir, u"storage"),
params={u"t": u"json"},
)
data = json.loads(resp.content)
@ -439,12 +439,12 @@ def test_introducer_info(introducer):
retrieve and confirm /introducer URI for the introducer
"""
resp = requests.get(
util.node_url(introducer.process.node_dir, u""),
util.node_url(introducer.node_dir, u""),
)
assert "Introducer" in resp.content
resp = requests.get(
util.node_url(introducer.process.node_dir, u""),
util.node_url(introducer.node_dir, u""),
params={u"t": u"json"},
)
data = json.loads(resp.content)

View File

@ -5,7 +5,6 @@ from os import mkdir
from os.path import exists, join
from six.moves import StringIO
from functools import partial
from shutil import rmtree
from twisted.internet.defer import Deferred, succeed
from twisted.internet.protocol import ProcessProtocol
@ -36,38 +35,15 @@ class _ProcessExitedProtocol(ProcessProtocol):
self.done.callback(None)
class ProcessFailed(Exception):
"""
A subprocess has failed.
:ivar ProcessTerminated reason: the original reason from .processExited
:ivar StringIO output: all stdout and stderr collected to this point.
"""
def __init__(self, reason, output):
self.reason = reason
self.output = output
def __str__(self):
return "<ProcessFailed: {}>:\n{}".format(self.reason, self.output)
class _CollectOutputProtocol(ProcessProtocol):
"""
Internal helper. Collects all output (stdout + stderr) into
self.output, and callback's on done with all of it after the
process exits (for any reason).
"""
def __init__(self, stdin=None):
def __init__(self):
self.done = Deferred()
self.output = StringIO()
self._stdin = stdin
def connectionMade(self):
if self._stdin is not None:
self.transport.write(self._stdin)
self.transport.closeStdin()
def processEnded(self, reason):
if not self.done.called:
@ -75,7 +51,7 @@ class _CollectOutputProtocol(ProcessProtocol):
def processExited(self, reason):
if not isinstance(reason.value, ProcessDone):
self.done.errback(ProcessFailed(reason, self.output.getvalue()))
self.done.errback(reason)
def outReceived(self, data):
self.output.write(data)
@ -147,27 +123,13 @@ def _cleanup_tahoe_process(tahoe_transport, exited):
try:
print("signaling {} with TERM".format(tahoe_transport.pid))
tahoe_transport.signalProcess('TERM')
print("signaled, blocking on exit {}".format(exited))
print("signaled, blocking on exit")
pytest_twisted.blockon(exited)
print("exited, goodbye")
except ProcessExitedAlready:
pass
def run_tahoe(reactor, request, *args, **kwargs):
"""
Helper to run tahoe with optional coverage.
:returns: a Deferred that fires when the command is done (or a
ProcessFailed exception if it exits non-zero)
"""
stdin = kwargs.get("stdin", None)
protocol = _CollectOutputProtocol(stdin=stdin)
process = _tahoe_runner_optional_coverage(protocol, reactor, request, args)
process.exited = protocol.done
return protocol.done
def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
"""
Internal helper. Calls spawnProcess with `-m
@ -269,7 +231,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
if exists(node_dir):
created_d = succeed(None)
else:
print("creating: {}".format(node_dir))
print("creating", node_dir)
mkdir(node_dir)
done_proto = _ProcessExitedProtocol()
args = [
@ -294,7 +256,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
def created(_):
config_path = join(node_dir, 'tahoe.cfg')
config = get_config(config_path)
set_config(config, 'node', 'log_gatherer.furl', flog_gatherer.furl)
set_config(config, 'node', 'log_gatherer.furl', flog_gatherer)
write_config(config_path, config)
created_d.addCallback(created)
@ -481,7 +443,7 @@ def web_post(tahoe, uri_fragment, **kwargs):
return resp.content
def await_client_ready(tahoe, timeout=10, liveness=60*2, servers=1):
def await_client_ready(tahoe, timeout=10, liveness=60*2):
"""
Uses the status API to wait for a client-type node (in `tahoe`, a
`TahoeProcess` instance usually from a fixture e.g. `alice`) to be
@ -505,8 +467,8 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, servers=1):
time.sleep(1)
continue
if len(js['servers']) < servers:
print("waiting because fewer than {} server(s)".format(servers))
if len(js['servers']) == 0:
print("waiting because no servers at all")
time.sleep(1)
continue
server_times = [