diff --git a/integration/conftest.py b/integration/conftest.py index 1fb3fd761..04e3dcb52 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -36,11 +36,6 @@ from util import ( await_client_ready, TahoeProcess, ) -from grid import ( - create_port_allocator, - create_flog_gatherer, - create_grid, -) # pytest customization hooks @@ -77,12 +72,6 @@ def reactor(): return _reactor -@pytest.fixture(scope='session') -@log_call(action_type=u"integration:port_allocator", include_result=False) -def port_allocator(reactor): - return create_port_allocator(start_port=45000) - - @pytest.fixture(scope='session') @log_call(action_type=u"integration:temp_dir", include_args=[]) def temp_dir(request): @@ -117,30 +106,127 @@ def flog_binary(): @pytest.fixture(scope='session') @log_call(action_type=u"integration:flog_gatherer", include_args=[]) def flog_gatherer(reactor, temp_dir, flog_binary, request): - fg = pytest_twisted.blockon( - create_flog_gatherer(reactor, request, temp_dir, flog_binary) + out_protocol = _CollectOutputProtocol() + gather_dir = join(temp_dir, 'flog_gather') + reactor.spawnProcess( + out_protocol, + flog_binary, + ( + 'flogtool', 'create-gatherer', + '--location', 'tcp:localhost:3117', + '--port', '3117', + gather_dir, + ) ) - return fg + pytest_twisted.blockon(out_protocol.done) + + twistd_protocol = _MagicTextProtocol("Gatherer waiting at") + twistd_process = reactor.spawnProcess( + twistd_protocol, + which('twistd')[0], + ( + 'twistd', '--nodaemon', '--python', + join(gather_dir, 'gatherer.tac'), + ), + path=gather_dir, + ) + pytest_twisted.blockon(twistd_protocol.magic_seen) + + def cleanup(): + _cleanup_tahoe_process(twistd_process, twistd_protocol.exited) + + flog_file = mktemp('.flog_dump') + flog_protocol = _DumpOutputProtocol(open(flog_file, 'w')) + flog_dir = join(temp_dir, 'flog_gather') + flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')] + + print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file)) + reactor.spawnProcess( + flog_protocol, + flog_binary, + ( + 'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0]) + ), + ) + print("Waiting for flogtool to complete") + try: + pytest_twisted.blockon(flog_protocol.done) + except ProcessTerminated as e: + print("flogtool exited unexpectedly: {}".format(str(e))) + print("Flogtool completed") + + request.addfinalizer(cleanup) + + with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f: + furl = f.read().strip() + return furl @pytest.fixture(scope='session') -@log_call(action_type=u"integration:grid", include_args=[]) -def grid(reactor, request, temp_dir, flog_gatherer, port_allocator): - g = pytest_twisted.blockon( - create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator) +@log_call( + action_type=u"integration:introducer", + include_args=["temp_dir", "flog_gatherer"], + include_result=False, +) +def introducer(reactor, temp_dir, flog_gatherer, request): + config = ''' +[node] +nickname = introducer0 +web.port = 4560 +log_gatherer.furl = {log_furl} +'''.format(log_furl=flog_gatherer) + + intro_dir = join(temp_dir, 'introducer') + print("making introducer", intro_dir) + + if not exists(intro_dir): + mkdir(intro_dir) + done_proto = _ProcessExitedProtocol() + _tahoe_runner_optional_coverage( + done_proto, + reactor, + request, + ( + 'create-introducer', + '--listen=tcp', + '--hostname=localhost', + intro_dir, + ), + ) + pytest_twisted.blockon(done_proto.done) + + # over-write the config file with our stuff + with open(join(intro_dir, 'tahoe.cfg'), 'w') as f: + f.write(config) + + # on windows, "tahoe start" means: run forever in the foreground, + # but on linux it means daemonize. "tahoe run" is consistent + # between platforms. + protocol = _MagicTextProtocol('introducer running') + transport = _tahoe_runner_optional_coverage( + protocol, + reactor, + request, + ( + 'run', + intro_dir, + ), ) - return g + request.addfinalizer(partial(_cleanup_tahoe_process, transport, protocol.exited)) - -@pytest.fixture(scope='session') -def introducer(grid): - return grid.introducer + pytest_twisted.blockon(protocol.magic_seen) + return TahoeProcess(transport, intro_dir) @pytest.fixture(scope='session') @log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"]) def introducer_furl(introducer, temp_dir): - return introducer.furl + furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl') + while not exists(furl_fname): + print("Don't see {} yet".format(furl_fname)) + sleep(.1) + furl = open(furl_fname, 'r').read() + return furl @pytest.fixture(scope='session') @@ -219,20 +305,28 @@ def tor_introducer_furl(tor_introducer, temp_dir): @pytest.fixture(scope='session') @log_call( action_type=u"integration:storage_nodes", - include_args=["grid"], + include_args=["temp_dir", "introducer_furl", "flog_gatherer"], include_result=False, ) -def storage_nodes(grid): +def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request): nodes_d = [] # start all 5 nodes in parallel for x in range(5): - #nodes_d.append(grid.add_storage_node()) - pytest_twisted.blockon(grid.add_storage_node()) - + name = 'node{}'.format(x) + web_port= 9990 + x + nodes_d.append( + _create_node( + reactor, request, temp_dir, introducer_furl, flog_gatherer, name, + web_port="tcp:{}:interface=localhost".format(web_port), + storage=True, + ) + ) nodes_status = pytest_twisted.blockon(DeferredList(nodes_d)) - for ok, value in nodes_status: - assert ok, "Storage node creation failed: {}".format(value) - return grid.storage_servers + nodes = [] + for ok, process in nodes_status: + assert ok, "Storage node creation failed: {}".format(process) + nodes.append(process) + return nodes @pytest.fixture(scope='session') diff --git a/integration/grid.py b/integration/grid.py deleted file mode 100644 index f39f04d3c..000000000 --- a/integration/grid.py +++ /dev/null @@ -1,480 +0,0 @@ -""" -Classes which directly represent various kinds of Tahoe processes -that co-operate to for "a Grid". - -These methods and objects are used by conftest.py fixtures but may -also be used as direct helpers for tests that don't want to (or can't) -rely on 'the' global grid as provided by fixtures like 'alice' or -'storage_servers'. -""" - -from os import mkdir, listdir, environ -from os.path import join, exists -from tempfile import mkdtemp, mktemp - -from eliot import ( - log_call, -) - -from twisted.python.procutils import which -from twisted.internet.defer import ( - inlineCallbacks, - returnValue, - maybeDeferred, -) -from twisted.internet.task import ( - deferLater, -) -from twisted.internet.interfaces import ( - IProcessTransport, - IProcessProtocol, - IProtocol, -) -from twisted.internet.endpoints import ( - TCP4ServerEndpoint, -) -from twisted.internet.protocol import ( - Factory, - Protocol, -) - -from util import ( - _CollectOutputProtocol, - _MagicTextProtocol, - _DumpOutputProtocol, - _ProcessExitedProtocol, - _create_node, - _run_node, - _cleanup_tahoe_process, - _tahoe_runner_optional_coverage, - TahoeProcess, - await_client_ready, -) - -import attr -import pytest_twisted - - -# further directions: -# - "Grid" is unused, basically -- tie into the rest? -# - could make a Grid instance mandatory for create_* calls -# - could instead make create_* calls methods of Grid -# - Bring more 'util' or 'conftest' code into here -# - stop()/start()/restart() methods on StorageServer etc -# - more-complex stuff like config changes (which imply a restart too)? - - -@attr.s -class FlogGatherer(object): - """ - Flog Gatherer process. - """ - - process = attr.ib( - validator=attr.validators.provides(IProcessTransport) - ) - protocol = attr.ib( - validator=attr.validators.provides(IProcessProtocol) - ) - furl = attr.ib() - - -@inlineCallbacks -def create_flog_gatherer(reactor, request, temp_dir, flog_binary): - out_protocol = _CollectOutputProtocol() - gather_dir = join(temp_dir, 'flog_gather') - reactor.spawnProcess( - out_protocol, - flog_binary, - ( - 'flogtool', 'create-gatherer', - '--location', 'tcp:localhost:3117', - '--port', '3117', - gather_dir, - ) - ) - yield out_protocol.done - - twistd_protocol = _MagicTextProtocol("Gatherer waiting at") - twistd_process = reactor.spawnProcess( - twistd_protocol, - which('twistd')[0], - ( - 'twistd', '--nodaemon', '--python', - join(gather_dir, 'gatherer.tac'), - ), - path=gather_dir, - ) - yield twistd_protocol.magic_seen - - def cleanup(): - _cleanup_tahoe_process(twistd_process, twistd_protocol.exited) - - flog_file = mktemp('.flog_dump') - flog_protocol = _DumpOutputProtocol(open(flog_file, 'w')) - flog_dir = join(temp_dir, 'flog_gather') - flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')] - - print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file)) - reactor.spawnProcess( - flog_protocol, - flog_binary, - ( - 'flogtool', 'dump', join(temp_dir, 'flog_gather', flogs[0]) - ), - ) - print("Waiting for flogtool to complete") - try: - pytest_twisted.blockon(flog_protocol.done) - except ProcessTerminated as e: - print("flogtool exited unexpectedly: {}".format(str(e))) - print("Flogtool completed") - - request.addfinalizer(cleanup) - - with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f: - furl = f.read().strip() - returnValue( - FlogGatherer( - protocol=twistd_protocol, - process=twistd_process, - furl=furl, - ) - ) - - -@attr.s -class StorageServer(object): - """ - Represents a Tahoe Storage Server - """ - - process = attr.ib( - validator=attr.validators.instance_of(TahoeProcess) - ) - protocol = attr.ib( - validator=attr.validators.provides(IProcessProtocol) - ) - - @inlineCallbacks - def restart(self, reactor, request): - """ - re-start our underlying process by issuing a TERM, waiting and - then running again. await_client_ready() will be done as well - - Note that self.process and self.protocol will be new instances - after this. - """ - self.process.transport.signalProcess('TERM') - yield self.protocol.exited - self.process = yield _run_node( - reactor, self.process.node_dir, request, None, - ) - self.protocol = self.process.transport._protocol - - -@inlineCallbacks -def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port, - needed=2, happy=3, total=4): - """ - Create a new storage server - """ - from util import _create_node - node_process = yield _create_node( - reactor, request, temp_dir, introducer.furl, flog_gatherer, - name, web_port, storage=True, needed=needed, happy=happy, total=total, - ) - storage = StorageServer( - process=node_process, - protocol=node_process.transport._protocol, - ) - returnValue(storage) - - -@attr.s -class Client(object): - """ - Represents a Tahoe client - """ - - process = attr.ib( - validator=attr.validators.instance_of(TahoeProcess) - ) - protocol = attr.ib( - validator=attr.validators.provides(IProcessProtocol) - ) - - @inlineCallbacks - def restart(self, reactor, request, servers=1): - """ - re-start our underlying process by issuing a TERM, waiting and - then running again. - - :param int servers: number of server connections we will wait - for before being 'ready' - - Note that self.process and self.protocol will be new instances - after this. - """ - self.process.transport.signalProcess('TERM') - yield self.protocol.exited - process = yield _run_node( - reactor, self.process.node_dir, request, None, - ) - self.process = process - self.protocol = self.process.transport._protocol - - - # XXX add stop / start / restart - # ...maybe "reconfig" of some kind? - - -@inlineCallbacks -def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port, - needed=2, happy=3, total=4): - """ - Create a new storage server - """ - from util import _create_node - node_process = yield _create_node( - reactor, request, temp_dir, introducer.furl, flog_gatherer, - name, web_port, storage=False, needed=needed, happy=happy, total=total, - ) - returnValue( - Client( - process=node_process, - protocol=node_process.transport._protocol, - ) - ) - - -@attr.s -class Introducer(object): - """ - Reprsents a running introducer - """ - - process = attr.ib( - validator=attr.validators.instance_of(TahoeProcess) - ) - protocol = attr.ib( - validator=attr.validators.provides(IProcessProtocol) - ) - furl = attr.ib() - - -@inlineCallbacks -@log_call( - action_type=u"integration:introducer", - include_args=["temp_dir", "flog_gatherer"], - include_result=False, -) -def create_introducer(reactor, request, temp_dir, flog_gatherer, port): - """ - Run a new Introducer and return an Introducer instance. - """ - config = ( - '[node]\n' - 'nickname = introducer{port}\n' - 'web.port = {port}\n' - 'log_gatherer.furl = {log_furl}\n' - ).format( - port=port, - log_furl=flog_gatherer.furl, - ) - - intro_dir = join(temp_dir, 'introducer{}'.format(port)) - - if not exists(intro_dir): - mkdir(intro_dir) - done_proto = _ProcessExitedProtocol() - _tahoe_runner_optional_coverage( - done_proto, - reactor, - request, - ( - 'create-introducer', - '--listen=tcp', - '--hostname=localhost', - intro_dir, - ), - ) - yield done_proto.done - - # over-write the config file with our stuff - with open(join(intro_dir, 'tahoe.cfg'), 'w') as f: - f.write(config) - - # on windows, "tahoe start" means: run forever in the foreground, - # but on linux it means daemonize. "tahoe run" is consistent - # between platforms. - protocol = _MagicTextProtocol('introducer running') - transport = _tahoe_runner_optional_coverage( - protocol, - reactor, - request, - ( - 'run', - intro_dir, - ), - ) - - def clean(): - return _cleanup_tahoe_process(transport, protocol.exited) - request.addfinalizer(clean) - - yield protocol.magic_seen - - furl_fname = join(intro_dir, 'private', 'introducer.furl') - while not exists(furl_fname): - print("Don't see {} yet".format(furl_fname)) - yield deferLater(reactor, .1, lambda: None) - furl = open(furl_fname, 'r').read() - - returnValue( - Introducer( - process=TahoeProcess(transport, intro_dir), - protocol=protocol, - furl=furl, - ) - ) - - -@attr.s -class Grid(object): - """ - Represents an entire Tahoe Grid setup - - A Grid includes an Introducer, Flog Gatherer and some number of - Storage Servers. - """ - - _reactor = attr.ib() - _request = attr.ib() - _temp_dir = attr.ib() - _port_allocator = attr.ib() - introducer = attr.ib() - flog_gatherer = attr.ib() - storage_servers = attr.ib(factory=list) - clients = attr.ib(factory=dict) - - @storage_servers.validator - def check(self, attribute, value): - for server in value: - if not isinstance(server, StorageServer): - raise ValueError( - "storage_servers must be StorageServer" - ) - - @inlineCallbacks - def add_storage_node(self): - """ - Creates a new storage node, returns a StorageServer instance - (which will already be added to our .storage_servers list) - """ - port = yield self._port_allocator() - print("make {}".format(port)) - name = 'node{}'.format(port) - web_port = 'tcp:{}:interface=localhost'.format(port) - server = yield create_storage_server( - self._reactor, - self._request, - self._temp_dir, - self.introducer, - self.flog_gatherer, - name, - web_port, - ) - self.storage_servers.append(server) - returnValue(server) - - @inlineCallbacks - def add_client(self, name, needed=2, happy=3, total=4): - """ - Create a new client node - """ - port = yield self._port_allocator() - web_port = 'tcp:{}:interface=localhost'.format(port) - client = yield create_client( - self._reactor, - self._request, - self._temp_dir, - self.introducer, - self.flog_gatherer, - name, - web_port, - needed=needed, - happy=happy, - total=total, - ) - self.clients[name] = client - yield await_client_ready(client.process) - returnValue(client) - - - -# XXX THINK can we tie a whole *grid* to a single request? (I think -# that's all that makes sense) -@inlineCallbacks -def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator): - """ - """ - intro_port = yield port_allocator() - introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port) - grid = Grid( - reactor, - request, - temp_dir, - port_allocator, - introducer, - flog_gatherer, - ) - returnValue(grid) - - -def create_port_allocator(start_port): - """ - Returns a new port-allocator .. which is a zero-argument function - that returns Deferreds that fire with new, sequential ports - starting at `start_port` skipping any that already appear to have - a listener. - - There can still be a race against other processes allocating ports - -- between the time when we check the status of the port and when - our subprocess starts up. This *could* be mitigated by instructing - the OS to not randomly-allocate ports in some range, and then - using that range here (explicitly, ourselves). - - NB once we're Python3-only this could be an async-generator - """ - port = [start_port - 1] - - # import stays here to not interfere with reactor selection -- but - # maybe this function should be arranged to be called once from a - # fixture (with the reactor)? - from twisted.internet import reactor - - class NothingProtocol(Protocol): - """ - I do nothing. - """ - - def port_generator(): - print("Checking port {}".format(port)) - port[0] += 1 - ep = TCP4ServerEndpoint(reactor, port[0], interface="localhost") - d = ep.listen(Factory.forProtocol(NothingProtocol)) - - def good(listening_port): - unlisten_d = maybeDeferred(listening_port.stopListening) - def return_port(_): - return port[0] - unlisten_d.addBoth(return_port) - return unlisten_d - - def try_again(fail): - return port_generator() - - d.addCallbacks(good, try_again) - return d - return port_generator diff --git a/integration/test_grid_manager.py b/integration/test_grid_manager.py deleted file mode 100644 index db93f17c7..000000000 --- a/integration/test_grid_manager.py +++ /dev/null @@ -1,245 +0,0 @@ -import sys -import time -import json -import shutil -from os import mkdir, unlink, listdir, utime -from os.path import join, exists, getmtime - -from cryptography.hazmat.primitives.serialization import ( - Encoding, - PublicFormat, -) - -from allmydata.crypto import ed25519 -from allmydata.util import base32 -from allmydata.util import configutil - -import util -from grid import ( - create_grid, -) - -import pytest_twisted - - -@pytest_twisted.inlineCallbacks -def test_create_certificate(reactor, request): - """ - The Grid Manager produces a valid, correctly-signed certificate. - """ - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "create", - ) - privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') - privkey, pubkey = ed25519.signing_keypair_from_string(privkey_bytes) - - # Note that zara + her key here are arbitrary and don't match any - # "actual" clients in the test-grid; we're just checking that the - # Grid Manager signs this properly. - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "add", - "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", - stdin=gm_config, - ) - zara_cert_bytes = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "sign", "zara", "1", - stdin=gm_config, - ) - zara_cert = json.loads(zara_cert_bytes) - - # confirm that zara's certificate is made by the Grid Manager - # (.verify returns None on success, raises exception on error) - pubkey.verify( - base32.a2b(zara_cert['signature'].encode('ascii')), - zara_cert['certificate'].encode('ascii'), - ) - - -@pytest_twisted.inlineCallbacks -def test_remove_client(reactor, request): - """ - A Grid Manager can add and successfully remove a client - """ - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "create", - ) - - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "add", - "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", - stdin=gm_config, - ) - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "add", - "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq", - stdin=gm_config, - ) - assert "zara" in json.loads(gm_config)['storage_servers'] - assert "yakov" in json.loads(gm_config)['storage_servers'] - - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "remove", - "zara", - stdin=gm_config, - ) - assert "zara" not in json.loads(gm_config)['storage_servers'] - assert "yakov" in json.loads(gm_config)['storage_servers'] - - -@pytest_twisted.inlineCallbacks -def test_remove_last_client(reactor, request): - """ - A Grid Manager can remove all clients - """ - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "create", - ) - - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "add", - "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", - stdin=gm_config, - ) - assert "zara" in json.loads(gm_config)['storage_servers'] - - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "remove", - "zara", - stdin=gm_config, - ) - # there are no storage servers left at all now - assert "storage_servers" not in json.loads(gm_config) - - -@pytest_twisted.inlineCallbacks -def test_add_remove_client_file(reactor, request, temp_dir): - """ - A Grid Manager can add and successfully remove a client (when - keeping data on disk) - """ - gmconfig = join(temp_dir, "gmtest") - gmconfig_file = join(temp_dir, "gmtest", "config.json") - yield util.run_tahoe( - reactor, request, "grid-manager", "--config", gmconfig, "create", - ) - - yield util.run_tahoe( - reactor, request, "grid-manager", "--config", gmconfig, "add", - "zara", "pub-v0-kzug3ut2m7ziihf3ndpqlquuxeie4foyl36wn54myqc4wmiwe4ga", - ) - yield util.run_tahoe( - reactor, request, "grid-manager", "--config", gmconfig, "add", - "yakov", "pub-v0-kvxhb3nexybmipkrar2ztfrwp4uxxsmrjzkpzafit3ket4u5yldq", - ) - assert "zara" in json.load(open(gmconfig_file, "r"))['storage_servers'] - assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers'] - - yield util.run_tahoe( - reactor, request, "grid-manager", "--config", gmconfig, "remove", - "zara", - ) - assert "zara" not in json.load(open(gmconfig_file, "r"))['storage_servers'] - assert "yakov" in json.load(open(gmconfig_file, "r"))['storage_servers'] - - -@pytest_twisted.inlineCallbacks -def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator): - """ - A client with happines=2 fails to upload to a Grid when it is - using Grid Manager and there is only 1 storage server with a valid - certificate. - """ - grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator) - storage0 = yield grid.add_storage_node() - storage1 = yield grid.add_storage_node() - - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "create", - ) - gm_privkey_bytes = json.loads(gm_config)['private_key'].encode('ascii') - gm_privkey, gm_pubkey = ed25519.signing_keypair_from_string(gm_privkey_bytes) - - # create certificate for the first storage-server - pubkey_fname = join(storage0.process.node_dir, "node.pubkey") - with open(pubkey_fname, 'r') as f: - pubkey_str = f.read().strip() - - gm_config = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "add", - "storage0", pubkey_str, - stdin=gm_config, - ) - assert json.loads(gm_config)['storage_servers'].keys() == ['storage0'] - - print("inserting certificate") - cert = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", "-", "sign", "storage0", "1", - stdin=gm_config, - ) - - yield util.run_tahoe( - reactor, request, "--node-directory", storage0.process.node_dir, - "admin", "add-grid-manager-cert", - "--name", "default", - "--filename", "-", - stdin=cert, - ) - - # re-start this storage server - yield storage0.restart(reactor, request) - - # now only one storage-server has the certificate .. configure - # diana to have the grid-manager certificate - - diana = yield grid.add_client("diana", needed=2, happy=2, total=2) - - config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg")) - config.add_section("grid_managers") - config.set("grid_managers", "test", ed25519.string_from_verifying_key(gm_pubkey)) - with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f: - config.write(f) - - yield diana.restart(reactor, request, servers=2) - - # try to put something into the grid, which should fail (because - # diana has happy=2 but should only find storage0 to be acceptable - # to upload to) - - try: - yield util.run_tahoe( - reactor, request, "--node-directory", diana.process.node_dir, - "put", "-", - stdin="some content\n" * 200, - ) - assert False, "Should get a failure" - except util.ProcessFailed as e: - assert 'UploadUnhappinessError' in e.output - - -@pytest_twisted.inlineCallbacks -def test_identity(reactor, request, temp_dir): - """ - Dump public key to CLI - """ - gm_config = join(temp_dir, "test_identity") - yield util.run_tahoe( - reactor, request, "grid-manager", "--config", gm_config, "create", - ) - - # ask the CLI for the grid-manager pubkey - pubkey = yield util.run_tahoe( - reactor, request, "grid-manager", "--config", gm_config, "public-identity", - ) - alleged_pubkey = ed25519.verifying_key_from_string(pubkey.strip()) - - # load the grid-manager pubkey "ourselves" - with open(join(gm_config, "config.json"), "r") as f: - real_config = json.load(f) - real_privkey, real_pubkey = ed25519.signing_keypair_from_string( - real_config["private_key"].encode("ascii"), - ) - - # confirm the CLI told us the correct thing - alleged_bytes = alleged_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) - real_bytes = real_pubkey.public_bytes(Encoding.Raw, PublicFormat.Raw) - assert alleged_bytes == real_bytes, "Keys don't match" diff --git a/integration/test_servers_of_happiness.py b/integration/test_servers_of_happiness.py index d98bfe0b8..e5e4eb565 100644 --- a/integration/test_servers_of_happiness.py +++ b/integration/test_servers_of_happiness.py @@ -39,7 +39,8 @@ def test_upload_immutable(reactor, temp_dir, introducer_furl, flog_gatherer, sto try: yield proto.done assert False, "should raise exception" - except util.ProcessFailed as e: - assert "UploadUnhappinessError" in e.output + except Exception as e: + assert isinstance(e, ProcessTerminated) - assert "shares could be placed on only" in proto.output.getvalue() + output = proto.output.getvalue() + assert "shares could be placed on only" in output diff --git a/integration/test_tor.py b/integration/test_tor.py index 2423ef7d8..28360207a 100644 --- a/integration/test_tor.py +++ b/integration/test_tor.py @@ -76,28 +76,25 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_ node_dir = join(temp_dir, name) web_port = "tcp:{}:interface=localhost".format(control_port + 2000) - if exists(node_dir): - raise RuntimeError( - "A node already exists in '{}'".format(node_dir) + if True: + print("creating", node_dir) + mkdir(node_dir) + proto = util._DumpOutputProtocol(None) + reactor.spawnProcess( + proto, + sys.executable, + ( + sys.executable, '-m', 'allmydata.scripts.runner', + 'create-node', + '--nickname', name, + '--introducer', introducer_furl, + '--hide-ip', + '--tor-control-port', 'tcp:localhost:{}'.format(control_port), + '--listen', 'tor', + node_dir, + ) ) - print("creating", node_dir) - mkdir(node_dir) - proto = util._DumpOutputProtocol(None) - reactor.spawnProcess( - proto, - sys.executable, - ( - sys.executable, '-m', 'allmydata.scripts.runner', - 'create-node', - '--nickname', name, - '--introducer', introducer_furl, - '--hide-ip', - '--tor-control-port', 'tcp:localhost:{}'.format(control_port), - '--listen', 'tor', - node_dir, - ) - ) - yield proto.done + yield proto.done with open(join(node_dir, 'tahoe.cfg'), 'w') as f: f.write(''' diff --git a/integration/test_web.py b/integration/test_web.py index 36a7d3757..575e4fc1a 100644 --- a/integration/test_web.py +++ b/integration/test_web.py @@ -96,7 +96,7 @@ def test_helper_status(storage_nodes): successfully GET the /helper_status page """ - url = util.node_url(storage_nodes[0].process.node_dir, "helper_status") + url = util.node_url(storage_nodes[0].node_dir, "helper_status") resp = requests.get(url) assert resp.status_code >= 200 and resp.status_code < 300 dom = BeautifulSoup(resp.content, "html5lib") @@ -416,7 +416,7 @@ def test_storage_info(storage_nodes): storage0 = storage_nodes[0] requests.get( - util.node_url(storage0.process.node_dir, u"storage"), + util.node_url(storage0.node_dir, u"storage"), ) @@ -427,7 +427,7 @@ def test_storage_info_json(storage_nodes): storage0 = storage_nodes[0] resp = requests.get( - util.node_url(storage0.process.node_dir, u"storage"), + util.node_url(storage0.node_dir, u"storage"), params={u"t": u"json"}, ) data = json.loads(resp.content) @@ -439,12 +439,12 @@ def test_introducer_info(introducer): retrieve and confirm /introducer URI for the introducer """ resp = requests.get( - util.node_url(introducer.process.node_dir, u""), + util.node_url(introducer.node_dir, u""), ) assert "Introducer" in resp.content resp = requests.get( - util.node_url(introducer.process.node_dir, u""), + util.node_url(introducer.node_dir, u""), params={u"t": u"json"}, ) data = json.loads(resp.content) diff --git a/integration/util.py b/integration/util.py index 5cad59c12..bbcf5efc6 100644 --- a/integration/util.py +++ b/integration/util.py @@ -5,7 +5,6 @@ from os import mkdir from os.path import exists, join from six.moves import StringIO from functools import partial -from shutil import rmtree from twisted.internet.defer import Deferred, succeed from twisted.internet.protocol import ProcessProtocol @@ -36,38 +35,15 @@ class _ProcessExitedProtocol(ProcessProtocol): self.done.callback(None) -class ProcessFailed(Exception): - """ - A subprocess has failed. - - :ivar ProcessTerminated reason: the original reason from .processExited - - :ivar StringIO output: all stdout and stderr collected to this point. - """ - - def __init__(self, reason, output): - self.reason = reason - self.output = output - - def __str__(self): - return ":\n{}".format(self.reason, self.output) - - class _CollectOutputProtocol(ProcessProtocol): """ Internal helper. Collects all output (stdout + stderr) into self.output, and callback's on done with all of it after the process exits (for any reason). """ - def __init__(self, stdin=None): + def __init__(self): self.done = Deferred() self.output = StringIO() - self._stdin = stdin - - def connectionMade(self): - if self._stdin is not None: - self.transport.write(self._stdin) - self.transport.closeStdin() def processEnded(self, reason): if not self.done.called: @@ -75,7 +51,7 @@ class _CollectOutputProtocol(ProcessProtocol): def processExited(self, reason): if not isinstance(reason.value, ProcessDone): - self.done.errback(ProcessFailed(reason, self.output.getvalue())) + self.done.errback(reason) def outReceived(self, data): self.output.write(data) @@ -147,27 +123,13 @@ def _cleanup_tahoe_process(tahoe_transport, exited): try: print("signaling {} with TERM".format(tahoe_transport.pid)) tahoe_transport.signalProcess('TERM') - print("signaled, blocking on exit {}".format(exited)) + print("signaled, blocking on exit") pytest_twisted.blockon(exited) print("exited, goodbye") except ProcessExitedAlready: pass -def run_tahoe(reactor, request, *args, **kwargs): - """ - Helper to run tahoe with optional coverage. - - :returns: a Deferred that fires when the command is done (or a - ProcessFailed exception if it exits non-zero) - """ - stdin = kwargs.get("stdin", None) - protocol = _CollectOutputProtocol(stdin=stdin) - process = _tahoe_runner_optional_coverage(protocol, reactor, request, args) - process.exited = protocol.done - return protocol.done - - def _tahoe_runner_optional_coverage(proto, reactor, request, other_args): """ Internal helper. Calls spawnProcess with `-m @@ -269,7 +231,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam if exists(node_dir): created_d = succeed(None) else: - print("creating: {}".format(node_dir)) + print("creating", node_dir) mkdir(node_dir) done_proto = _ProcessExitedProtocol() args = [ @@ -294,7 +256,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam def created(_): config_path = join(node_dir, 'tahoe.cfg') config = get_config(config_path) - set_config(config, 'node', 'log_gatherer.furl', flog_gatherer.furl) + set_config(config, 'node', 'log_gatherer.furl', flog_gatherer) write_config(config_path, config) created_d.addCallback(created) @@ -481,7 +443,7 @@ def web_post(tahoe, uri_fragment, **kwargs): return resp.content -def await_client_ready(tahoe, timeout=10, liveness=60*2, servers=1): +def await_client_ready(tahoe, timeout=10, liveness=60*2): """ Uses the status API to wait for a client-type node (in `tahoe`, a `TahoeProcess` instance usually from a fixture e.g. `alice`) to be @@ -505,8 +467,8 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, servers=1): time.sleep(1) continue - if len(js['servers']) < servers: - print("waiting because fewer than {} server(s)".format(servers)) + if len(js['servers']) == 0: + print("waiting because no servers at all") time.sleep(1) continue server_times = [