diff --git a/docs/specifications/servers-of-happiness.rst b/docs/specifications/servers-of-happiness.rst
index 6c58cf395..a9d7041d4 100644
--- a/docs/specifications/servers-of-happiness.rst
+++ b/docs/specifications/servers-of-happiness.rst
@@ -112,6 +112,10 @@ We calculate share placement like so:
 
 1. Query all servers for existing shares.
 
+1a. Query remaining space from all servers. Every server that has
+    enough free space is considered "readwrite" and every server with too
+    little space is "readonly".
+
 2. Construct a bipartite graph G1 of *readonly* servers to pre-existing
    shares, where an edge exists between an arbitrary readonly server S and
    an arbitrary share T if and only if S currently holds T.
@@ -132,12 +136,11 @@ We calculate share placement like so:
 5. Calculate a maximum matching graph of G2, call this M2, again preferring
    earlier servers.
 
-6. Construct a bipartite graph G3 of (only readwrite) servers to shares. Let
-   an edge exist between server S and share T if and only if S already has T,
-   or *could* hold T (i.e. S has enough available space to hold a share of at
-   least T's size). Then remove (from G3) any servers and shares used in M1
-   or M2 (note that we retain servers/shares that were in G1/G2 but *not* in
-   the M1/M2 subsets)
+6. Construct a bipartite graph G3 of (only readwrite) servers to
+   shares (some shares may already exist on a server). Then remove
+   (from G3) any servers and shares used in M1 or M2 (note that we
+   retain servers/shares that were in G1/G2 but *not* in the M1/M2
+   subsets)
 
 7. Calculate a maximum matching graph of G3, call this M3, preferring earlier
    servers. The final placement table is the union of M1+M2+M3.
diff --git a/src/allmydata/immutable/happiness_upload.py b/src/allmydata/immutable/happiness_upload.py
index d48d57276..ecf00fdf4 100644
--- a/src/allmydata/immutable/happiness_upload.py
+++ b/src/allmydata/immutable/happiness_upload.py
@@ -1,7 +1,322 @@
 from Queue import PriorityQueue
 from allmydata.util.happinessutil import augmenting_path_for, residual_network
 
-class Happiness_Upload:
+def _query_all_shares(servermap, readonly_peers):
+    readonly_shares = set()
+    readonly_map = {}
+    for peer in servermap:
+        print("peer", peer)
+        if peer in readonly_peers:
+            readonly_map.setdefault(peer, servermap[peer])
+            for share in servermap[peer]:
+                readonly_shares.add(share)
+    return readonly_shares
+
+
+def _convert_mappings(index_to_peer, index_to_share, maximum_graph):
+    """
+    Now that a maximum spanning graph has been found, convert the indexes
+    back to their original ids so that the client can pass them to the
+    uploader.
+    """
+
+    converted_mappings = {}
+    for share in maximum_graph:
+        peer = maximum_graph[share]
+        if peer == None:
+            converted_mappings.setdefault(index_to_share[share], None)
+        else:
+            converted_mappings.setdefault(index_to_share[share], set([index_to_peer[peer]]))
+    return converted_mappings
+
+def _compute_maximum_graph(graph, shareIndices):
+    """
+    This is an implementation of the Ford-Fulkerson method for finding
+    a maximum flow in a flow network applied to a bipartite graph.
+    Specifically, it is the Edmonds-Karp algorithm, since it uses a
+    breadth-first search to find the shortest augmenting path at each
+    iteration, if one exists.
+
+    The implementation here is an adaptation of an algorithm described in
+    "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
+    """
+
+    if graph == []:
+        return {}
+
+    dim = len(graph)
+    flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
+    residual_graph, residual_function = residual_network(graph, flow_function)
+
+    while augmenting_path_for(residual_graph):
+        path = augmenting_path_for(residual_graph)
+        # Delta is the largest amount that we can increase flow across
+        # all of the edges in path. Because of the way that the residual
+        # function is constructed, f[u][v] for a particular edge (u, v)
+        # is the amount of unused capacity on that edge. Taking the
+        # minimum of a list of those values for each edge in the
+        # augmenting path gives us our delta.
+        delta = min(map(lambda (u, v), rf=residual_function: rf[u][v],
+                        path))
+        for (u, v) in path:
+            flow_function[u][v] += delta
+            flow_function[v][u] -= delta
+        residual_graph, residual_function = residual_network(graph, flow_function)
+
+    new_mappings = {}
+    for shareIndex in shareIndices:
+        peer = residual_graph[shareIndex]
+        if peer == [dim - 1]:
+            new_mappings.setdefault(shareIndex, None)
+        else:
+            new_mappings.setdefault(shareIndex, peer[0])
+
+    return new_mappings
+
+def _flow_network(peerIndices, shareIndices):
+    """
+    Given a set of peerIndices and a set of shareIndices, I create a flow network
+    to be used by _compute_maximum_graph. The return value is a two
+    dimensional list in the form of a flow network, where each index represents
+    a node, and the corresponding list represents all of the nodes it is connected
+    to.
+
+    This function is similar to allmydata.util.happinessutil.flow_network_for, but
+    we connect every peer with all shares instead of reflecting a supplied servermap.
+    """
+    graph = []
+    # The first entry in our flow network is the source.
+    # Connect the source to every server.
+    graph.append(peerIndices)
+    sink_num = len(peerIndices + shareIndices) + 1
+    # Connect every server with every share it can possibly store.
+    for peerIndex in peerIndices:
+        graph.insert(peerIndex, shareIndices)
+    # Connect every share with the sink.
+    for shareIndex in shareIndices:
+        graph.insert(shareIndex, [sink_num])
+    # Add an empty entry for the sink.
+    graph.append([])
+    return graph
+
+def _servermap_flow_graph(peers, shares, servermap):
+    """
+    Generates a flow network of peerIndices to shareIndices from a server map
+    of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a
+    directed graph where each edge has a capacity and each edge receives a flow.
+    The amount of flow on an edge cannot exceed the capacity of the edge." This
+    is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm
+    converts the problem into a maximum flow problem.
+    """
+    if servermap == {}:
+        return []
+
+    peer_to_index, index_to_peer = _reindex(peers, 1)
+    share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
+    graph = []
+    sink_num = len(peers) + len(shares) + 1
+    graph.append([peer_to_index[peer] for peer in peers])
+    for peer in peers:
+        indexedShares = [share_to_index[s] for s in servermap[peer]]
+        graph.insert(peer_to_index[peer], indexedShares)
+    for share in shares:
+        graph.insert(share_to_index[share], [sink_num])
+    graph.append([])
+    return graph
+
+def _reindex(items, base):
+    """
+    I take an iterable of items and give each item an index to be used in
+    the construction of a flow network. Indices for these items start at base
+    and continue to base + len(items) - 1.
+
+    I return two dictionaries: ({item: index}, {index: item})
+    """
+    item_to_index = {}
+    index_to_item = {}
+    for item in items:
+        item_to_index.setdefault(item, base)
+        index_to_item.setdefault(base, item)
+        base += 1
+    return (item_to_index, index_to_item)
+
+def _maximum_matching_graph(graph, servermap):
+    """
+    :param graph: an iterable of (server, share) 2-tuples
+
+    Calculate the maximum matching of the bipartite graph (U, V, E)
+    such that:
+
+        U = peers
+        V = shares
+        E = peers x shares
+
+    Returns a dictionary {share -> set(peer)}, indicating that the share
+    should be placed on each peer in the set. If a share's corresponding
+    value is None, the share can be placed on any server. Note that the set
+    of peers should only be one peer when returned.
+    """
+    peers = [x[0] for x in graph]
+    shares = [x[1] for x in graph]
+
+    peer_to_index, index_to_peer = _reindex(peers, 1)
+    share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
+    shareIndices = [share_to_index[s] for s in shares]
+    if servermap:
+        graph = _servermap_flow_graph(peers, shares, servermap)
+    else:
+        peerIndices = [peer_to_index[peer] for peer in peers]
+        graph = _flow_network(peerIndices, shareIndices)
+    max_graph = _compute_maximum_graph(graph, shareIndices)
+    return _convert_mappings(index_to_peer, index_to_share, max_graph)
+
+
+def _filter_g3(g3, m1, m2):
+    """
+    This implements the last part of 'step 6' in the spec, "Then
+    remove (from G3) any servers and shares used in M1 or M2 (note
+    that we retain servers/shares that were in G1/G2 but *not* in the
+    M1/M2 subsets)"
+    """
+    # m1, m2 are dicts from share -> set(peers)
+    # (but I think the set size is always 1 .. so maybe we could fix that everywhere)
+    m12_servers = reduce(lambda a, b: a.union(b), m1.values() + m2.values())
+    m12_shares = set(m1.keys() + m2.keys())
+    new_g3 = set()
+    for edge in g3:
+        if edge[0] not in m12_servers and edge[1] not in m12_shares:
+            new_g3.add(edge)
+    return new_g3
+
+
+def _merge_dicts(result, inc):
+    """
+    given two dicts mapping key -> set(), merge the *values* of the
+    'inc' dict into the value of the 'result' dict if the value is not
+    None.
+
+    Note that this *mutates* 'result'
+    """
+    for k, v in inc.items():
+        existing = result.get(k, None)
+        if existing is None:
+            result[k] = v
+        elif v is not None:
+            result[k] = existing.union(v)
+
+
+def share_placement(peers, readonly_peers, shares, peers_to_shares={}):
+    """
+    :param peers: ordered list of servers, "Maybe *2N* of them."
+
+    working from servers-of-happiness.rst, in kind-of pseudo-code
+    """
+    # "1. Query all servers for existing shares."
+    #shares = _query_all_shares(servers, peers)
+    #print("shares", shares)
+
+    # "2. Construct a bipartite graph G1 of *readonly* servers to pre-existing
+    # shares, where an edge exists between an arbitrary readonly server S and an
+    # arbitrary share T if and only if S currently holds T."
+    g1 = set()
+    for share in shares:
+        for server in peers:
+            if server in readonly_peers and share in peers_to_shares.get(server, set()):
+                g1.add((server, share))
+
+    # 3. Calculate a maximum matching graph of G1 (a set of S->T edges that has or
+    # is-tied-for the highest "happiness score"). There is a clever efficient
+    # algorithm for this, named "Ford-Fulkerson". There may be more than one
+    # maximum matching for this graph; we choose one of them arbitrarily, but
+    # prefer earlier servers. Call this particular placement M1. The placement
+    # maps shares to servers, where each share appears at most once, and each
+    # server appears at most once.
+    m1 = _maximum_matching_graph(g1, peers_to_shares)#peers, shares)
+    if False:
+        print("M1:")
+        for k, v in m1.items():
+            print("  {}: {}".format(k, v))
+
+    # 4. Construct a bipartite graph G2 of readwrite servers to pre-existing
+    # shares. Then remove any edge (from G2) that uses a server or a share found
+    # in M1. Let an edge exist between server S and share T if and only if S
+    # already holds T.
+    g2 = set()
+    for g2_server, g2_shares in peers_to_shares.items():
+        for share in g2_shares:
+            g2.add((g2_server, share))
+
+    for m1_share, m1_peers in m1.items():
+        for g2server, g2share in list(g2):
+            if (m1_peers is not None and g2server in m1_peers) or g2share == m1_share:
+                g2.remove((g2server, g2share))
+
+    # 5. Calculate a maximum matching graph of G2, call this M2, again preferring
+    # earlier servers.
+
+    m2 = _maximum_matching_graph(g2, peers_to_shares)
+
+    if False:
+        print("M2:")
+        for k, v in m2.items():
+            print("  {}: {}".format(k, v))
+
+    # 6. Construct a bipartite graph G3 of (only readwrite) servers to
+    # shares (some shares may already exist on a server). Then remove
+    # (from G3) any servers and shares used in M1 or M2 (note that we
+    # retain servers/shares that were in G1/G2 but *not* in the M1/M2
+    # subsets)
+
+    # meejah: does that last sentence mean remove *any* edge with any
+    # server in M1?? or just "remove any edge found in M1/M2"? (Wait,
+    # is that last sentence backwards? G1 a subset of M1?)
+    readwrite = set(peers).difference(set(readonly_peers))
+    g3 = [
+        (server, share) for server in readwrite for share in shares
+    ]
+
+    g3 = _filter_g3(g3, m1, m2)
+    if False:
+        print("G3:")
+        for srv, shr in g3:
+            print("  {}->{}".format(srv, shr))
+
+    # 7. Calculate a maximum matching graph of G3, call this M3, preferring earlier
+    # servers. The final placement table is the union of M1+M2+M3.
+
+    m3 = _maximum_matching_graph(g3, {})#, peers_to_shares)
+
+    answer = dict()
+    _merge_dicts(answer, m1)
+    _merge_dicts(answer, m2)
+    _merge_dicts(answer, m3)
+
+    # anything left over that has "None" instead of a 1-set of peers
+    # should be part of the "evenly distribute amongst readwrite
+    # servers" thing.
+
+    # See "Properties of Upload Strategy of Happiness" in the spec:
+    # "The size of the maximum bipartite matching is bounded by the size of the smaller
+    # set of vertices. Therefore in a situation where the set of servers is smaller
+    # than the set of shares, placement is not generated for a subset of shares. In
+    # this case the remaining shares are distributed as evenly as possible across the
+    # set of writable servers."
+
+    def peer_generator():
+        while True:
+            for peer in readwrite:
+                yield peer
+    round_robin_peers = peer_generator()
+    for k, v in answer.items():
+        if v is None:
+            answer[k] = {next(round_robin_peers)}
+
+    # XXX we should probably actually return share->peer instead of
+    # share->set(peer) where the set-size is 1 because sets are a pain
+    # to deal with (i.e. no indexing).
+    return answer
+
+class HappinessUpload:
     """
     I handle the calculations involved with generating the maximum
     spanning graph for a file when given a set of peers, a set of shares,
@@ -11,6 +326,7 @@ class Happiness_Upload:
     docs/specifications/servers-of-happiness.rst
     """
 
+    # HappinessUpload(self.peers, self.full_peers, shares, self.existing_shares)
     def __init__(self, peers, readonly_peers, shares, servermap={}):
         self._happiness = 0
         self.homeless_shares = set()
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index af422f173..040e4e12d 100644
--- a/src/allmydata/immutable/upload.py
+++ b/src/allmydata/immutable/upload.py
@@ -14,8 +14,7 @@ from allmydata.storage.server import si_b2a
 from allmydata.immutable import encode
 from allmydata.util import base32, dictutil, idlib, log, mathutil
 from allmydata.util.happinessutil import servers_of_happiness, \
-     shares_by_server, merge_servers, \
-     failure_message
+     merge_servers, failure_message
 from allmydata.util.assertutil import precondition, _assert
 from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@@ -26,7 +25,7 @@ from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 from cStringIO import StringIO
 
-from happiness_upload import Happiness_Upload
+from happiness_upload import HappinessUpload
 
 
 # this wants to live in storage, not here
@@ -161,14 +160,14 @@ class ServerTracker:
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
-        d.addCallback(self._got_reply)
+        d.addCallback(self._buckets_allocated)
        return d
 
     def ask_about_existing_shares(self):
         rref = self._server.get_rref()
         return rref.callRemote("get_buckets", self.storage_index)
 
-    def _got_reply(self, (alreadygot, buckets)):
+    def _buckets_allocated(self, (alreadygot, buckets)):
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
         b = {}
         for sharenum, rref in buckets.iteritems():
@@ -253,7 +252,7 @@
 
     def get_tasks(self):
         shares = set(range(self.total_shares))
-        self.h = Happiness_Upload(self.peers, self.full_peers, shares, self.existing_shares)
+        self.h = HappinessUpload(self.peers, self.full_peers, shares, self.existing_shares)
         return self.h.generate_mappings()
 
     def is_healthy(self):
@@ -324,6 +323,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                                              share_size, 0, num_segments,
                                              num_share_hashes, EXTENSION_SIZE)
         allocated_size = wbp.get_allocated_size()
+
+        # see docs/specifications/servers-of-happiness.rst
+        # 0. Start with an ordered list of servers. Maybe *2N* of them.
+
+
         all_servers = storage_broker.get_servers_for_psi(storage_index)
         if not all_servers:
             raise NoServersError("client gave us zero servers")
@@ -388,6 +392,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         # servers_of_happiness accounting, then we forget about them.
         readonly_trackers = _make_trackers(readonly_servers)
+
+        # see docs/specifications/servers-of-happiness.rst
+        # 1. Query all servers for existing shares.
+        #
 
         # We now ask servers that can't hold any new shares about existing
         # shares that they might have for our SI. Once this is done, we
         # start placing the shares that we haven't already accounted
@@ -985,22 +993,28 @@ class CHKUploader:
             return defer.succeed(None)
         return self._encoder.abort()
 
+    @defer.inlineCallbacks
     def start_encrypted(self, encrypted):
-        """ Returns a Deferred that will fire with the UploadResults instance. """
+        """
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
 
         eu = IEncryptedUploadable(encrypted)
         started = time.time()
-        self._encoder = e = encode.Encoder(
+        # would be Really Nice to make Encoder just a local; only
+        # abort() really needs self._encoder ...
+        self._encoder = encode.Encoder(
             self._log_number,
             self._upload_status,
             progress=self._progress,
         )
-        d = e.set_encrypted_uploadable(eu)
-        d.addCallback(self.locate_all_shareholders, started)
-        d.addCallback(self.set_shareholders, e)
-        d.addCallback(lambda res: e.start())
-        d.addCallback(self._encrypted_done)
-        return d
+        # this just returns itself
+        yield self._encoder.set_encrypted_uploadable(eu)
+        (upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started)
+        yield self.set_shareholders(upload_trackers, already_serverids, self._encoder)
+        verifycap = yield self._encoder.start()
+        results = yield self._encrypted_done(verifycap)
+        defer.returnValue(results)
 
     def locate_all_shareholders(self, encoder, started):
         server_selection_started = now = time.time()
@@ -1031,13 +1045,13 @@ class CHKUploader:
         d.addCallback(_done)
         return d
 
-    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
+    def set_shareholders(self, upload_trackers, already_serverids, encoder):
         """
-        @param upload_trackers: a sequence of ServerTracker objects that
+        :param upload_trackers: a sequence of ServerTracker objects that
                                 have agreed to hold some shares for us (the
                                 shareids are stashed inside the ServerTracker)
 
-        @paran already_serverids: a dict mapping sharenum to a set of
+        :param already_serverids: a dict mapping sharenum to a set of
                                   serverids for servers that claim to already
                                   have this share
         """
diff --git a/src/allmydata/test/test_happiness.py b/src/allmydata/test/test_happiness.py
new file mode 100644
index 000000000..9cd667134
--- /dev/null
+++ b/src/allmydata/test/test_happiness.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+
+from twisted.trial import unittest
+from allmydata.immutable import happiness_upload
+from allmydata.util.happinessutil import augmenting_path_for, residual_network
+
+
+class HappinessUtils(unittest.TestCase):
+    """
+    test-cases for utility functions augmenting_path_for and residual_network
+    """
+
+    def test_residual_0(self):
+        graph = happiness_upload._servermap_flow_graph(
+            ['peer0'],
+            ['share0'],
+            servermap={
+                'peer0': ['share0'],
+            }
+        )
+        flow = [[0 for _ in graph] for _ in graph]
+
+        residual, capacity = residual_network(graph, flow)
+
+        # XXX no idea if these are right; hand-verify
+        self.assertEqual(residual, [[1], [2], [3], []])
+        self.assertEqual(capacity, [[0, 1, 0, 0], [-1, 0, 1, 0], [0, -1, 0, 1], [0, 0, -1, 0]])
+
+
+class Happiness(unittest.TestCase):
+
+    def test_original_easy(self):
+        shares = {'share0', 'share1', 'share2'}
+        peers = {'peer0', 'peer1'}
+        readonly_peers = set()
+        servermap = {
+            'peer0': {'share0'},
+            'peer1': {'share2'},
+        }
+        places0 = happiness_upload.HappinessUpload(peers, readonly_peers, shares, servermap).generate_mappings()
+
+        self.assertTrue('peer0' in places0['share0'])
+        self.assertTrue('peer1' in places0['share2'])
+
+    def test_placement_simple(self):
+
+        shares = {'share0', 'share1', 'share2'}
+        peers = {
+            'peer0',
+            'peer1',
+        }
+        readonly_peers = {'peer0'}
+        peers_to_shares = {
+            'peer0': {'share2'},
+            'peer1': [],
+        }
+
+        places0 = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
+        places1 = happiness_upload.HappinessUpload(peers, readonly_peers, shares).generate_mappings()
+
+        if False:
+            print("places0")
+            for k, v in places0.items():
+                print("  {} -> {}".format(k, v))
+            print("places1")
+            for k, v in places1.items():
+                print("  {} -> {}".format(k, v))
+
+        self.assertEqual(
+            places0,
+            {
+                'share0': {'peer1'},
+                'share1': {'peer1'},
+                'share2': {'peer0'},
+            }
+        )
+
+
+    def test_placement_1(self):
+
+        shares = {
+            'share0', 'share1', 'share2',
+            'share3', 'share4', 'share5',
+            'share7', 'share8', 'share9',
+        }
+        peers = {
+            'peer0', 'peer1', 'peer2', 'peer3',
+            'peer4', 'peer5', 'peer6', 'peer7',
+            'peer8', 'peer9', 'peerA', 'peerB',
+        }
+        readonly_peers = {'peer0', 'peer1', 'peer2', 'peer3'}
+        peers_to_shares = {
+            'peer0': {'share0'},
+            'peer1': {'share1'},
+            'peer2': {'share2'},
+            'peer3': {'share3'},
+            'peer4': {'share4'},
+            'peer5': {'share5'},
+            'peer6': {'share6'},
+            'peer7': {'share7'},
+            'peer8': {'share8'},
+            'peer9': {'share9'},
+            'peerA': set(),
+            'peerB': set(),
+        }
+
+        places0 = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
+        places1 = happiness_upload.HappinessUpload(peers, readonly_peers, shares).generate_mappings()
+
+        # share N maps to peer N
+        # i.e. this says that share0 should be on peer0, share1 should
+        # be on peer1, etc.
+        expected = {
+            'share{}'.format(i): {'peer{}'.format(i)}
+            for i in range(10)
+        }
+        self.assertEqual(expected, places0)
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index 28a3e4fe4..09873eebf 100644
--- a/src/allmydata/test/test_upload.py
+++ b/src/allmydata/test/test_upload.py
@@ -11,13 +11,13 @@ import allmydata # for __full_version__
 from allmydata import uri, monitor, client
 from allmydata.immutable import upload, encode
 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
-from allmydata.util import log, base32, fileutil
+from allmydata.util import log, base32
 from allmydata.util.assertutil import precondition
 from allmydata.util.deferredutil import DeferredListShouldSucceed
 from allmydata.test.no_network import GridTestMixin
 from allmydata.test.common_util import ShouldFailMixin
 from allmydata.util.happinessutil import servers_of_happiness, \
-     shares_by_server, merge_servers
+    shares_by_server, merge_servers
 from allmydata.storage_client import StorageFarmBroker
 from allmydata.storage.server import storage_index_to_dir
 from allmydata.client import Client
diff --git a/src/allmydata/test/web/test_grid.py b/src/allmydata/test/web/test_grid.py
index 978848bf6..208c03881 100644
--- a/src/allmydata/test/web/test_grid.py
+++ b/src/allmydata/test/web/test_grid.py
@@ -1094,7 +1094,7 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi
                                " overdue= unused= need 3. Last failure: None")
             msg2 = msgbase + (" ran out of shares:"
                               " complete="
-                              " pending=Share(sh0-on-xgru5)"
+                              " pending=Share(sh0-on-ysbz4st7)"
                               " overdue= unused= need 3. Last failure: None")
             self.failUnless(body == msg1 or body == msg2, body)
         d.addCallback(_check_one_share)
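
For reviewers who want to exercise the new placement code by hand, here is a minimal usage sketch (not part of the patch). It assumes the patched tree is importable under Python 2 (happiness_upload.py still uses Queue and xrange); the peer and share names are invented and simply mirror test_placement_simple above.

    from allmydata.immutable import happiness_upload

    peers = {'peer0', 'peer1'}
    readonly_peers = {'peer0'}               # peer0 cannot accept new shares
    shares = {'share0', 'share1', 'share2'}
    peers_to_shares = {                      # pre-existing shares (spec step 1)
        'peer0': {'share2'},
        'peer1': set(),
    }

    # share_placement() returns {share: set([peer])}; a share mapped to None
    # would instead be round-robined across the readwrite servers.
    placement = happiness_upload.share_placement(
        peers, readonly_peers, shares, peers_to_shares)

    for share, target in sorted(placement.items()):
        print("{} -> {}".format(share, target))
    # expected, per test_placement_simple:
    #   share0 -> set(['peer1'])
    #   share1 -> set(['peer1'])
    #   share2 -> set(['peer0'])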