Fixes to test infrastructure.

Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
This commit is contained in:
David-Sarah Hopwood 2012-12-12 07:05:28 +00:00 committed by Daira Hopwood
parent 03e02eeece
commit c896cfc2c1
3 changed files with 175 additions and 74 deletions

View File

@ -35,11 +35,32 @@ def flush_but_dont_ignore(res):
d.addCallback(_done) d.addCallback(_done)
return d return d
class DummyProducer: class DummyProducer:
implements(IPullProducer) implements(IPullProducer)
def resumeProducing(self): def resumeProducing(self):
pass pass
class Marker:
pass
class FakeCanary:
def __init__(self, ignore_disconnectors=False):
self.ignore = ignore_disconnectors
self.disconnectors = {}
def notifyOnDisconnect(self, f, *args, **kwargs):
if self.ignore:
return
m = Marker()
self.disconnectors[m] = (f, args, kwargs)
return m
def dontNotifyOnDisconnect(self, marker):
if self.ignore:
return
del self.disconnectors[marker]
class FakeCHKFileNode: class FakeCHKFileNode:
"""I provide IImmutableFileNode, but all of my data is stored in a """I provide IImmutableFileNode, but all of my data is stored in a
class-level dictionary.""" class-level dictionary."""
@ -485,6 +506,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
d.addBoth(flush_but_dont_ignore) d.addBoth(flush_but_dont_ignore)
return d return d
def workdir(self, name):
return os.path.join("system", self.__class__.__name__, name)
def getdir(self, subdir): def getdir(self, subdir):
return os.path.join(self.basedir, subdir) return os.path.join(self.basedir, subdir)
@ -603,11 +627,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
else: else:
config += nodeconfig config += nodeconfig
fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config) # give subclasses a chance to append lines to the nodes' tahoe.cfg files.
config += self._get_extra_config(i)
# give subclasses a chance to append lines to the node's tahoe.cfg fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
# files before they are launched.
self._set_up_nodes_extra_config()
# start clients[0], wait for it's tub to be ready (at which point it # start clients[0], wait for it's tub to be ready (at which point it
# will have registered the helper furl). # will have registered the helper furl).
@ -645,9 +668,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
d.addCallback(_connected) d.addCallback(_connected)
return d return d
def _set_up_nodes_extra_config(self): def _get_extra_config(self, i):
# for overriding by subclasses # for overriding by subclasses
pass return ""
def _grab_stats(self, res): def _grab_stats(self, res):
d = self.stats_gatherer.poll() d = self.stats_gatherer.poll()

View File

@ -51,8 +51,9 @@ class WebRenderingMixin:
ctx = self.make_context(req) ctx = self.make_context(req)
return page.renderSynchronously(ctx) return page.renderSynchronously(ctx)
def failUnlessIn(self, substring, s): def render_json(self, page):
self.failUnless(substring in s, s) d = self.render1(page, args={"t": ["json"]})
return d
def remove_tags(self, s): def remove_tags(self, s):
s = re.sub(r'<[^>]*>', ' ', s) s = re.sub(r'<[^>]*>', ' ', s)

View File

@ -13,23 +13,28 @@
# Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator, # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
# or the control.furl . # or the control.furl .
import os.path import os.path, shutil
from zope.interface import implements from zope.interface import implements
from twisted.application import service from twisted.application import service
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
from foolscap.api import Referenceable, fireEventually, RemoteException from foolscap.api import Referenceable, fireEventually, RemoteException
from base64 import b32encode from base64 import b32encode
from allmydata import uri as tahoe_uri from allmydata import uri as tahoe_uri
from allmydata.client import Client from allmydata.client import Client
from allmydata.storage.server import StorageServer, storage_index_to_dir from allmydata.storage.server import StorageServer
from allmydata.util import fileutil, idlib, hashutil from allmydata.storage.backends.disk.disk_backend import DiskBackend
from allmydata.util import fileutil, idlib, hashutil, log
from allmydata.util.hashutil import sha1 from allmydata.util.hashutil import sha1
from allmydata.test.common_web import HTTPClientGETFactory from allmydata.test.common_web import HTTPClientGETFactory
from allmydata.interfaces import IStorageBroker, IServer from allmydata.interfaces import IStorageBroker, IServer
from allmydata.test.common import TEST_RSA_KEY_SIZE from allmydata.test.common import TEST_RSA_KEY_SIZE
PRINT_TRACEBACKS = False
class IntentionalError(Exception): class IntentionalError(Exception):
pass pass
@ -87,23 +92,34 @@ class LocalWrapper:
return d2 return d2
return _really_call() return _really_call()
if PRINT_TRACEBACKS:
import traceback
tb = traceback.extract_stack()
d = fireEventually() d = fireEventually()
d.addCallback(lambda res: _call()) d.addCallback(lambda res: _call())
def _wrap_exception(f): def _wrap_exception(f):
if PRINT_TRACEBACKS and not f.check(NameError):
print ">>>" + ">>>".join(traceback.format_list(tb))
print "+++ %s%r %r: %s" % (methname, args, kwargs, f)
#f.printDetailedTraceback()
return Failure(RemoteException(f)) return Failure(RemoteException(f))
d.addErrback(_wrap_exception) d.addErrback(_wrap_exception)
def _return_membrane(res): def _return_membrane(res):
# rather than complete the difficult task of building a # Rather than complete the difficult task of building a
# fully-general Membrane (which would locate all Referenceable # fully-general Membrane (which would locate all Referenceable
# objects that cross the simulated wire and replace them with # objects that cross the simulated wire and replace them with
# wrappers), we special-case certain methods that we happen to # wrappers), we special-case certain methods that we happen to
# know will return Referenceables. # know will return Referenceables.
# The outer return value of such a method may be Deferred, but
# its components must not be.
if methname == "allocate_buckets": if methname == "allocate_buckets":
(alreadygot, allocated) = res (alreadygot, allocated) = res
for shnum in allocated: for shnum in allocated:
assert not isinstance(allocated[shnum], defer.Deferred), (methname, allocated)
allocated[shnum] = LocalWrapper(allocated[shnum]) allocated[shnum] = LocalWrapper(allocated[shnum])
if methname == "get_buckets": if methname == "get_buckets":
for shnum in res: for shnum in res:
assert not isinstance(res[shnum], defer.Deferred), (methname, res)
res[shnum] = LocalWrapper(res[shnum]) res[shnum] = LocalWrapper(res[shnum])
return res return res
d.addCallback(_return_membrane) d.addCallback(_return_membrane)
@ -168,11 +184,20 @@ class NoNetworkStorageBroker:
seed = server.get_permutation_seed() seed = server.get_permutation_seed()
return sha1(peer_selection_index + seed).digest() return sha1(peer_selection_index + seed).digest()
return sorted(self.get_connected_servers(), key=_permuted) return sorted(self.get_connected_servers(), key=_permuted)
def get_connected_servers(self): def get_connected_servers(self):
return self.client._servers return self.client._servers
def get_nickname_for_serverid(self, serverid): def get_nickname_for_serverid(self, serverid):
return None return None
def get_known_servers(self):
return self.get_connected_servers()
def get_all_serverids(self):
return self.client.get_all_serverids()
class NoNetworkClient(Client): class NoNetworkClient(Client):
def create_tub(self): def create_tub(self):
pass pass
@ -234,8 +259,8 @@ class NoNetworkGrid(service.MultiService):
self.clients = [] self.clients = []
for i in range(num_servers): for i in range(num_servers):
ss = self.make_server(i) server = self.make_server(i)
self.add_server(i, ss) self.add_server(i, server)
self.rebuild_serverlist() self.rebuild_serverlist()
for i in range(num_clients): for i in range(num_clients):
@ -266,23 +291,24 @@ class NoNetworkGrid(service.MultiService):
def make_server(self, i, readonly=False): def make_server(self, i, readonly=False):
serverid = hashutil.tagged_hash("serverid", str(i))[:20] serverid = hashutil.tagged_hash("serverid", str(i))[:20]
serverdir = os.path.join(self.basedir, "servers", storagedir = os.path.join(self.basedir, "servers",
idlib.shortnodeid_b2a(serverid), "storage") idlib.shortnodeid_b2a(serverid), "storage")
fileutil.make_dirs(serverdir)
ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats(),
readonly_storage=readonly)
ss._no_network_server_number = i
return ss
def add_server(self, i, ss): # The backend will make the storage directory and any necessary parents.
backend = DiskBackend(storagedir, readonly=readonly)
server = StorageServer(serverid, backend, storagedir, stats_provider=SimpleStats())
server._no_network_server_number = i
return server
def add_server(self, i, server):
# to deal with the fact that all StorageServers are named 'storage', # to deal with the fact that all StorageServers are named 'storage',
# we interpose a middleman # we interpose a middleman
middleman = service.MultiService() middleman = service.MultiService()
middleman.setServiceParent(self) middleman.setServiceParent(self)
ss.setServiceParent(middleman) server.setServiceParent(middleman)
serverid = ss.my_nodeid serverid = server.get_serverid()
self.servers_by_number[i] = ss self.servers_by_number[i] = server
aa = ss.get_accountant().get_anonymous_account() aa = server.get_accountant().get_anonymous_account()
wrapper = wrap_storage_server(aa) wrapper = wrap_storage_server(aa)
self.wrappers_by_id[serverid] = wrapper self.wrappers_by_id[serverid] = wrapper
self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper) self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
@ -299,14 +325,14 @@ class NoNetworkGrid(service.MultiService):
def remove_server(self, serverid): def remove_server(self, serverid):
# it's enough to remove the server from c._servers (we don't actually # it's enough to remove the server from c._servers (we don't actually
# have to detach and stopService it) # have to detach and stopService it)
for i,ss in self.servers_by_number.items(): for i, server in self.servers_by_number.items():
if ss.my_nodeid == serverid: if server.get_serverid() == serverid:
del self.servers_by_number[i] del self.servers_by_number[i]
break break
del self.wrappers_by_id[serverid] del self.wrappers_by_id[serverid]
del self.proxies_by_id[serverid] del self.proxies_by_id[serverid]
self.rebuild_serverlist() self.rebuild_serverlist()
return ss return server
def break_server(self, serverid, count=True): def break_server(self, serverid, count=True):
# mark the given server as broken, so it will throw exceptions when # mark the given server as broken, so it will throw exceptions when
@ -316,16 +342,16 @@ class NoNetworkGrid(service.MultiService):
def hang_server(self, serverid): def hang_server(self, serverid):
# hang the given server # hang the given server
ss = self.wrappers_by_id[serverid] server = self.wrappers_by_id[serverid]
assert ss.hung_until is None assert server.hung_until is None
ss.hung_until = defer.Deferred() server.hung_until = defer.Deferred()
def unhang_server(self, serverid): def unhang_server(self, serverid):
# unhang the given server # unhang the given server
ss = self.wrappers_by_id[serverid] server = self.wrappers_by_id[serverid]
assert ss.hung_until is not None assert server.hung_until is not None
ss.hung_until.callback(None) server.hung_until.callback(None)
ss.hung_until = None server.hung_until = None
def nuke_from_orbit(self): def nuke_from_orbit(self):
"""Empty all share directories in this grid. It's the only way to be sure ;-) """Empty all share directories in this grid. It's the only way to be sure ;-)
@ -361,66 +387,117 @@ class GridTestMixin:
def get_clientdir(self, i=0): def get_clientdir(self, i=0):
return self.g.clients[i].basedir return self.g.clients[i].basedir
def get_server(self, i):
return self.g.servers_by_number[i]
def get_serverdir(self, i): def get_serverdir(self, i):
return self.g.servers_by_number[i].storedir return self.g.servers_by_number[i].backend._storedir
def remove_server(self, i):
self.g.remove_server(self.g.servers_by_number[i].get_serverid())
def iterate_servers(self): def iterate_servers(self):
for i in sorted(self.g.servers_by_number.keys()): for i in sorted(self.g.servers_by_number.keys()):
ss = self.g.servers_by_number[i] server = self.g.servers_by_number[i]
yield (i, ss, ss.storedir) yield (i, server, server.backend._storedir)
def find_uri_shares(self, uri): def find_uri_shares(self, uri):
si = tahoe_uri.from_string(uri).get_storage_index() si = tahoe_uri.from_string(uri).get_storage_index()
prefixdir = storage_index_to_dir(si) sharelist = []
shares = [] d = defer.succeed(None)
for i,ss in self.g.servers_by_number.items(): for i, server in self.g.servers_by_number.items():
serverid = ss.my_nodeid d.addCallback(lambda ign, server=server: server.backend.get_shareset(si).get_shares())
basedir = os.path.join(ss.sharedir, prefixdir) def _append_shares( (shares_for_server, corrupted), server=server):
if not os.path.exists(basedir): assert len(corrupted) == 0, (shares_for_server, corrupted)
continue for share in shares_for_server:
for f in os.listdir(basedir): assert not isinstance(share, defer.Deferred), share
try: sharelist.append( (share.get_shnum(), server.get_serverid(), share._get_path()) )
shnum = int(f) d.addCallback(_append_shares)
shares.append((shnum, serverid, os.path.join(basedir, f)))
except ValueError: d.addCallback(lambda ign: sorted(sharelist))
pass return d
return sorted(shares)
def add_server(self, server_number, readonly=False):
assert self.g, "I tried to find a grid at self.g, but failed"
ss = self.g.make_server(server_number, readonly)
log.msg("just created a server, number: %s => %s" % (server_number, ss,))
self.g.add_server(server_number, ss)
def add_server_with_share(self, uri, server_number, share_number=None,
readonly=False):
self.add_server(server_number, readonly)
if share_number is not None:
self.copy_share_to_server(uri, server_number, share_number)
def copy_share_to_server(self, uri, server_number, share_number):
ss = self.g.servers_by_number[server_number]
self.copy_share(self.shares[share_number], uri, ss)
def copy_shares(self, uri): def copy_shares(self, uri):
shares = {} shares = {}
for (shnum, serverid, sharefile) in self.find_uri_shares(uri): d = self.find_uri_shares(uri)
shares[sharefile] = open(sharefile, "rb").read() def _got_shares(sharelist):
return shares for (shnum, serverid, sharefile) in sharelist:
shares[sharefile] = fileutil.read(sharefile)
return shares
d.addCallback(_got_shares)
return d
def copy_share(self, from_share, uri, to_server):
si = tahoe_uri.from_string(uri).get_storage_index()
(i_shnum, i_serverid, i_sharefile) = from_share
shares_dir = to_server.backend.get_shareset(si)._get_sharedir()
new_sharefile = os.path.join(shares_dir, str(i_shnum))
fileutil.make_dirs(shares_dir)
if os.path.normpath(i_sharefile) != os.path.normpath(new_sharefile):
shutil.copy(i_sharefile, new_sharefile)
def restore_all_shares(self, shares): def restore_all_shares(self, shares):
for sharefile, data in shares.items(): for sharefile, data in shares.items():
open(sharefile, "wb").write(data) fileutil.write(sharefile, data)
def delete_share(self, (shnum, serverid, sharefile)): def delete_share(self, (shnum, serverid, sharefile)):
os.unlink(sharefile) fileutil.remove(sharefile)
def delete_shares_numbered(self, uri, shnums): def delete_shares_numbered(self, uri, shnums):
for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri): d = self.find_uri_shares(uri)
if i_shnum in shnums: def _got_shares(sharelist):
os.unlink(i_sharefile) for (i_shnum, i_serverid, i_sharefile) in sharelist:
if i_shnum in shnums:
fileutil.remove(i_sharefile)
d.addCallback(_got_shares)
return d
def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function): def delete_all_shares(self, uri):
sharedata = open(sharefile, "rb").read() d = self.find_uri_shares(uri)
corruptdata = corruptor_function(sharedata) def _got_shares(shares):
open(sharefile, "wb").write(corruptdata) for sh in shares:
self.delete_share(sh)
d.addCallback(_got_shares)
return d
def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function, debug=False):
sharedata = fileutil.read(sharefile)
corruptdata = corruptor_function(sharedata, debug=debug)
fileutil.write(sharefile, corruptdata)
def corrupt_shares_numbered(self, uri, shnums, corruptor, debug=False): def corrupt_shares_numbered(self, uri, shnums, corruptor, debug=False):
for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri): d = self.find_uri_shares(uri)
if i_shnum in shnums: def _got_shares(sharelist):
sharedata = open(i_sharefile, "rb").read() for (i_shnum, i_serverid, i_sharefile) in sharelist:
corruptdata = corruptor(sharedata, debug=debug) if i_shnum in shnums:
open(i_sharefile, "wb").write(corruptdata) self.corrupt_share((i_shnum, i_serverid, i_sharefile), corruptor, debug=debug)
d.addCallback(_got_shares)
return d
def corrupt_all_shares(self, uri, corruptor, debug=False): def corrupt_all_shares(self, uri, corruptor, debug=False):
for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri): d = self.find_uri_shares(uri)
sharedata = open(i_sharefile, "rb").read() def _got_shares(sharelist):
corruptdata = corruptor(sharedata, debug=debug) for (i_shnum, i_serverid, i_sharefile) in sharelist:
open(i_sharefile, "wb").write(corruptdata) self.corrupt_share((i_shnum, i_serverid, i_sharefile), corruptor, debug=debug)
d.addCallback(_got_shares)
return d
def GET(self, urlpath, followRedirect=False, return_response=False, def GET(self, urlpath, followRedirect=False, return_response=False,
method="GET", clientnum=0, **kwargs): method="GET", clientnum=0, **kwargs):