diff --git a/src/allmydata/immutable/download2.py b/src/allmydata/immutable/download2.py
new file mode 100644
index 0000000..440459c
--- /dev/null
+++ b/src/allmydata/immutable/download2.py
@@ -0,0 +1,1273 @@
+
+import struct
+import binascii
+
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.internet.interfaces import IPushProducer, IConsumer
+from twisted.python.failure import Failure
+from foolscap.api import eventually
+from pycryptopp.cipher.aes import AES
+
+from allmydata import codec, hashtree, uri
+from allmydata.interfaces import NotEnoughSharesError
+from allmydata.uri import CHKFileURI, CHKFileVerifierURI
+from allmydata.hashtree import IncompleteHashTree
+from allmydata.util import base32, hashutil, idlib, log, mathutil
+from allmydata.util.dictutil import DictOfSets
+from allmydata.util.observer import OneShotObserverList
+# Spans/DataSpans are assumed helpers (still to be written) for tracking sets
+# of byte ranges and range->data maps; they are expected to live in
+# allmydata.util.spans.
+from allmydata.util.spans import Spans, DataSpans
+
+KiB = 1024
+HASH_SIZE = hashutil.CRYPTO_VAL_SIZE # SHA-256d hashes are 32 bytes
+
+(UNUSED, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \
+ ("UNUSED", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM")
+
+class BadSegmentNumberError(Exception):
+    pass
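+
+# Observer2 is a small one-shot observer used by Share.get_block(). A minimal
+# sketch is included here so this module is self-contained; the exact API
+# (set_canceler/subscribe/notify/cancel) is an assumption of this sketch, not
+# an existing allmydata.util.observer class.
+class Observer2:
+    """A simple observer with bound keyword arguments and a canceler."""
+    def __init__(self):
+        self._watchers = [] # list of (callback, bound_kwargs)
+        self._canceler = None
+    def set_canceler(self, f):
+        # f(self) will be called if the request is cancelled
+        self._canceler = f
+    def subscribe(self, observer, **watcher_kwargs):
+        self._watchers.append( (observer, watcher_kwargs) )
+    def notify(self, **result_kwargs):
+        for (observer, watcher_kwargs) in self._watchers:
+            kwargs = dict(watcher_kwargs)
+            kwargs.update(result_kwargs)
+            eventually(observer, **kwargs)
+    def cancel(self):
+        if self._canceler:
+            self._canceler(self)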
+
+class Share:
+    # this is a specific implementation of IShare for tahoe's native storage
+    # servers. A different backend would use a different class.
+    """I represent a single instance of a single share (e.g. I reference
+    shnum=2 of SI=abcde on server xy12t, not the copy of that share on
+    server ab45q).
+    I am associated with a CommonShare that remembers data that is held in
+    common among e.g. SI=abcde/shnum2 across all servers. I am also
+    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
+    servers).
+    """
+
+    def __init__(self, rref, verifycap, commonshare, node, peerid, shnum):
+        self._rref = rref
+        self._guess_offsets(verifycap, node.guessed_segment_size)
+        self.actual_offsets = None
+        self.actual_segment_size = None
+        self._UEB_length = None
+        self._commonshare = commonshare # holds block_hash_tree
+        self._node = node # holds share_hash_tree and UEB
+        self._peerid = peerid
+        self._shnum = shnum
+
+        self._wanted = Spans() # desired metadata
+        self._wanted_blocks = Spans() # desired block data
+        self._requested = Spans() # we've sent a request for this
+        self._received = Spans() # we've received a response for this
+        self._received_data = DataSpans() # the response contents, still unused
+        self._requested_blocks = [] # (segnum, set(observer2..))
+        ver = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
+        # If _overrun_ok and we guess the offsets correctly, we can get
+        # everything in one RTT. If _overrun_ok and we guess wrong, we might
+        # need two RTT (but we could get lucky and do it in one). If overrun
+        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
+        # 2=offset table, 3=UEB_length and everything else (hashes, block),
+        # 4=UEB.
+
+    def _guess_offsets(self, verifycap, guessed_segment_size):
+        self.guessed_segment_size = guessed_segment_size
+        size = verifycap.size
+        k = verifycap.needed_shares
+        N = verifycap.total_shares
+        offsets = {}
+        for i,field in enumerate(('data',
+                                  'plaintext_hash_tree', # UNUSED
+                                  'crypttext_hash_tree',
+                                  'block_hashes',
+                                  'share_hashes',
+                                  'uri_extension',
+                                  )):
+            offsets[field] = i # bad guesses are easy :) # XXX stub
+        self.guessed_offsets = offsets
+        self._fieldsize = 4
+        self._fieldstruct = ">L"
+
+    # called by our client, the SegmentFetcher
+    def get_block(self, segnum):
+        """Add a block number to the list of requests. This will eventually
+        result in a fetch of the data necessary to validate the block, then
+        the block itself. The fetch order is generally
+        first-come-first-served, but requests may be answered out-of-order if
+        data becomes available sooner.
+
+        I return an Observer2, which has two uses. The first is to call
+        o.subscribe(), which gives me a place to send state changes and
+        eventually the data block. The second is o.cancel(), which removes
+        the request (if it is still active).
+        """
+        o = Observer2()
+        o.set_canceler(self._cancel_block_request)
+        for i,(segnum0,observers) in enumerate(self._requested_blocks):
+            if segnum0 == segnum:
+                observers.add(o)
+                break
+        else:
+            self._requested_blocks.append( (segnum, set([o])) )
+        eventually(self.loop)
+        return o
+
+    def _cancel_block_request(self, o):
+        new_requests = []
+        for e in self._requested_blocks:
+            (segnum0, observers) = e
+            observers.discard(o)
+            if observers:
+                new_requests.append(e)
+        self._requested_blocks = new_requests
+
+    # internal methods
+    def _active_segnum(self):
+        if self._requested_blocks:
+            return self._requested_blocks[0][0]
+        return None
+
+    def _active_segnum_and_observers(self):
+        if self._requested_blocks:
+            # we only retrieve information for one segment at a time, to
+            # minimize alacrity (first come, first served)
+            return self._requested_blocks[0]
+        return None, []
+
+    def loop(self):
+        # TODO: if any exceptions occur here, kill the download
+
+        # we are (eventually) called after all state transitions:
+        #  new segments added to self._requested_blocks
+        #  new data received from servers (responses to our read() calls)
+        #  impatience timer fires (server appears slow)
+
+        # First, consume all of the information that we currently have, for
+        # all the segments people currently want.
+        while self._get_satisfaction():
+            pass
+
+        # When we get no satisfaction (from the data we've received so far),
+        # we determine what data we desire (to satisfy more requests). The
+        # number of segments is finite, so I can't get no satisfaction
+        # forever.
+        self._desire()
+
+        # finally send out requests for whatever we need (desire minus have).
+        # You can't always get what you want, but, sometimes, you get what
+        # you need.
+        self._request_needed() # express desire
+
+    def _get_satisfaction(self):
+        # return True if we retired a data block, and should therefore be
+        # called again. Return False if we don't retire a data block (even if
+        # we do retire some other data, like hash chains).
+
+        if self.actual_offsets is None:
+            if not self._satisfy_offsets():
+                # can't even look at anything without the offset table
+                return False
+
+        if self._node.UEB is None:
+            if not self._satisfy_UEB():
+                # can't check any hashes without the UEB
+                return False
+
+        segnum, observers = self._active_segnum_and_observers()
+        if segnum is not None and segnum >= self._node.num_segments:
+            for o in observers:
+                o.notify(state=BADSEGNUM)
+            self._requested_blocks.pop(0)
+            return True
+
+        if self._node.share_hash_tree.needed_hashes(self._shnum):
+            if not self._satisfy_share_hash_tree():
+                # can't check block_hash_tree without a root
+                return False
+
+        if segnum is None:
+            return False # we don't want any particular segment right now
+
+        # block_hash_tree
+        needed_hashes = self._commonshare.block_hash_tree.needed_hashes(segnum)
+        if needed_hashes:
+            if not self._satisfy_block_hash_tree(needed_hashes):
+                # can't check block without block_hash_tree
+                return False
+
+        # data blocks
+        return self._satisfy_data_block(segnum, observers)
+
+    def _satisfy_offsets(self):
+        version_s = self._received_data.get(0, 4)
+        if version_s is None:
+            return False
+        (version,) = struct.unpack(">L", version_s)
+        if version == 1:
+            table_start = 0x0c
+            self._fieldsize = 0x4
+            self._fieldstruct = ">L"
+        else:
+            table_start = 0x14
+            self._fieldsize = 0x8
+            self._fieldstruct = ">Q"
+        offset_table_size = 6 * self._fieldsize
+        table_s = self._received_data.pop(table_start, offset_table_size)
+        if table_s is None:
+            return False
+        fields = struct.unpack(6*self._fieldstruct, table_s)
+        offsets = {}
+        for i,field in enumerate('data',
+                                 'plaintext_hash_tree', # UNUSED
+                                 'crypttext_hash_tree',
+                                 'block_hashes',
+                                 'share_hashes',
+                                 'uri_extension',
+                                 ):
+            offsets[field] = fields[i]
+        self.actual_offsets = offsets
+        self._received_data.remove(0, 4) # don't need this anymore
+        return True
+
+    def _satisfy_UEB(self):
+        o = self.actual_offsets
+        fsize = self._fieldsize
+        rdata = self._received_data
+        UEB_length_s = rdata.get(o["uri_extension"], fsize)
+        if not UEB_length_s:
+            return False
+        (UEB_length,) = struct.unpack(self._fieldstruct, UEB_length_s)
+        UEB_s = rdata.pop(o["uri_extension"]+fsize, UEB_length)
+        if not UEB_s:
+            return False
+        rdata.remove(o["uri_extension"], fsize)
+        try:
+            self._node.validate_UEB(UEB_s) # stores in self._node.UEB # XXX
+            self.actual_segment_size = self._node.segment_size
+            assert self.actual_segment_size is not None
+            return True
+        except hashtree.BadHashError:
+            # TODO: if this UEB was bad, we'll keep trying to validate it
+            # over and over again. Only log.err on the first one, or better
+            # yet skip all but the first
+            f = Failure()
+            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
+            return False
+
+    def _satisfy_share_hash_tree(self):
+        # the share hash chain is stored as (hashnum,hash) tuples, so you
+        # can't fetch just the pieces you need, because you don't know
+        # exactly where they are. So fetch everything, and parse the results
+        # later.
+        o = self.actual_offsets
+        rdata = self._received_data
+        hashlen = o["uri_extension"] - o["share_hashes"]
+        assert hashlen % (2+HASH_SIZE) == 0
+        hashdata = rdata.get(o["share_hashes"], hashlen)
+        if not hashdata:
+            return False
+        share_hashes = {}
+        for i in range(0, hashlen, 2+HASH_SIZE):
+            hashnum = struct.unpack(">H", hashdata[i:i+2])[0]
+            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
+            share_hashes[hashnum] = hashvalue
+        try:
+            self._node.process_share_hashes(share_hashes)
+            # adds to self._node.share_hash_tree
+            rdata.remove(o["share_hashes"], hashlen)
+            return True
+        except (IndexError, hashtree.BadHashError, hashtree.NotEnoughHashesError):
+            f = Failure()
+            self._signal_corruption(f, o["share_hashes"], hashlen)
+            return False
+
+    def _signal_corruption(self, f, start, length):
+        # there was corruption somewhere in the given range
+        # TODO: also tell the server about the corruption, and stop using
+        # this share
+        log.msg("corruption in share data [%d:+%d]" % (start, length),
+                failure=f, level=log.WEIRD)
+
+    def _satisfy_block_hash_tree(self, needed_hashes):
+        o = self.actual_offsets
+        rdata = self._received_data
+        block_hashes = {}
+        for hashnum in needed_hashes:
+            hashdata = rdata.get(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
+            if hashdata:
+                block_hashes[hashnum] = hashdata
+            else:
+                return False # missing some hashes
+        # note that we don't submit any hashes to the block_hash_tree until
+        # we've gotten them all, because the hash tree will throw an
+        # exception if we only give it a partial set (which it therefore
+        # cannot validate)
+        ok = self._commonshare.process_block_hashes(block_hashes) # XXX
+        if not ok:
+            return False
+        for hashnum in needed_hashes:
+            rdata.remove(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
+        return True
+
+    def _satisfy_data_block(self, segnum, observers):
+        o = self.actual_offsets
+        rdata = self._received_data
+        segsize = self._node.UEB["segment_size"]
+        needed_shares = self._node.UEB["needed_shares"]
+        sharesize = mathutil.div_ceil(self._node.UEB["size"],
+                                      needed_shares)
+        blocksize = mathutil.div_ceil(segsize, needed_shares) # XXX
+        blockstart = o["data"] + segnum * blocksize
+        if segnum < self._node.num_segments-1:
+            blocklen = blocksize
+        else:
+            blocklen = sharesize % blocksize
+            if blocklen == 0:
+                blocklen = blocksize
+        block = rdata.pop(blockstart, blocklen)
+        if not block:
+            return False
+        # this block is being retired, either as COMPLETE or CORRUPT, since
+        # no further data reads will help
+        assert self._requested_blocks[0][0] == segnum
+        ok = self._commonshare.check_block(segnum, block)
+        if ok:
+            state = COMPLETE
+        else:
+            state = CORRUPT
+        for observer in observers:
+            # goes to SegmentFetcher._block_request_activity
+            observer.notify(state=state, block=block)
+        self._requested_blocks.pop(0) # retired
+        return True # got satisfaction
+
+    def _desire(self):
+        segnum, observers = self._active_segnum_and_observers()
+        fsize = self._fieldsize
+        rdata = self._received_data
+        commonshare = self._commonshare
+
+        if not self.actual_offsets:
+            self._desire_offsets()
+
+        # we can use guessed offsets as long as this server tolerates overrun
+        if not self.actual_offsets and not self._overrun_ok:
+            return # must wait for the offsets to arrive
+
+        o = self.actual_offsets or self.guessed_offsets
+        segsize = self.actual_segment_size or self.guessed_segment_size
+        if self._node.UEB is None:
+            self._desire_UEB(o)
+
+        if self._node.share_hash_tree.needed_hashes(self._shnum):
+            hashlen = o["uri_extension"] - o["share_hashes"]
+            self._wanted.add(o["share_hashes"], hashlen)
+
+        if segnum is None:
+            return # only need block hashes or blocks for active segments
+
+        # block hash chain
+        for hashnum in commonshare.block_hash_tree.needed_hashes(segnum):
+            self._wanted.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
+
+        # data
+        blockstart, blocklen = COMPUTE(segnum, segsize, etc) # XXX
+        self._wanted_blocks.add(blockstart, blocklen)
+
+
+    def _desire_offsets(self):
+        if self._overrun_ok:
+            # easy! this includes version number, sizes, and offsets
+            self._wanted.add(0,1024)
+            return
+
+        # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
+        # To be conservative, only request the data that we know lives there,
+        # even if that means more roundtrips.
+
+        self._wanted.add(0,4)  # version number, always safe
+        version_s = self._received_data.get(0, 4)
+        if not version_s:
+            return
+        (version,) = struct.unpack(">L", version_s)
+        if version == 1:
+            table_start = 0x0c
+            fieldsize = 0x4
+        else:
+            table_start = 0x14
+            fieldsize = 0x8
+        offset_table_size = 6 * fieldsize
+        self._wanted.add(table_start, offset_table_size)
+
+    def _desire_UEB(self, o):
+        # UEB data is stored as (length,data).
+        if self._overrun_ok:
+            # We can pre-fetch 2kb, which should probably cover it. If it
+            # turns out to be larger, we'll come back here later with a known
+            # length and fetch the rest.
+            self._wanted.add(o["uri_extension"], 2048)
+            # now, while that is probably enough to fetch the whole UEB, it
+            # might not be, so we need to do the next few steps as well. In
+            # most cases, the following steps will not actually add anything
+            # to self._wanted
+
+        self._wanted.add(o["uri_extension"], self._fieldsize)
+        # only use a length if we're sure it's correct, otherwise we'll
+        # probably fetch a huge number
+        if not self.actual_offsets:
+            return
+        UEB_length_s = self._received_data.get(o["uri_extension"],
+                                               self._fieldsize)
+        if UEB_length_s:
+            (UEB_length,) = struct.unpack(self._fieldstruct, UEB_length_s)
+            # we know the length, so make sure we grab everything
+            self._wanted.add(o["uri_extension"]+self._fieldsize, UEB_length)
+
+    def _request_needed(self):
+        # send requests for metadata first, to avoid hanging on to large data
+        # blocks any longer than necessary.
+        self._send_requests(self._wanted - self._received - self._requested)
+        # then send requests for data blocks. All the hashes should arrive
+        # before the blocks, so the blocks can be consumed and released in a
+        # single turn.
+        self._send_requests(self._wanted_blocks - self._received - self._requested)
+
+    def _send_requests(self, needed):
+        for (start, length) in needed:
+            # TODO: quantize to reasonably-large blocks
+            self._requested.add(start, length)
+            d = self._send_request(start, length)
+            d.addCallback(self._got_data, start, length)
+            d.addErrback(self._got_error)
+            d.addErrback(log.err) # XXX add format= and umid=
+
+    def _send_request(self, start, length):
+        return self._rref.callRemote("read", start, length)
+
+    def _got_data(self, data, start, length):
+        span = (start, length)
+        assert span in self._requested
+        self._requested.remove(start, length)
+        self._received.add(start, length)
+        self._received_data.add(start, data)
+        eventually(self.loop)
+
+    def _got_error(self, f): # XXX
+        # TODO: a failed read probably means this server/share is no longer
+        # usable: log it, and arrange for the share to be reported as DEAD
+        log.msg("error during read", failure=f, level=log.WEIRD)
+
+
+class CommonShare:
+    """I hold data that is common across all instances of a single share,
+    like sh2 on both servers A and B. This is just the block hash tree.
+    """
+    def __init__(self, numsegs):
+        if numsegs is not None:
+            self._block_hash_tree = IncompleteHashTree(numsegs)
+
+    def got_numsegs(self, numsegs):
+        self._block_hash_tree = IncompleteHashTree(numsegs)
+
+    def process_block_hashes(self, block_hashes):
+        self._block_hash_tree.add_hashes(block_hashes)
+        return True
+    def check_block(self, segnum, block):
+        h = hashutil.block_hash(block)
+        try:
+            self._block_hash_tree.set_hashes(leaves={segnum: h})
+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+            log.msg("hash failure in block=%d: %s" % (segnum, le),
+                    level=log.WEIRD)
+            return False
+        return True
+
+# all classes are also Services, and the rule is that you don't initiate more
+# work unless self.running
+
+# GC: decide whether each service is restartable or not. For non-restartable
+# services, stopService() should delete a lot of attributes to kill reference
+# cycles. The primary goal is to decref remote storage BucketReaders when a
+# download is complete.
+
+class SegmentFetcher:
+    """I am responsible for acquiring blocks for a single segment. I will use
+    the Share instances passed to my add_shares() method to locate, retrieve,
+    and validate those blocks. I expect my parent node to call my
+    no_more_shares() method when there are no more shares available. I will
+    call my parent's want_more_shares() method when I want more: I expect to
+    see at least one call to add_shares or no_more_shares afterwards.
+
+    When I have enough validated blocks, I will call my parent's
+    process_blocks() method with a dictionary that maps shnum to blockdata.
+    If I am unable to provide enough blocks, I will call my parent's
+    fetch_failed() method with (self, f). After either of these events, I
+    will shut down and do no further work. My parent can also call my stop()
+    method to have me shut down early."""
+
+    def __init__(self, node, segnum, k):
+        self._node = node # CiphertextFileNode
+        self.segnum = segnum
+        self._k = k
+        self._shares = {} # maps non-dead Share instance to a state, one of
+                          # (UNUSED, PENDING, OVERDUE, COMPLETE, CORRUPT).
+                          # State transition map is:
+                          #  UNUSED -(send-read)-> PENDING
+                          #  PENDING -(timer)-> OVERDUE
+                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
+                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
+                          # If a share becomes DEAD, it is removed from the
+                          # dict. If it becomes BADSEGNUM, the whole fetch is
+                          # terminated.
+        self._share_observers = {} # maps Share to Observer2 for active ones
+        self._shnums = DictOfSets() # maps shnum to the shares that provide it
+        self._blocks = {} # maps shnum to validated block data
+        self._no_more_shares = False
+        self._bad_segnum = False
+        self._running = True
+
+    def stop(self):
+        self._cancel_all_requests()
+        self._running = False
+        del self._shares # let GC work # ???
+
+
+    # called by our parent CiphertextFileNode
+
+    def add_shares(self, shares):
+        # called when ShareFinder locates a new share, and when a non-initial
+        # segment fetch is started and we already know about shares from the
+        # previous segment
+        for s in shares:
+            self._shares[s] = UNUSED
+            self._shnums.add(s.shnum, s)
+        eventually(self._loop)
+
+    def no_more_shares(self):
+        # ShareFinder tells us it's reached the end of its list
+        self._no_more_shares = True
+
+    # internal methods
+
+    def _count_shnums(self, *states):
+        """Return the number of distinct shnums for which at least one share
+        is in one of the given states."""
+        shnums = []
+        for shnum,shares in self._shnums.iteritems():
+            matches = [s for s in shares if self._shares.get(s) in states]
+            if matches:
+                shnums.append(shnum)
+        return len(shnums)
+
+    def _loop(self):
+        if not self._running:
+            return
+        if self._bad_segnum:
+            # oops, we were asking for a segment number beyond the end of the
+            # file. This is an error.
+            self.stop()
+            e = BadSegmentNumberError("%d > %d" % (self.segnum,
+                                                   self._node.num_segments))
+            f = Failure(e)
+            self._node.fetch_failed(self, f)
+            return
+
+        # are we done?
+        if self._count_shnums(COMPLETE) >= self._k:
+            # yay!
+            self.stop()
+            self._node.process_blocks(self.segnum, self._blocks)
+            return
+
+        # we may have exhausted everything
+        if (self._no_more_shares and
+            self._count_shnums(UNUSED, PENDING, OVERDUE, COMPLETE) < self._k):
+            # no more new shares are coming, and the remaining hopeful shares
+            # aren't going to be enough. boo!
+            self.stop()
+            e = NotEnoughSharesError("...") # XXX
+            f = Failure(e)
+            self._node.fetch_failed(self, f)
+            return
+
+        # nope, not done. Are we "block-hungry" (i.e. do we want to send out
+        # more read requests, or do we think we have enough in flight
+        # already?)
+        while self._count_shnums(PENDING, COMPLETE) < self._k:
+            # we're hungry.. are there any unused shares?
+            sent = self._send_new_request()
+            if not sent:
+                break
+
+        # ok, now are we "share-hungry" (i.e. do we have enough known shares
+        # to make us happy, or should we ask the ShareFinder to get us more?)
+        if self._count_shnums(UNUSED, PENDING, COMPLETE) < self._k:
+            # we're hungry for more shares
+            self._node.want_more_shares()
+            # that will trigger the ShareFinder to keep looking
+
+    def _send_new_request(self):
+        for shnum,shares in self._shnums.iteritems():
+            states = [self._shares[s] for s in shares]
+            if COMPLETE in states or PENDING in states:
+                # don't send redundant requests
+                continue
+            if UNUSED not in states:
+                # no candidates for this shnum, move on
+                continue
+            # here's a candidate. Send a request.
+            s = [sh for sh in shares if self._shares[sh] == UNUSED][0]
+            # XXX could choose the fastest of the UNUSED candidates
+            self._shares[s] = PENDING
+            self._share_observers[s] = o = s.get_block(self.segnum)
+            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
+            # TODO: build up a list of candidates, then walk through the
+            # list, sending requests to the most desirable servers,
+            # re-checking our block-hunger each time. For non-initial segment
+            # fetches, this would let us stick with faster servers.
+            return True
+        # nothing was sent: don't call us again until you have more shares to
+        # work with, or one of the existing shares has been declared OVERDUE
+        return False
+
+    def _cancel_all_requests(self):
+        for o in self._share_observers.values():
+            o.cancel()
+        self._share_observers = {}
+
+    def _block_request_activity(self, share, shnum, state, block=None):
+        # called by Shares, in response to our s.get_block() calls.
+        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
+        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
+            del self._share_observers[share]
+        if state is COMPLETE:
+            # 'block' is fully validated
+            self._shares[share] = COMPLETE
+            self._blocks[shnum] = block
+        elif state is OVERDUE:
+            self._shares[share] = OVERDUE
+            # OVERDUE is not terminal: it will eventually transition to
+            # COMPLETE, CORRUPT, or DEAD.
+        elif state is CORRUPT:
+            self._shares[share] = CORRUPT
+        elif state is DEAD:
+            del self._shares[share]
+            self._shnums[shnum].remove(share)
+        elif state is BADSEGNUM:
+            self._shares[share] = BADSEGNUM # ???
+            self._bad_segnum = True
+        eventually(self._loop)
+
+
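+# 'incidentally' is a small Deferred-chain helper assumed by
+# ShareFinder.send_request() below: it calls f(*args) for its side effect and
+# passes the original result through unchanged. A sketch is included here so
+# the module is self-contained.
+def incidentally(res, f, *args, **kwargs):
+    """Add me to a Deferred chain like this:
+     d.addBoth(incidentally, func, arg)
+    and I'll behave as if you'd added the following function:
+     def _(res):
+         func(arg)
+         return res
+    """
+    f(*args, **kwargs)
+    return res
+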
+class RequestToken:
+    def __init__(self, peerid):
+        self.peerid = peerid
+
+class ShareFinder:
+    def __init__(self, storage_broker, storage_index,
+                 share_consumer, max_outstanding_requests=10):
+        self.running = True
+        self._storage_index = storage_index
+        s = storage_broker.get_servers_for_index(storage_index)
+        self._servers = iter(s)
+        self.share_consumer = share_consumer # the CiphertextFileNode
+        self.max_outstanding = max_outstanding_requests
+
+        self._hungry = False
+
+        self._commonshares = {} # shnum to CommonShare instance
+        self.undelivered_shares = []
+        self.pending_requests = set()
+
+        self._si_prefix = base32.b2a_l(storage_index[:8], 60)
+        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
+                           si=self._si_prefix, level=log.NOISY, umid="2xjj2A")
+
+        self._num_segments = None
+        d = share_consumer.get_num_segments()
+        d.addCallback(self._got_numsegs)
+        d.addErrback(log.err) # XXX add format= and umid=
+
+    def log(self, *args, **kwargs):
+        if "parent" not in kwargs:
+            kwargs["parent"] = self._lp
+        return log.msg(*args, **kwargs)
+
+    def stop(self):
+        self.running = False
+
+    def _got_numsegs(self, numsegs):
+        for cs in self._commonshares.values():
+            cs.got_numsegs(numsegs)
+        self._num_segments = numsegs
+
+    # called by our parent CiphertextDownloader
+    def hungry(self):
+        log.msg(format="ShareFinder[si=%(si)s] hungry",
+                si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
+        self._hungry = True
+        eventually(self.loop)
+
+    # internal methods
+    def loop(self):
+        log.msg(format="ShareFinder[si=%(si)s] loop: running=%(running)s"
+                " hungry=%(hungry)s, undelivered=%(undelivered)s,"
+                " pending=%(pending)s",
+                si=self._si_prefix, running=self.running, hungry=self._hungry,
+                undelivered=",".join(["sh%d@%s" % (s._shnum,
+                                                   idlib.shortnodeid_b2a(s._peerid))
+                                      for s in self.undelivered_shares]),
+                pending=",".join([idlib.shortnodeid_b2a(rt.peerid)
+                                  for rt in self.pending_requests]), # sort?
+                level=log.NOISY, umid="kRtS4Q")
+        if not self.running:
+            return
+        if not self._hungry:
+            return
+        if self.undelivered_shares:
+            sh = self.undelivered_shares.pop(0)
+            # they will call hungry() again if they want more
+            self._hungry = False
+            eventually(self.share_consumer.got_shares, [sh])
+            return
+        if len(self.pending_requests) >= self.max_outstanding:
+            # cannot send more requests, must wait for some to retire
+            return
+
+        server = None
+        try:
+            if self._servers:
+                server = self._servers.next()
+        except StopIteration:
+            self._servers = None
+
+        if server:
+            self.send_request(server)
+            return
+
+        if self.pending_requests:
+            # no server, but there are still requests in flight: maybe one of
+            # them will make progress
+            return
+
+        # we've run out of servers (so we can't send any more requests), and
+        # we have nothing in flight. No further progress can be made. They
+        # are destined to remain hungry.
+        self.share_consumer.no_more_shares()
+        self.stop()
+
+
+    def send_request(self, server):
+        peerid, rref = server
+        req = RequestToken(peerid)
+        self.pending_requests.add(req)
+        lp = self.log(format="sending DYHB to [%(peerid)s]",
+                      peerid=idlib.shortnodeid_b2a(peerid),
+                      level=log.NOISY, umid="Io7pyg")
+        d = rref.callRemote("get_buckets", self._storage_index)
+        d.addBoth(incidentally, self.pending_requests.discard, req)
+        d.addCallbacks(self._got_response, self._got_error,
+                       callbackArgs=(peerid, req, lp),
+                       errbackArgs=(peerid, req, lp))
+        d.addErrback(log.err, format="error in send_request",
+                     level=log.WEIRD, parent=lp, umid="rpdV0w")
+        d.addCallback(incidentally, eventually, self.loop)
+
+    def _got_response(self, buckets, peerid, req, lp):
+        if buckets:
+            shnums_s = ",".join([str(shnum) for shnum in buckets])
+            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
+                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
+                     level=log.NOISY, parent=lp, umid="0fcEZw")
+        else:
+            self.log(format="no shares from [%(peerid)s]",
+                     peerid=idlib.shortnodeid_b2a(peerid),
+                     level=log.NOISY, parent=lp, umid="U7d4JA")
+        for shnum, bucket in buckets.iteritems():
+            if shnum not in self._commonshares:
+                self._commonshares[shnum] = CommonShare(self._num_segments)
+            cs = self._commonshares[shnum]
+            # the verifycap lives on our share_consumer (the
+            # CiphertextFileNode), which is also the 'node' each Share needs
+            node = self.share_consumer
+            s = Share(bucket, node.u, cs, node, peerid, shnum)
+            self.undelivered_shares.append(s)
+
+    def _got_error(self, f, peerid, req, lp):
+        self.log(format="got error from [%(peerid)s]",
+                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
+                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
+
+
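+# 'overlap' is a small range-intersection helper assumed by
+# Segmentation._got_segment() below: given two (start,length) ranges it
+# returns the (start,length) of their intersection, or None if they do not
+# overlap. This sketch is included so the module is self-contained.
+def overlap(start1, length1, start2, length2):
+    left = max(start1, start2)
+    right = min(start1+length1, start2+length2)
+    if left >= right:
+        return None
+    return (left, right-left)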
+
+class Segmentation:
+    """I am responsible for a single offset+size read of the file. I handle
+    segmentation: I figure out which segments are necessary, request them
+    (from my CiphertextDownloader) in order, and trim the segments down to
+    match the offset+size span. I use the Producer/Consumer interface to only
+    request one segment at a time.
+    """
+    implements(IPushProducer)
+    def __init__(self, node, offset, size, consumer):
+        self._node = node
+        self._hungry = True
+        self._active_segnum = None
+        self._cancel_segment_request = None
+        # these are updated as we deliver data. At any given time, we still
+        # want to download file[offset:offset+size]
+        self._offset = offset
+        self._size = size
+        self._consumer = consumer
+
+    def start(self):
+        self._alive = True
+        self._deferred = defer.Deferred()
+        self._consumer.registerProducer(self, True) # streaming=True # XXX???
+        self._maybe_fetch_next()
+        return self._deferred
+
+    def _maybe_fetch_next(self):
+        if not self._alive or not self._hungry:
+            return
+        if self._active_segnum is not None:
+            return
+        self._fetch_next()
+
+    def _fetch_next(self):
+        if self._size == 0:
+            # done!
+            self._alive = False
+            self._hungry = False
+            self._consumer.unregisterProducer()
+            self._deferred.callback(self._consumer)
+            return
+        n = self._node
+        have_actual_segment_size = n.actual_segment_size is not None
+        segment_size = n.actual_segment_size or n.guessed_segment_size
+        if self._offset == 0:
+            # great! we want segment0 for sure
+            wanted_segnum = 0
+        else:
+            # this might be a guess
+            wanted_segnum = self._offset // segment_size
+        self._active_segnum = wanted_segnum
+        d,c = self._node.get_segment(wanted_segnum)
+        self._cancel_segment_request = c
+        d.addBoth(self._request_retired)
+        d.addCallback(self._got_segment, have_actual_segment_size)
+        d.addErrback(self._retry_bad_segment, have_actual_segment_size)
+        d.addErrback(self._error)
+
+    def _request_retired(self, res):
+        self._active_segnum = None
+        self._cancel_segment_request = None
+        return res
+
+    def _got_segment(self, (segment_start,segment), had_actual_segment_size):
+        self._active_segnum = None
+        self._cancel_segment_request = None
+        # we got file[segment_start:segment_start+len(segment)]
+        # we want file[self._offset:self._offset+self._size]
+        o = overlap(segment_start, len(segment),  self._offset, self._size)
+        # the overlap is file[o[0]:o[0]+o[1]]
+        if not o or o[0] != self._offset:
+            # we didn't get the first byte, so we can't use this segment
+            if had_actual_segment_size:
+                # and we should have gotten it right. This is a big problem.
+                raise SOMETHING
+            # we've wasted some bandwidth, but now we can grab the right one,
+            # because we should know the segsize by now.
+            assert self._node.actual_segment_size is not None
+            self._maybe_fetch_next()
+            return
+        offset_in_segment = self._offset - segment_start
+        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]
+
+        self._offset += len(desired_data)
+        self._size -= len(desired_data)
+        self._consumer.write(desired_data)
+        self._maybe_fetch_next()
+
+    def _retry_bad_segment(self, f, had_actual_segment_size):
+        f.trap(BadSegmentNumberError) # guessed way wrong, off the end
+        if had_actual_segment_size:
+            # but we should have known better, so this is a real error
+            return f
+        # we didn't know better: try again with more information
+        return self._maybe_fetch_next()
+
+    def _error(self, f):
+        self._alive = False
+        self._hungry = False
+        self._consumer.unregisterProducer()
+        self._deferred.errback(f)
+
+    def stopProducing(self):
+        self._hungry = False
+        self._alive = False
+        # cancel any outstanding segment request
+        if self._cancel_segment_request:
+            self._cancel_segment_request()
+            self._cancel_segment_request = None
+    def pauseProducing(self):
+        self._hungry = False
+    def resumeProducing(self):
+        self._hungry = True
+        eventually(self._maybe_fetch_next)
+
+class Cancel:
+    def __init__(self, f):
+        self._f = f
+        self.cancelled = False
+    def cancel(self):
+        if not self.cancelled:
+            self.cancelled = True
+            self._f(self)
+
+class CiphertextFileNode:
+    # Share._node points to me
+    def __init__(self, verifycap, storage_broker, secret_holder,
+                 terminator, history):
+        assert isinstance(verifycap, CHKFileVerifierURI)
+        self.u = verifycap
+        self._verifycap = verifycap
+        storage_index = verifycap.storage_index
+        self._needed_shares = verifycap.needed_shares
+        self._total_shares = verifycap.total_shares
+        self._size = verifycap.size
+        self.running = True
+        terminator.register(self) # calls self.stop() at stopService()
+        # the rule is: only send network requests if you're active
+        # (self.running is True). You can do eventual-sends any time. This
+        # rule should mean that once stopService()+flushEventualQueue()
+        # fires, everything will be done.
+        self._secret_holder = secret_holder
+        self._history = history
+
+        self.share_hash_tree = IncompleteHashTree(self.u.total_shares)
+
+        # we guess the segment size, so Segmentation can pull non-initial
+        # segments in a single roundtrip
+        k = verifycap.needed_shares
+        max_segment_size = 128*KiB # TODO: pull from elsewhere, maybe the
+                                   # same place as upload.BaseUploadable
+        s = mathutil.next_multiple(min(verifycap.size, max_segment_size), k)
+        self.guessed_segment_size = s
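+        # e.g. (illustrative numbers only) for size=1000000 and k=3:
+        #   min(1000000, 131072) = 131072
+        #   next_multiple(131072, 3) = 131073, so guessed_segment_size=131073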
+
+        # filled in when we parse a valid UEB
+        self.have_UEB = False
+        self.UEB = None # the parsed UEB dict; Share instances consult this
+        self.num_segments = None
+        self.segment_size = None
+        self.tail_data_size = None
+        self.tail_segment_size = None
+        self.block_size = None
+        self.share_size = None
+        self.ciphertext_hash_tree = None # size depends on num_segments
+        self.ciphertext_hash = None # flat hash, optional
+
+        # things to track callers that want data
+        self._segsize_observers = OneShotObserverList()
+        self._numsegs_observers = OneShotObserverList()
+        # _segment_requests can have duplicates
+        self._segment_requests = [] # (segnum, d, cancel_handle)
+        self._active_segment = None # a SegmentFetcher, with .segnum
+
+        self._sharefinder = ShareFinder(storage_broker, storage_index, self)
+        self._shares = set()
+
+    def stop(self):
+        # called by the Terminator at shutdown, mostly for tests
+        if self._active_segment:
+            self._active_segment.stop()
+            self._active_segment = None
+        self._sharefinder.stop()
+
+    # things called by our client, either a filenode user or an
+    # ImmutableFileNode wrapper
+
+    def read(self, consumer, offset=0, size=None):
+        """I am the main entry point, from which FileNode.read() can get
+        data. I feed the consumer with the desired range of ciphertext. I
+        return a Deferred that fires (with the consumer) when the read is
+        finished."""
+        # for concurrent operations: each gets its own Segmentation manager
+        if size is None:
+            size = self._size - offset
+        s = Segmentation(self, offset, size, consumer)
+        # this raises an interesting question: what segments to fetch? if
+        # offset=0, always fetch the first segment, and then allow
+        # Segmentation to be responsible for pulling the subsequent ones if
+        # the first wasn't large enough. If offset>0, we're going to need an
+        # extra roundtrip to get the UEB (and therefore the segment size)
+        # before we can figure out which segment to get. TODO: allow the
+        # offset-table-guessing code (which starts by guessing the segsize)
+        # to assist the offset>0 process.
+        d = s.start()
+        return d
+
+    def get_segment(self, segnum):
+        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
+        Deferred that fires with (offset,data) when the desired segment is
+        available, and c is an object on which c.cancel() can be called to
+        disavow interest in the segment (after which 'd' will never fire).
+
+        You probably need to know the segment size before calling this,
+        unless you want the first few bytes of the file. If you ask for a
+        segment number which turns out to be too large, the Deferred will
+        errback with BadSegmentNumberError.
+
+        The Deferred fires with the offset of the first byte of the data
+        segment, so that you can call get_segment() before knowing the
+        segment size, and still know which data you received.
+        """
+        d = defer.Deferred()
+        c = Cancel(self._cancel_request)
+        self._segment_requests.append( (segnum, d, c) )
+        self._start_new_segment()
+        return (d, c)
+
+    # things called by the Segmentation object used to transform
+    # arbitrary-sized read() calls into quantized segment fetches
+
+    def get_segment_size(self):
+        """I return a Deferred that fires with the segment_size used by this
+        file."""
+        return self._segsize_observers.when_fired()
+    def get_num_segments(self):
+        """I return a Deferred that fires with the number of segments used by
+        this file."""
+        return self._numsegs_observers.when_fired()
+
+    def _start_new_segment(self):
+        if self._active_segment is None and self._segment_requests:
+            segnum = self._segment_requests[0][0]
+            self._active_segment = fetcher = SegmentFetcher(self, segnum,
+                                                            self._needed_shares)
+            active_shares = [s for s in self._shares if s.not_dead()]
+            fetcher.add_shares(active_shares) # this triggers the loop
+
+
+    # called by our child ShareFinder
+    def got_shares(self, shares):
+        self._shares.update(shares)
+        if self._active_segment:
+            self._active_segment.add_shares(shares)
+    def no_more_shares(self):
+        self._no_more_shares = True
+        if self._active_segment:
+            self._active_segment.no_more_shares()
+
+    # things called by our Share instances
+
+    def validate_UEB(self, UEB_s):
+        h = hashutil.uri_extension_hash(UEB_s)
+        if h != self._verifycap.uri_extension_hash:
+            raise hashtree.BadHashError
+        UEB_dict = uri.unpack_extension(UEB_s)
+        self._parse_UEB(UEB_dict) # sets self.segment_size, num_segments, etc
+        # TODO: a malformed (but authentic) UEB could throw an assertion in
+        # _parse_UEB, and we should abandon the download.
+        self.have_UEB = True
+        self.UEB = UEB_dict # Share._satisfy_data_block() reads this dict
+        self._segsize_observers.fire(self.segment_size)
+        self._numsegs_observers.fire(self.num_segments)
+
+
+    def _parse_UEB(self, d):
+        # Note: the UEB contains needed_shares and total_shares. These are
+        # redundant and inferior (the filecap contains the authoritative
+        # values). However, because it is possible to encode the same file in
+        # multiple ways, and the encoders might choose (poorly) to use the
+        # same key for both (therefore getting the same SI), we might
+        # encounter shares for both types. The UEB hashes will be different,
+        # however, and we'll disregard the "other" encoding's shares as
+        # corrupted.
+
+        # therefore, we ignore d['total_shares'] and d['needed_shares'].
+
+        self.share_size = mathutil.div_ceil(self._verifycap.size,
+                                            self._needed_shares)
+
+        self.segment_size = d['segment_size']
+
+        self.block_size = mathutil.div_ceil(self.segment_size,
+                                            self._needed_shares)
+        self.num_segments = mathutil.div_ceil(self._size, self.segment_size)
+
+        self.tail_data_size = self._size % self.segment_size
+        if self.tail_data_size == 0:
+            self.tail_data_size = self.segment_size
+        # padding for erasure code
+        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
+                                                        self._needed_shares)
+
+        # zfec.Decode() instantiation is fast, but still, let's use the same
+        # codec for anything we can. 3-of-10 takes 15us on my laptop,
+        # 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is 2.5ms,
+        # worst-case 254-of-255 is 9.3ms
+        self._codec = codec.CRSDecoder()
+        self._codec.set_params(self.segment_size,
+                               self._needed_shares, self._total_shares)
+
+
+        # Ciphertext hash tree root is mandatory, so that there is at most
+        # one ciphertext that matches this read-cap or verify-cap. The
+        # integrity check on the shares is not sufficient to prevent the
+        # original encoder from creating some shares of file A and other
+        # shares of file B.
+        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
+        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
+
+        self.share_hash_tree.set_hashes({0: d['share_root_hash']})
+
+        # crypttext_hash is optional. We only pull this from the first UEB
+        # that we see.
+        if 'crypttext_hash' in d:
+            if len(d["crypttext_hash"]) == hashutil.CRYPTO_VAL_SIZE:
+                self.ciphertext_hash = d['crypttext_hash']
+            else:
+                log.msg("ignoring bad-length UEB[crypttext_hash], "
+                        "got %d bytes, want %d" % (len(d['crypttext_hash']),
+                                                   hashutil.CRYPTO_VAL_SIZE),
+                        umid="oZkGLA", level=log.WEIRD)
+
+        # Our job is a fast download, not verification, so we ignore any
+        # redundant fields. The Verifier uses a different code path which
+        # does not ignore them.
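+
+        # Worked example (illustrative numbers only): verifycap.size=1000000,
+        # k=3, segment_size=131073 gives:
+        #   share_size        = div_ceil(1000000, 3)      = 333334
+        #   num_segments      = div_ceil(1000000, 131073) = 8
+        #   block_size        = div_ceil(131073, 3)       = 43691
+        #   tail_data_size    = 1000000 % 131073          = 82489
+        #   tail_segment_size = next_multiple(82489, 3)   = 82491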
+
+
+    def process_share_hashes(self, share_hashes):
+        self.share_hash_tree.set_hashes(share_hashes)
+
+    # called by our child SegmentFetcher
+
+    def want_more_shares(self):
+        self._sharefinder.hungry()
+
+    def fetch_failed(self, sf, f):
+        assert sf is self._active_segment
+        sf.disownServiceParent()
+        self._active_segment = None
+        # deliver error upwards
+        for (d,c) in self._extract_requests(sf.segnum):
+            eventually(self._deliver_error, d, c, f)
+
+    def _deliver_error(self, d, c, f):
+        # this method exists to handle cancel() that occurs between
+        # _got_segment and _deliver_error
+        if not c.cancelled:
+            d.errback(f)
+
+    def process_blocks(self, segnum, blocks):
+        decoder = self._codec
+        if segnum == self.num_segments-1:
+            # the tail segment uses a different (smaller) codec
+            decoder = codec.CRSDecoder()
+            k, N = self._needed_shares, self._total_shares
+            decoder.set_params(self.tail_segment_size, k, N)
+
+        shares = []
+        shareids = []
+        for (shareid, share) in blocks.iteritems():
+            shareids.append(shareid)
+            shares.append(share)
+        del blocks
+        # CRSDecoder.decode returns a Deferred that fires with a list of
+        # buffers: join them back into a single segment.
+        # TODO: trim the tail segment back down to tail_data_size (i.e. strip
+        # the zfec padding) before delivering it.
+        d = decoder.decode(shares, shareids)
+        d.addCallback(lambda buffers: self._process_segment(segnum,
+                                                            "".join(buffers)))
+        d.addErrback(log.err) # XXX
+
+    def _process_segment(self, segnum, segment):
+        h = hashutil.crypttext_segment_hash(segment)
+        try:
+            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError):
+            SOMETHING # XXX the decoded segment is corrupt: fail the requests
+        assert self._active_segment.segnum == segnum
+        assert self.segment_size is not None
+        offset = segnum * self.segment_size
+        for (d,c) in self._extract_requests(segnum):
+            eventually(self._deliver, d, c, offset, segment)
+        self._active_segment = None
+        self._start_new_segment()
+
+    def _deliver(self, d, c, offset, segment):
+        # this method exists to handle cancel() that occurs between
+        # _got_segment and _deliver
+        if not c.cancelled:
+            d.callback((offset,segment))
+
+    def _extract_requests(self, segnum):
+        """Remove matching requests and return their (d,c) tuples so that the
+        caller can retire them."""
+        retire = [(d,c) for (segnum0, d, c) in self._segment_requests
+                  if segnum0 == segnum]
+        self._segment_requests = [t for t in self._segment_requests
+                                  if t[0] != segnum]
+        return retire
+
+    def _cancel_request(self, c):
+        self._segment_requests = [t for t in self._segment_requests
+                                  if t[2] != c]
+        segnums = [segnum for (segnum,d,c) in self._segment_requests]
+        if self._active_segment and self._active_segment.segnum not in segnums:
+            self._active_segment.stop()
+            self._active_segment = None
+            self._start_new_segment()
+
+class DecryptingConsumer:
+    """I sit between a CiphertextDownloader (which acts as a Producer) and
+    the real Consumer, decrypting everything that passes by. The real
+    Consumer sees the real Producer, but the Producer sees us instead of the
+    real consumer."""
+    implements(IConsumer)
+
+    def __init__(self, consumer, readkey, offset):
+        self._consumer = consumer
+        # TODO: pycryptopp CTR-mode needs random-access operations: I want
+        # either a=AES(readkey, offset) or better yet both of:
+        #  a=AES(readkey, offset=0)
+        #  a.process(ciphertext, offset=xyz)
+        # For now, we fake it with the existing iv= argument.
+        offset_big = offset // 16
+        offset_small = offset % 16
+        iv = binascii.unhexlify("%032x" % offset_big)
+        self._decryptor = AES(readkey, iv=iv)
+        self._decryptor.process("\x00"*offset_small)
+
+    def registerProducer(self, producer):
+        # this passes through, so the real consumer can flow-control the real
+        # producer. Therefore we don't need to provide any IPushProducer
+        # methods. We implement all the IConsumer methods as pass-throughs,
+        # and only intercept write() to perform decryption.
+        self._consumer.registerProducer(producer)
+    def unregisterProducer(self):
+        self._consumer.unregisterProducer()
+    def write(self, ciphertext):
+        plaintext = self._decryptor.process(ciphertext)
+        self._consumer.write(plaintext)
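+
+# Illustrative sanity check (an assumed helper, not called anywhere): AES CTR
+# mode is a pure keystream, so the iv= trick above (set the counter to
+# offset//16, then discard offset%16 bytes of keystream) must produce the
+# same output as decrypting from byte 0 and discarding the first 'offset'
+# bytes.
+def _check_ctr_offset_trick(readkey, ciphertext, offset):
+    from_zero = AES(readkey).process(ciphertext)[offset:]
+    offset_big, offset_small = divmod(offset, 16)
+    a = AES(readkey, iv=binascii.unhexlify("%032x" % offset_big))
+    a.process("\x00" * offset_small)
+    assert a.process(ciphertext[offset:]) == from_zero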
+
+class ImmutableFileNode:
+    # I wrap a CiphertextFileNode with a decryption key
+    def __init__(self, filecap, storage_broker, secret_holder, downloader,
+                 history):
+        assert isinstance(filecap, CHKFileURI)
+        verifycap = filecap.get_verify_cap()
+        self._cnode = CiphertextFileNode(verifycap, storage_broker,
+                                         secret_holder, downloader, history)
+        self.u = filecap
+        self._readkey = filecap.key
+
+    def read(self, consumer, offset=0, size=None):
+        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
+        return self._cnode.read(decryptor, offset, size)
+
+
+# TODO: if server1 has all shares, and server2-10 have one each, make the
+# loop stall slightly before requesting all shares from the first server, to
+# give it a chance to learn about the other shares and get some diversity.
+# Or, don't bother, let the first block all come from one server, and take
+# comfort in the fact that we'll learn about the other servers by the time we
+# fetch the second block.
+#
+# davidsarah points out that we could use sequential (instead of parallel)
+# fetching of multiple block from a single server: by the time the first
+# block arrives, we'll hopefully have heard about other shares. This would
+# induce some RTT delays (i.e. lose pipelining) in the case that this server
+# has the only shares, but that seems tolerable. We could rig it to only use
+# sequential requests on the first segment.
+
+# as a query gets later, we're more willing to duplicate work.
+
+# should change server read protocol to allow small shares to be fetched in a
+# single RTT. Instead of get_buckets-then-read, just use read(shnums, readv),
+# where shnums=[] means all shares, and the return value is a dict of
+# shnum->data (like with mutable files). The DYHB query should also fetch the
+# offset table, since everything else can be located once we have that.
+
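+# A rough sketch of what that remote method could look like, written in the
+# usual foolscap RemoteInterface/schema style. The interface name and exact
+# constraints below are illustrative assumptions, not a committed protocol:
+#
+#   class RIImmutableReadv(RemoteInterface):
+#       def read(storage_index=StorageIndex,
+#                shnums=ListOf(int),   # [] means "all shares you have"
+#                readv=ReadVector):    # list of (offset, length) pairs
+#           """Return a dict mapping shnum to the list of data strings
+#           named by readv, for each share held for this storage index."""
+#           return DictOf(int, ListOf(ShareData))
+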
+
+# ImmutableFileNode
+#    DecryptingConsumer
+#  CiphertextFileNode
+#    Segmentation
+#   ShareFinder
+#   SegmentFetcher[segnum] (one at a time)
+#   CommonShare[shnum]
+#   Share[shnum,server]
+
+# TODO: when we learn numsegs, any get_segment() calls for bad blocknumbers
+# should be failed with BadSegmentNumberError. But should this be the
+# responsibility of CiphertextFileNode, or SegmentFetcher? The knowledge will
+# first appear when a Share receives a valid UEB and calls
+# CiphertextFileNode.validate_UEB, then _parse_UEB. The SegmentFetcher is
+# expecting to hear from the Share, via the _block_request_activity observer.
+
+# make it the responsibility of the SegmentFetcher. Each Share that gets a
+# valid UEB will tell the SegmentFetcher BADSEGNUM (instead of COMPLETE or
+# CORRUPT). The SegmentFetcher is then responsible for shutting down, and
+# informing its parent (the CiphertextFileNode) of the BadSegmentNumberError,
+# which is then passed to the client of get_segment().
+
+
+# TODO: if offset table is corrupt, attacker could cause us to fetch whole
+# (large) share
diff --git a/src/allmydata/immutable/download2_off.py b/src/allmydata/immutable/download2_off.py
new file mode 100755
index 0000000..d2b8b99
--- /dev/null
+++ b/src/allmydata/immutable/download2_off.py
@@ -0,0 +1,634 @@
+#! /usr/bin/python
+
+# known (shnum,Server) pairs are sorted into a list according to
+# desireability. This sort is picking a winding path through a matrix of
+# [shnum][server]. The goal is to get diversity of both shnum and server.
+
+# The initial order is:
+#  find the lowest shnum on the first server, add it
+#  look at the next server, find the lowest shnum that we don't already have
+#   if any
+#  next server, etc, until all known servers are checked
+#  now look at servers that we skipped (because ...
+
+# Keep track of which block requests are outstanding by (shnum,Server). Don't
+# bother prioritizing "validated" shares: the overhead to pull the share hash
+# chain is tiny (4 hashes = 128 bytes), and the overhead to pull a new block
+# hash chain is also tiny (1GB file, 8192 segments of 128KiB each, 13 hashes,
+# 832 bytes). Each time a block request is sent, also request any necessary
+# hashes. Don't bother with a "ValidatedShare" class (as distinct from some
+# other sort of Share). Don't bother avoiding duplicate hash-chain requests.
+
+# For each outstanding segread, walk the list and send requests (skipping
+# outstanding shnums) until requests for k distinct shnums are in flight. If
+# we can't do that, ask for more. If we get impatient on a request, find the
+# first non-outstanding
+
+# start with the first Share in the list, and send a request. Then look at
+# the next one. If we already have a pending request for the same shnum or
+# server, push that Share down onto the fallback list and try the next one,
+# etc. If we run out of non-fallback shares, use the fallback ones,
+# preferring shnums that we don't have outstanding requests for (i.e. assume
+# that all requests will complete). Do this by having a second fallback list.
+
+# hell, I'm reviving the Herder. But remember, we're still talking 3 objects
+# per file, not thousands.
+
+# actually, don't bother sorting the initial list. Append Shares as the
+# responses come back, that will put the fastest servers at the front of the
+# list, and give a tiny preference to servers that are earlier in the
+# permuted order.
+
+# more ideas:
+#  sort shares by:
+#   1: number of roundtrips needed to get some data
+#   2: share number
+#   3: ms of RTT delay
+# maybe measure average time-to-completion of requests, compare completion
+# time against that, much larger indicates congestion on the server side
+# or the server's upstream speed is less than our downstream. Minimum
+# time-to-completion indicates min(our-downstream,their-upstream). Could
+# fetch shares one-at-a-time to measure that better.
+
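+# A minimal sketch of that sort key. The attributes are hypothetical: a
+# Share-like object that knows whether it still needs the offset table
+# (i.e. an extra roundtrip), its shnum, and a measured RTT in seconds:
+
+def _share_sort_key(sh):
+    roundtrips = 1
+    if sh.needs_offset_table:
+        roundtrips += 1
+    return (roundtrips, sh.shnum, sh.rtt)
+# usage: shares.sort(key=_share_sort_key)
+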
+# when should we risk duplicate work and send a new request?
+
+def walk(self):
+    shares = sorted(list)
+    oldshares = copy(shares)
+    outstanding = list()
+    fallbacks = list()
+    second_fallbacks = list()
+    while len(outstanding.nonlate.shnums) < k: # need more requests
+        while oldshares:
+            s = oldshares.pop(0)
+            if s.server in outstanding.servers or s.shnum in outstanding.shnums:
+                fallbacks.append(s)
+                continue
+            outstanding.append(s)
+            send_request(s)
+            break #'while need_more_requests'
+        # must use fallback list. Ask for more servers while we're at it.
+        ask_for_more_servers()
+        while fallbacks:
+            s = fallbacks.pop(0)
+            if s.shnum in outstanding.shnums:
+                # assume that the outstanding requests will complete, but
+                # send new requests for other shnums to existing servers
+                second_fallbacks.append(s)
+                continue
+            outstanding.append(s)
+            send_request(s)
+            break #'while need_more_requests'
+        # if we get here, we're being forced to send out multiple queries per
+        # share. We've already asked for more servers, which might help. If
+        # there are no late outstanding queries, then duplicate shares won't
+        # help. Don't send queries for duplicate shares until some of the
+        # queries are late.
+        if outstanding.late:
+            # we're allowed to try any non-outstanding share
+            while second_fallbacks:
+                pass
+    newshares = outstanding + fallbacks + second_fallbacks + oldshares
+        
+
+class Server:
+    """I represent an abstract Storage Server. One day, the StorageBroker
+    will return instances of me. For now, the StorageBroker returns (peerid,
+    RemoteReference) tuples, and this code wraps a Server instance around
+    them.
+    """
+    def __init__(self, peerid, ss):
+        self.peerid = peerid
+        self.remote = ss
+        self._remote_buckets = {} # maps shnum to RIBucketReader
+        # TODO: release the bucket references on shares that we no longer
+        # want. OTOH, why would we not want them? Corruption?
+
+    def send_query(self, storage_index):
+        """I return a Deferred that fires with a set of shnums. If the server
+        had shares available, I will retain the RemoteReferences to its
+        buckets, so that get_data(shnum, range) can be called later."""
+        d = self.remote.callRemote("get_buckets", storage_index)
+        d.addCallback(self._got_response)
+        return d
+
+    def _got_response(self, r):
+        self._remote_buckets = r
+        return set(r.keys())
+
+class ShareOnAServer:
+    """I represent one instance of a share, known to live on a specific
+    server. I am created every time a server responds affirmatively to a
+    do-you-have-block query."""
+
+    def __init__(self, shnum, server):
+        self._shnum = shnum
+        self._server = server
+        self._block_hash_tree = None
+
+    def cost(self, segnum):
+        """I return a tuple of (roundtrips, bytes, rtt), indicating how
+        expensive I think it would be to fetch the given segment. Roundtrips
+        indicates how many roundtrips it is likely to take (one to get the
+        data and hashes, plus one to get the offset table and UEB if this is
+        the first segment we've ever fetched). 'bytes' is how many bytes we
+        must fetch (estimated). 'rtt' is estimated round-trip time (float) in
+        seconds for a trivial request. The downloading algorithm will compare
+        costs to decide which shares should be used."""
+        # the most significant factor here is roundtrips: a Share for which
+        # we already have the offset table is better than a brand new one
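+        # A rough sketch, under assumed attributes (whether we already hold
+        # the offset table, an estimated per-block fetch size, and a measured
+        # RTT); the real formula is still to be decided:
+        #   roundtrips = 1 if self._have_offsets else 2
+        #   fetch_bytes = estimated_block_size + estimated_hash_bytes
+        #   return (roundtrips, fetch_bytes, estimated_rtt)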
+
+    def max_bandwidth(self):
+        """Return a float, indicating the highest plausible bytes-per-second
+        that I've observed coming from this share. This will be based upon
+        the minimum (bytes-per-fetch / time-per-fetch) ever observed. This
+        can be used to estimate the server's upstream bandwidth. Clearly this
+        is only accurate if a share is retrieved with no contention for
+        either the upstream, downstream, or middle of the connection, but it
+        may still serve as a useful metric for deciding which servers to pull
+        from."""
+
+    def get_segment(self, segnum):
+        """I return a Deferred that will fire with the segment data, or
+        errback."""
+
+class NativeShareOnAServer(ShareOnAServer):
+    """For tahoe native (foolscap) servers, I contain a RemoteReference to
+    the RIBucketReader instance."""
+    def __init__(self, shnum, server, rref):
+        ShareOnAServer.__init__(self, shnum, server)
+        self._rref = rref # RIBucketReader
+
+class Share:
+    def __init__(self, shnum):
+        self._shnum = shnum
+        # _servers are the Server instances which appear to hold a copy of
+        # this share. It is populated when the ValidShare is first created,
+        # or when we receive a get_buckets() response for a shnum that
+        # already has a ValidShare instance. When we lose the connection to a
+        # server, we remove it.
+        self._servers = set()
+        # offsets, UEB, and share_hash_tree all live in the parent.
+        # block_hash_tree lives here.
+        self._block_hash_tree = None
+
+        #self._want
+
+    def get_servers(self):
+        return self._servers
+
+
+    def get_block(self, segnum):
+        # read enough data to obtain a single validated block
+        if not self.have_offsets:
+            # we get the offsets in their own read, since they tell us where
+            # everything else lives. We must fetch offsets for each share
+            # separately, since they aren't directly covered by the UEB.
+            pass
+        if not self.parent.have_ueb:
+            # use _guessed_segsize to make a guess about the layout, so we
+            # can fetch both the offset table and the UEB in the same read.
+            # This also requires making a guess about the presence or absence
+            # of the plaintext_hash_tree. Oh, and also the version number. Oh
+            # well.
+            pass
+
+class CiphertextDownloader:
+    """I manage all downloads for a single file. I operate a state machine
+    with input events that are local read() requests, responses to my remote
+    'get_bucket' and 'read_bucket' messages, and connection establishment and
+    loss. My outbound events are connection establishment requests and bucket
+    read requests messages.
+    """
+    # eventually this will merge into the FileNode
+    ServerClass = Server # for tests to override
+
+    def __init__(self, storage_index, ueb_hash, size, k, N, storage_broker,
+                 shutdowner):
+        # values we get from the filecap
+        self._storage_index = si = storage_index
+        self._ueb_hash = ueb_hash
+        self._size = size
+        self._needed_shares = k
+        self._total_shares = N
+        self._share_hash_tree = IncompleteHashTree(self._total_shares)
+        # values we discover when we first fetch the UEB
+        self._ueb = None # is dict after UEB fetch+validate
+        self._segsize = None
+        self._numsegs = None
+        self._blocksize = None
+        self._tail_segsize = None
+        self._ciphertext_hash = None # optional
+        # structures we create when we fetch the UEB, then continue to fill
+        # as we download the file
+        # (self._share_hash_tree was already created above, since N is known)
+        self._ciphertext_hash_tree = None
+
+        # values we learn as we download the file
+        self._offsets = {} # (shnum,Server) to offset table (dict)
+        self._block_hash_tree = {} # shnum to IncompleteHashTree
+        # other things which help us
+        self._guessed_segsize = min(128*1024, size)
+        self._active_share_readers = {} # maps shnum to Reader instance
+        self._share_readers = [] # sorted by preference, best first
+        self._readers = set() # set of Reader instances
+        self._recent_horizon = 10 # seconds
+        self._storage_broker = storage_broker
+        self._scheduled = False
+
+        # 'shutdowner' is a MultiService parent used to cancel all downloads
+        # when the node is shutting down, to let tests have a clean reactor.
+        self._shutdowner = shutdowner
+
+        self._init_available_servers()
+        self._init_find_enough_shares()
+        self._init_validate_enough_shares()
+
+    # _available_servers is an iterator that provides us with Server
+    # instances. Each time we pull out a Server, we immediately send it a
+    # query, so we don't need to keep track of who we've sent queries to.
+
+    def _init_available_servers(self):
+        self._available_servers = self._get_available_servers()
+        self._no_more_available_servers = False
+
+    def _get_available_servers(self):
+        """I am a generator of servers to use, sorted by the order in which
+        we should query them. I make sure there are no duplicates in this
+        list."""
+        # TODO: make StorageBroker responsible for this non-duplication, and
+        # replace this method with a simple iter(get_servers_for_index()),
+        # plus a self._no_more_available_servers=True
+        seen = set()
+        sb = self._storage_broker
+        for (peerid, ss) in sb.get_servers_for_index(self._storage_index):
+            if peerid not in seen:
+                yield self.ServerClass(peerid, ss) # Server(peerid, ss)
+                seen.add(peerid)
+        self._no_more_available_servers = True
+
+    # this block of code is responsible for having enough non-problematic
+    # distinct shares/servers available and ready for download, and for
+    # limiting the number of queries that are outstanding. The idea is that
+    # we'll use the k fastest/best shares, and have the other ones in reserve
+    # in case those servers stop responding or respond too slowly. We keep
+    # track of all known shares, but we also keep track of problematic shares
+    # (ones with hash failures or lost connections), so we can put them at
+    # the bottom of the list.
+
+    def _init_find_enough_shares(self):
+        # _unvalidated_sharemap maps shnum to set of Servers, and remembers
+        # where viable (but not yet validated) shares are located. Each
+        # get_bucket() response adds to this map, each act of validation
+        # removes from it.
+        self._unvalidated_sharemap = DictOfSets()
+
+        # _sharemap maps shnum to set of Servers, and remembers where viable
+        # shares are located. Each get_bucket() response adds to this map,
+        # each hash failure or disconnect removes from it. (TODO: if we
+        # disconnect but reconnect later, we should be allowed to re-query).
+        self._sharemap = DictOfSets()
+
+        # _problem_shares is a set of (shnum, Server) tuples, and
+
+        # _queries_in_flight maps a Server to a timestamp, which remembers
+        # which servers we've sent queries to (and when) but have not yet
+        # heard a response. This lets us put a limit on the number of
+        # outstanding queries, to limit the size of the work window (how much
+        # extra work we ask servers to do in the hopes of keeping our own
+        # pipeline filled). We remove a Server from _queries_in_flight when
+        # we get an answer/error or we finally give up. If we ever switch to
+        # a non-connection-oriented protocol (like UDP, or forwarded Chord
+        # queries), we can use this information to retransmit any query that
+        # has gone unanswered for too long.
+        self._queries_in_flight = dict()
+
+    def _count_recent_queries_in_flight(self):
+        now = time.time()
+        recent = now - self._recent_horizon
+        return len([s for (s,when) in self._queries_in_flight.items()
+                    if when > recent])
+
+    def _find_enough_shares(self):
+        # goal: have 2*k distinct not-invalid shares available for reading,
+        # from 2*k distinct servers. Do not have more than 4*k "recent"
+        # queries in flight at a time.
+        if (len(self._sharemap) >= 2*self._needed_shares
+            and len(self._sharemap.values()) >= 2*self._needed_shares):
+            return
+        num = self._count_recent_queries_in_flight()
+        while num < 4*self._needed_shares:
+            try:
+                s = self._available_servers.next()
+            except StopIteration:
+                return # no more progress can be made
+            self._queries_in_flight[s] = time.time()
+            d = s.send_query(self._storage_index)
+            d.addBoth(incidentally, self._queries_in_flight.pop, s, None)
+            d.addCallbacks(lambda shnums, s=s: [self._sharemap.add(shnum, s)
+                                                for shnum in shnums],
+                           lambda f, s=s: self._query_error(f, s))
+            d.addErrback(self._error)
+            d.addCallback(self._reschedule)
+            num += 1
+
+    def _query_error(self, f, s):
+        # a server returned an error, log it gently and ignore
+        level = log.WEIRD
+        if f.check(DeadReferenceError):
+            level = log.UNUSUAL
+        log.msg(format="Error during get_buckets to server=%(server)s",
+                server=str(s),
+                failure=f, level=level, umid="3uuBUQ")
+
+    # this block is responsible for turning known shares into usable shares,
+    # by fetching enough data to validate their contents.
+
+    # UEB (from any share)
+    # share hash chain, validated (from any share, for given shnum)
+    # block hash (any share, given shnum)
+
+    def _got_ueb(self, ueb_data, share):
+        if self._ueb is not None:
+            return
+        if hashutil.uri_extension_hash(ueb_data) != self._ueb_hash:
+            share.error("UEB hash does not match")
+            return
+        d = uri.unpack_extension(ueb_data)
+        self.share_size = mathutil.div_ceil(self._size, self._needed_shares)
+
+
+        # There are several kinds of things that can be found in a UEB.
+        # First, things that we really need to learn from the UEB in order to
+        # do this download. Next: things which are optional but not redundant
+        # -- if they are present in the UEB they will get used. Next, things
+        # that are optional and redundant. These things are required to be
+        # consistent: they don't have to be in the UEB, but if they are in
+        # the UEB then they will be checked for consistency with the
+        # already-known facts, and if they are inconsistent then an exception
+        # will be raised. These things aren't actually used -- they are just
+        # tested for consistency and ignored. Finally: things which are
+        # deprecated -- they ought not be in the UEB at all, and if they are
+        # present then a warning will be logged but they are otherwise
+        # ignored.
+
+        # First, things that we really need to learn from the UEB:
+        # segment_size, crypttext_root_hash, and share_root_hash.
+        self._segsize = d['segment_size']
+
+        self._blocksize = mathutil.div_ceil(self._segsize, self._needed_shares)
+        self._numsegs = mathutil.div_ceil(self._size, self._segsize)
+
+        self._tail_segsize = self._size % self._segsize
+        if self._tail_segsize == 0:
+            self._tail_segsize = self._segsize
+        # padding for erasure code
+        self._tail_segsize = mathutil.next_multiple(self._tail_segsize,
+                                                    self._needed_shares)
+
+        # Ciphertext hash tree root is mandatory, so that there is at most
+        # one ciphertext that matches this read-cap or verify-cap. The
+        # integrity check on the shares is not sufficient to prevent the
+        # original encoder from creating some shares of file A and other
+        # shares of file B.
+        self._ciphertext_hash_tree = IncompleteHashTree(self._numsegs)
+        self._ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
+
+        self._share_hash_tree.set_hashes({0: d['share_root_hash']})
+
+
+        # Next: things that are optional and not redundant: crypttext_hash
+        if 'crypttext_hash' in d:
+            if len(d['crypttext_hash']) == hashutil.CRYPTO_VAL_SIZE:
+                self._ciphertext_hash = d['crypttext_hash']
+            else:
+                log.msg("ignoring bad-length UEB[crypttext_hash], "
+                        "got %d bytes, want %d" % (len(d['crypttext_hash']),
+                                                   hashutil.CRYPTO_VAL_SIZE),
+                        umid="oZkGLA", level=log.WEIRD)
+
+        # we ignore all of the redundant fields when downloading. The
+        # Verifier uses a different code path which does not ignore them.
+
+        # finally, set self._ueb as a marker that we don't need to request it
+        # anymore
+        self._ueb = d
+
+    def _got_share_hashes(self, hashes, share):
+        assert isinstance(hashes, dict)
+        try:
+            self._share_hash_tree.set_hashes(hashes)
+        except (IndexError, BadHashError, NotEnoughHashesError), le:
+            share.error("Bad or missing hashes")
+            return
+
+    #def _got_block_hashes(
+
+    def _init_validate_enough_shares(self):
+        # _valid_shares maps shnum to ValidatedShare instances, and is
+        # populated once the block hash root has been fetched and validated
+        # (which requires any valid copy of the UEB, and a valid copy of the
+        # share hash chain for each shnum)
+        self._valid_shares = {}
+
+        # _target_shares is an ordered list of ReadyShare instances, each of
+        # which is a (shnum, server) tuple. It is sorted in order of
+        # preference: we expect to get the fastest response from the
+        # ReadyShares at the front of the list. It is also sorted to
+        # distribute the shnums, so that fetching shares from
+        # _target_shares[:k] is likely (but not guaranteed) to give us k
+        # distinct shares. The rule is that we skip over entries for blocks
+        # that we've already received, limit the number of recent queries for
+        # the same block, 
+        self._target_shares = []
+
+    def _validate_enough_shares(self):
+        # my goal is to have at least 2*k distinct validated shares from at
+        # least 2*k distinct servers
+        valid_share_servers = set()
+        for vs in self._valid_shares.values():
+            valid_share_servers.update(vs.get_servers())
+        if (len(self._valid_shares) >= 2*self._needed_shares
+            and len(valid_share_servers) >= 2*self._needed_shares):
+            return
+        #for 
+
+    def _reschedule(self, _ign):
+        # fire the loop again
+        if not self._scheduled:
+            self._scheduled = True
+            eventually(self._loop)
+
+    def _loop(self):
+        self._scheduled = False
+        # what do we need?
+
+        self._find_enough_shares()
+        self._validate_enough_shares()
+
+        if not self._ueb:
+            # we always need a copy of the UEB
+            pass
+
+    def _error(self, f):
+        # this is an unexpected error: a coding bug
+        log.err(f, level=log.UNUSUAL)
+            
+
+
+# using a single packed string (and an offset table) may be an artifact of
+# our native storage server: other backends might allow cheap multi-part
+# files (think S3, several buckets per share, one for each section).
+
+# find new names for:
+#  data_holder
+#  Share / Share2  (ShareInstance / Share? but the first is more useful)
+
+class IShare(Interface):
+    """I represent a single instance of a single share (e.g. I reference the
+    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
+    This interface is used by SegmentFetcher to retrieve validated blocks.
+    """
+    def get_block(segnum):
+        """Return an Observer2, which will be notified with the following
+        events:
+         state=COMPLETE, block=data (terminal): validated block data
+         state=OVERDUE (non-terminal): we have reason to believe that the
+                                       request might have stalled, or we
+                                       might just be impatient
+         state=CORRUPT (terminal): the data we received was corrupt
+         state=DEAD (terminal): the connection has failed
+        """
+
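+# A small sketch of how a caller (a SegmentFetcher-like object) might consume
+# the Observer2 returned by get_block(); '_block_request_activity' is the
+# observer method mentioned in the notes in download2.py, and the extra
+# 'share'/'shnum' keyword arguments are delivered back with every event:
+#
+#   o = share.get_block(segnum)
+#   o.subscribe(self._block_request_activity, share=share, shnum=shnum)
+#
+#   def _block_request_activity(self, share, shnum, state, block=None):
+#       if state == COMPLETE:
+#           ...   # 'block' holds the validated block data
+#       elif state in (CORRUPT, DEAD, BADSEGNUM):
+#           ...   # terminal: stop using this share
+#       elif state == OVERDUE:
+#           ...   # non-terminal: consider sending a duplicate request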
+
+# it'd be nice if we receive the hashes before the block, or just
+# afterwards, so we aren't stuck holding on to unvalidated blocks
+# that we can't process. If we guess the offsets right, we can
+# accomplish this by sending the block request after the metadata
+# requests (by keeping two separate requestlists), and have a one RTT
+# pipeline like:
+#  1a=metadata, 1b=block
+#  1b->process+deliver : one RTT
+
+# But if we guess wrong, and fetch the wrong part of the block, we'll
+# have a pipeline that looks like:
+#  1a=wrong metadata, 1b=wrong block
+#  1a->2a=right metadata,2b=right block
+#  2b->process+deliver
+# which means two RTT and buffering one block (which, since we'll
+# guess the segsize wrong for everything, means buffering one
+# segment)
+
+# if we start asking for multiple segments, we could get something
+# worse:
+#  1a=wrong metadata, 1b=wrong block0, 1c=wrong block1, ..
+#  1a->2a=right metadata,2b=right block0,2c=right block1, .
+#  2b->process+deliver
+
+# which means two RTT but fetching and buffering the whole file
+# before delivering anything. However, since we don't know when the
+# other shares are going to arrive, we need to avoid having more than
+# one block in the pipeline anyways. So we shouldn't be able to get
+# into this state.
+
+# it also means that, instead of handling all of
+# self._requested_blocks at once, we should only be handling one
+# block at a time: one of the requested block should be special
+# (probably FIFO). But retire all we can.
+
+    # this might be better with a Deferred, using COMPLETE as the success
+    # case and CORRUPT/DEAD in an errback, because that would let us hold the
+    # 'share' and 'shnum' arguments locally (instead of roundtripping them
+    # through Share.send_request). But OVERDUE is not terminal. So I
+    # want a new sort of callback mechanism, with the extra-argument-passing
+    # aspects of Deferred, but without being so one-shot. Is this a job for
+    # Observer? No, it doesn't take extra arguments. So this uses Observer2.
+
+
+class Reader:
+    """I am responsible for a single offset+size read of the file. I handle
+    segmentation: I figure out which segments are necessary, request them
+    (from my CiphertextDownloader) in order, and trim the segments down to
+    match the offset+size span. I use the Producer/Consumer interface to only
+    request one segment at a time.
+    """
+    implements(IPushProducer)
+    def __init__(self, consumer, offset, size):
+        self._needed = []
+        self._consumer = consumer
+        self._hungry = False
+        self._offset = offset
+        self._size = size
+        self._segsize = None
+    def start(self):
+        self._alive = True
+        self._deferred = defer.Deferred()
+        # the process doesn't actually start until set_segment_size()
+        return self._deferred
+
+    def set_segment_size(self, segsize):
+        if self._segsize is not None:
+            return
+        self._segsize = segsize
+        for (segnum, offset, size) in self._compute_segnums():
+            self.add_segment(segnum, offset, size)
+
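+    # Worked example, assuming segsize=128KiB=131072: a read at offset=100000
+    # for size=50000 needs the last 31072 bytes of segment 0 and the first
+    # 18928 bytes of segment 1, so _compute_segnums() yields (0, 100000, 31072)
+    # and then (1, 0, 18928).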
+    def _compute_segnums(self):
+        # now that we know the file's segsize, what segments (and which
+        # ranges of each) will we need?
+        size = self._size
+        offset = self._offset
+        while size:
+            assert size >= 0
+            this_seg_num = int(offset / self._segsize)
+            this_seg_offset = offset - (this_seg_num*self._segsize)
+            this_seg_size = min(size, self._segsize-this_seg_offset)
+            size -= this_seg_size
+            if size:
+                offset += this_seg_size
+            yield (this_seg_num, this_seg_offset, this_seg_size)
+
+    def get_needed_segments(self):
+        return set([segnum for (segnum, off, size) in self._needed])
+
+
+    def stopProducing(self):
+        self._hungry = False
+        self._alive = False
+        # TODO: cancel the segment requests
+    def pauseProducing(self):
+        self._hungry = False
+    def resumeProducing(self):
+        self._hungry = True
+    def add_segment(self, segnum, offset, size):
+        self._needed.append( (segnum, offset, size) )
+    def got_segment(self, segnum, segdata):
+        """Return True if this schedule has more to go, or False if it is
+        done."""
+        assert self._needed[0][0] == segnum
+        (_ign, offset, size) = self._needed.pop(0)
+        data = segdata[offset:offset+size]
+        self._consumer.write(data)
+        if not self._needed:
+            # we're done
+            self._alive = False
+            self._hungry = False
+            self._consumer.unregisterProducer()
+            self._deferred.callback(self._consumer)
+        return bool(self._needed)
+    def error(self, f):
+        self._alive = False
+        self._hungry = False
+        self._consumer.unregisterProducer()
+        self._deferred.errback(f)
+
+
+
+class x:
+    def OFFread(self, consumer, offset=0, size=None):
+        """I am the main entry point, from which FileNode.read() can get
+        data."""
+        # tolerate concurrent operations: each gets its own Reader
+        if size is None:
+            size = self._size - offset
+        r = Reader(consumer, offset, size)
+        self._readers.add(r)
+        d = r.start()
+        if self.segment_size is not None:
+            r.set_segment_size(self.segment_size)
+            # TODO: if we can't find any segments, and thus never get a
+            # segsize, tell the Readers to give up
+        return d
diff --git a/src/allmydata/immutable/download2_util.py b/src/allmydata/immutable/download2_util.py
new file mode 100755
index 0000000..48f2f0a
--- /dev/null
+++ b/src/allmydata/immutable/download2_util.py
@@ -0,0 +1,73 @@
+
+import weakref
+
+from twisted.application import service
+# assumption: foolscap's eventual-send helper, used by Observer2._notify below
+from foolscap.api import eventually
+
+class Observer2:
+    """A simple class to distribute multiple events to a single subscriber.
+    It accepts arbitrary kwargs, but no posargs."""
+    def __init__(self):
+        self._watcher = None
+        self._undelivered_results = []
+        self._canceler = None
+
+    def set_canceler(self, f):
+        # we use a weakref to avoid creating a cycle between us and the thing
+        # we're observing: they'll be holding a reference to us to compare
+        # against the value we pass to their canceler function.
+        self._canceler = weakref.ref(f)
+
+    def subscribe(self, observer, **watcher_kwargs):
+        self._watcher = (observer, watcher_kwargs)
+        while self._undelivered_results:
+            self._notify(self._undelivered_results.pop(0))
+
+    def notify(self, **result_kwargs):
+        if self._watcher:
+            self._notify(result_kwargs)
+        else:
+            self._undelivered_results.append(result_kwargs)
+
+    def _notify(self, result_kwargs):
+        o, watcher_kwargs = self._watcher
+        kwargs = dict(result_kwargs)
+        kwargs.update(watcher_kwargs)
+        eventually(o, **kwargs)
+
+    def cancel(self):
+        f = self._canceler()
+        if f:
+            f(self)
+
+class DictOfSets(dict):
+    def add(self, key, value):
+        if key in self:
+            self[key].add(value)
+        else:
+            self[key] = set([value])
+    def values(self): # return set that merges all value sets
+        r = set()
+        for key in self:
+            r.update(self[key])
+        return r
+
+
+def incidentally(res, f, *args, **kwargs):
+    """Add me to a Deferred chain like this:
+     d.addBoth(incidentally, func, arg)
+    and I'll behave as if you'd added the following function:
+     def _(res):
+         func(arg)
+         return res
+    This is useful if you want to execute an expression when the Deferred
+    fires, but don't care about its value.
+    """
+    f(*args, **kwargs)
+    return res
+
+
+class Terminator(service.Service):
+    def __init__(self):
+        service.Service.__init__(self)
+        self._clients = weakref.WeakKeyDictionary()
+    def register(self, c):
+        self._clients[c] = None
+    def stopService(self):
+        for c in self._clients:
+            c.stop()
+        return service.Service.stopService(self)
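+
+# Expected usage (an assumption, following the 'shutdowner' notes in
+# download2_off.py): hang one Terminator off the client node, register each
+# downloader with it, and let normal service shutdown stop them all:
+#
+#   terminator = Terminator()
+#   terminator.setServiceParent(client)          # any MultiService parent
+#   terminator.register(ciphertext_downloader)   # must provide .stop()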
diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py
index 6874655..b7537d7 100644
--- a/src/allmydata/test/test_util.py
+++ b/src/allmydata/test/test_util.py
@@ -7,12 +7,14 @@ from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
 from twisted.python import log
+from hashlib import md5
 
 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
 from allmydata.util import limiter, time_format, pollmixin, cachedir
 from allmydata.util import statistics, dictutil, pipeline
 from allmydata.util import log as tahoe_log
+from allmydata.util.spans import Spans, overlap, DataSpans
 
 class Base32(unittest.TestCase):
     def test_b2a_matches_Pythons(self):
@@ -1511,3 +1513,528 @@ class Log(unittest.TestCase):
         tahoe_log.err(format="intentional sample error",
                       failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
         self.flushLoggedErrors(SampleError)
+
+
+class SimpleSpans:
+    # this is a simple+inefficient form of util.spans.Spans . We compare the
+    # behavior of this reference model against the real (efficient) form.
+
+    def __init__(self, _span_or_start=None, length=None):
+        self._have = set()
+        if length is not None:
+            for i in range(_span_or_start, _span_or_start+length):
+                self._have.add(i)
+        elif _span_or_start:
+            for (start,length) in _span_or_start:
+                self.add(start, length)
+
+    def add(self, start, length):
+        for i in range(start, start+length):
+            self._have.add(i)
+        return self
+
+    def remove(self, start, length):
+        for i in range(start, start+length):
+            self._have.discard(i)
+        return self
+
+    def each(self):
+        return sorted(self._have)
+
+    def __iter__(self):
+        items = sorted(self._have)
+        prevstart = None
+        prevend = None
+        for i in items:
+            if prevstart is None:
+                prevstart = prevend = i
+                continue
+            if i == prevend+1:
+                prevend = i
+                continue
+            yield (prevstart, prevend-prevstart+1)
+            prevstart = prevend = i
+        if prevstart is not None:
+            yield (prevstart, prevend-prevstart+1)
+
+    def __len__(self):
+        # this also gets us bool(s)
+        return len(self._have)
+
+    def __add__(self, other):
+        s = self.__class__(self)
+        for (start, length) in other:
+            s.add(start, length)
+        return s
+
+    def __sub__(self, other):
+        s = self.__class__(self)
+        for (start, length) in other:
+            s.remove(start, length)
+        return s
+
+    def __iadd__(self, other):
+        for (start, length) in other:
+            self.add(start, length)
+        return self
+
+    def __isub__(self, other):
+        for (start, length) in other:
+            self.remove(start, length)
+        return self
+
+    def __contains__(self, (start,length)):
+        for i in range(start, start+length):
+            if i not in self._have:
+                return False
+        return True
+
+class ByteSpans(unittest.TestCase):
+    def test_basic(self):
+        s = Spans()
+        self.failUnlessEqual(list(s), [])
+        self.failIf(s)
+        self.failIf((0,1) in s)
+        self.failUnlessEqual(len(s), 0)
+
+        s1 = Spans(3, 4) # 3,4,5,6
+        self._check1(s1)
+
+        s2 = Spans(s1)
+        self._check1(s2)
+
+        s2.add(10,2) # 10,11
+        self._check1(s1)
+        self.failUnless((10,1) in s2)
+        self.failIf((10,1) in s1)
+        self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
+        self.failUnlessEqual(len(s2), 6)
+
+        s2.add(15,2).add(20,2)
+        self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
+        self.failUnlessEqual(len(s2), 10)
+
+        s2.remove(4,3).remove(15,1)
+        self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
+        self.failUnlessEqual(len(s2), 6)
+
+    def _check1(self, s):
+        self.failUnlessEqual(list(s), [(3,4)])
+        self.failUnless(s)
+        self.failUnlessEqual(len(s), 4)
+        self.failIf((0,1) in s)
+        self.failUnless((3,4) in s)
+        self.failUnless((3,1) in s)
+        self.failUnless((5,2) in s)
+        self.failUnless((6,1) in s)
+        self.failIf((6,2) in s)
+        self.failIf((7,1) in s)
+        self.failUnlessEqual(list(s.each()), [3,4,5,6])
+
+    def test_math(self):
+        s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
+        s2 = Spans(5, 3) # 5,6,7
+        s3 = Spans(8, 4) # 8,9,10,11
+
+        s = s1 - s2
+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
+        s = s1 - s3
+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
+        s = s2 - s3
+        self.failUnlessEqual(list(s.each()), [5,6,7])
+
+        s = s1 + s2
+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
+        s = s1 + s3
+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
+        s = s2 + s3
+        self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
+
+        s = Spans(s1)
+        s -= s2
+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
+        s = Spans(s1)
+        s -= s3
+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
+        s = Spans(s2)
+        s -= s3
+        self.failUnlessEqual(list(s.each()), [5,6,7])
+
+        s = Spans(s1)
+        s += s2
+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
+        s = Spans(s1)
+        s += s3
+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
+        s = Spans(s2)
+        s += s3
+        self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
+
+    def test_random(self):
+        # attempt to increase coverage of corner cases by comparing behavior
+        # of a simple-but-slow model implementation against the
+        # complex-but-fast actual implementation, in a large number of random
+        # operations
+        S1 = SimpleSpans
+        S2 = Spans
+        s1 = S1(); s2 = S2()
+        seed = ""
+        def _create(subseed):
+            ns1 = S1(); ns2 = S2()
+            for i in range(10):
+                what = md5(subseed+str(i)).hexdigest()
+                start = int(what[2:4], 16)
+                length = max(1,int(what[5:6], 16))
+                ns1.add(start, length); ns2.add(start, length)
+            return ns1, ns2
+
+        #print
+        for i in range(1000):
+            what = md5(seed+str(i)).hexdigest()
+            op = what[0]
+            subop = what[1]
+            start = int(what[2:4], 16)
+            length = max(1,int(what[5:6], 16))
+            #print what
+            if op in "0":
+                if subop in "01234":
+                    s1 = S1(); s2 = S2()
+                elif subop in "5678":
+                    s1 = S1(start, length); s2 = S2(start, length)
+                else:
+                    s1 = S1(s1); s2 = S2(s2)
+                #print "s2 = %s" % s2.dump()
+            elif op in "123":
+                #print "s2.add(%d,%d)" % (start, length)
+                s1.add(start, length); s2.add(start, length)
+            elif op in "456":
+                #print "s2.remove(%d,%d)" % (start, length)
+                s1.remove(start, length); s2.remove(start, length)
+            elif op in "78":
+                ns1, ns2 = _create(what[7:11])
+                #print "s2 + %s" % ns2.dump()
+                s1 = s1 + ns1; s2 = s2 + ns2
+            elif op in "9a":
+                ns1, ns2 = _create(what[7:11])
+                #print "%s - %s" % (s2.dump(), ns2.dump())
+                s1 = s1 - ns1; s2 = s2 - ns2
+            elif op in "bc":
+                ns1, ns2 = _create(what[7:11])
+                #print "s2 += %s" % ns2.dump()
+                s1 += ns1; s2 += ns2
+            else:
+                ns1, ns2 = _create(what[7:11])
+                #print "%s -= %s" % (s2.dump(), ns2.dump())
+                s1 -= ns1; s2 -= ns2
+            #print "s2 now %s" % s2.dump()
+            self.failUnlessEqual(list(s1.each()), list(s2.each()))
+            self.failUnlessEqual(len(s1), len(s2))
+            self.failUnlessEqual(bool(s1), bool(s2))
+            self.failUnlessEqual(list(s1), list(s2))
+            for j in range(10):
+                what = md5(what[12:14]+str(j)).hexdigest()
+                start = int(what[2:4], 16)
+                length = max(1, int(what[5:6], 16))
+                span = (start, length)
+                self.failUnlessEqual(bool(span in s1), bool(span in s2))
+
+
+    # s()
+    # s(start,length)
+    # s(s0)
+    # s.add(start,length) : returns s
+    # s.remove(start,length)
+    # s.each() -> list of byte offsets, mostly for testing
+    # list(s) -> list of (start,length) tuples, one per span
+    # (start,length) in s -> True if (start..start+length-1) are all members
+    #  NOT equivalent to x in list(s)
+    # len(s) -> number of bytes, for testing, bool(), and accounting/limiting
+    # bool(s)  (__len__)
+    # s = s1+s2, s1-s2, +=s1, -=s1
+
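+    # The comment block above summarizes the Spans API; this small test
+    # exercises it as a worked example.
+    def test_api_example(self):
+        s = Spans(3, 4)                 # covers bytes 3,4,5,6
+        s.add(10, 2)                    # now also covers 10,11
+        self.failUnless((5, 2) in s)
+        self.failIf((9, 2) in s)
+        self.failUnlessEqual(list(s), [(3, 4), (10, 2)])
+        s2 = s - Spans(4, 3)            # subtraction returns a new copy
+        self.failUnlessEqual(list(s2.each()), [3, 10, 11])
+        self.failUnlessEqual(len(s), 6) # and leaves the original alone
+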
+    def test_overlap(self):
+        for a in range(20):
+            for b in range(10):
+                for c in range(20):
+                    for d in range(10):
+                        self._test_overlap(a,b,c,d)
+
+    def _test_overlap(self, a, b, c, d):
+        s1 = set(range(a,a+b))
+        s2 = set(range(c,c+d))
+        #print "---"
+        #self._show_overlap(s1, "1")
+        #self._show_overlap(s2, "2")
+        o = overlap(a,b,c,d)
+        expected = s1.intersection(s2)
+        if not expected:
+            self.failUnlessEqual(o, None)
+        else:
+            start,length = o
+            so = set(range(start,start+length))
+            #self._show(so, "o")
+            self.failUnlessEqual(so, expected)
+
+    def _show_overlap(self, s, c):
+        import sys
+        out = sys.stdout
+        if s:
+            for i in range(max(s)):
+                if i in s:
+                    out.write(c)
+                else:
+                    out.write(" ")
+        out.write("\n")
+
+def extend(s, start, length, fill):
+    if len(s) >= start+length:
+        return s
+    assert len(fill) == 1
+    return s + fill*(start+length-len(s))
+
+def replace(s, start, data):
+    assert len(s) >= start+len(data)
+    return s[:start] + data + s[start+len(data):]
+
+class SimpleDataSpans:
+    def __init__(self, other=None):
+        self.missing = "" # "1" where missing, "0" where found
+        self.data = ""
+        if other:
+            for (start, data) in other.get_spans():
+                self.add(start, data)
+
+    def __len__(self):
+        return len(self.missing.translate(None, "1"))
+    def _dump(self):
+        return [i for (i,c) in enumerate(self.missing) if c == "0"]
+    def _have(self, start, length):
+        m = self.missing[start:start+length]
+        if not m or len(m)<length or int(m):
+            return False
+        return True
+    def get_spans(self):
+        for i in self._dump():
+            yield (i, self.data[i])
+    def get(self, start, length):
+        if self._have(start, length):
+            return self.data[start:start+length]
+        return None
+    def pop(self, start, length):
+        data = self.get(start, length)
+        if data:
+            self.remove(start, length)
+        return data
+    def remove(self, start, length):
+        self.missing = replace(extend(self.missing, start, length, "1"),
+                               start, "1"*length)
+    def add(self, start, data):
+        self.missing = replace(extend(self.missing, start, len(data), "1"),
+                               start, "0"*len(data))
+        self.data = replace(extend(self.data, start, len(data), " "),
+                            start, data)
+
+
+class StringSpans(unittest.TestCase):
+    def do_basic(self, klass):
+        ds = klass()
+        self.failUnlessEqual(len(ds), 0)
+        self.failUnlessEqual(list(ds._dump()), [])
+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 0)
+        self.failUnlessEqual(ds.get(0, 4), None)
+        self.failUnlessEqual(ds.pop(0, 4), None)
+        ds.remove(0, 4)
+
+        ds.add(2, "four")
+        self.failUnlessEqual(len(ds), 4)
+        self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4)
+        self.failUnlessEqual(ds.get(0, 4), None)
+        self.failUnlessEqual(ds.pop(0, 4), None)
+        self.failUnlessEqual(ds.get(4, 4), None)
+
+        ds2 = klass(ds)
+        self.failUnlessEqual(len(ds2), 4)
+        self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
+        self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 4)
+        self.failUnlessEqual(ds2.get(0, 4), None)
+        self.failUnlessEqual(ds2.pop(0, 4), None)
+        self.failUnlessEqual(ds2.pop(2, 3), "fou")
+        self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 1)
+        self.failUnlessEqual(ds2.get(2, 3), None)
+        self.failUnlessEqual(ds2.get(5, 1), "r")
+        self.failUnlessEqual(ds.get(2, 3), "fou")
+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4)
+
+        ds.add(0, "23")
+        self.failUnlessEqual(len(ds), 6)
+        self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 6)
+        self.failUnlessEqual(ds.get(0, 4), "23fo")
+        self.failUnlessEqual(ds.pop(0, 4), "23fo")
+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 2)
+        self.failUnlessEqual(ds.get(0, 4), None)
+        self.failUnlessEqual(ds.pop(0, 4), None)
+
+        ds = klass()
+        ds.add(2, "four")
+        ds.add(3, "ea")
+        self.failUnlessEqual(ds.get(2, 4), "fear")
+
+    def do_scan(self, klass):
+        # do a test with gaps and spans of size 1 and 2
+        #  left=(1,11) * right=(1,11) * gapsize=(1,2)
+        # 111, 112, 121, 122, 211, 212, 221, 222
+        #    211
+        #      121
+        #         112
+        #            212
+        #               222
+        #                   221
+        #                      111
+        #                        122
+        #  11 1  1 11 11  11  1 1  111
+        # 0123456789012345678901234567
+        # abcdefghijklmnopqrstuvwxyz-=
+        pieces = [(1, "bc"),
+                  (4, "e"),
+                  (7, "h"),
+                  (9, "jk"),
+                  (12, "mn"),
+                  (16, "qr"),
+                  (20, "u"),
+                  (22, "w"),
+                  (25, "z-="),
+                  ]
+        p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
+        S = "abcdefghijklmnopqrstuvwxyz-="
+        # TODO: when adding data, add capital letters, to make sure we aren't
+        # just leaving the old data in place
+        l = len(S)
+        def base():
+            ds = klass()
+            for start, data in pieces:
+                ds.add(start, data)
+            return ds
+        def dump(s):
+            p = set(s._dump())
+            # wow, this is the first time I've ever wanted ?: in python
+            # note: this requires python2.5
+            d = "".join([(S[i] if i in p else " ") for i in range(l)])
+            assert len(d) == l
+            return d
+        DEBUG = False
+        for start in range(0, l):
+            for end in range(start+1, l):
+                # add [start-end) to the baseline
+                which = "%d-%d" % (start, end-1)
+                p_added = set(range(start, end))
+                b = base()
+                if DEBUG:
+                    print
+                    print dump(b), which
+                    add = klass(); add.add(start, S[start:end])
+                    print dump(add)
+                b.add(start, S[start:end])
+                if DEBUG:
+                    print dump(b)
+                # check that the new span is there
+                d = b.get(start, end-start)
+                self.failUnlessEqual(d, S[start:end], which)
+                # check that all the original pieces are still there
+                for t_start, t_data in pieces:
+                    t_len = len(t_data)
+                    self.failUnlessEqual(b.get(t_start, t_len),
+                                         S[t_start:t_start+t_len],
+                                         "%s %d+%d" % (which, t_start, t_len))
+                # check that a lot of subspans are mostly correct
+                for t_start in range(l):
+                    for t_len in range(1,4):
+                        d = b.get(t_start, t_len)
+                        if d is not None:
+                            which2 = "%s+(%d-%d)" % (which, t_start,
+                                                     t_start+t_len-1)
+                            self.failUnlessEqual(d, S[t_start:t_start+t_len],
+                                                 which2)
+                        # check that removing a subspan gives the right value
+                        b2 = klass(b)
+                        b2.remove(t_start, t_len)
+                        removed = set(range(t_start, t_start+t_len))
+                        for i in range(l):
+                            exp = (((i in p_elements) or (i in p_added))
+                                   and (i not in removed))
+                            which2 = "%s-(%d-%d)" % (which, t_start,
+                                                     t_start+t_len-1)
+                            self.failUnlessEqual(bool(b2.get(i, 1)), exp,
+                                                 which2+" %d" % i)
+
+    def test_test(self):
+        self.do_basic(SimpleDataSpans)
+        self.do_scan(SimpleDataSpans)
+
+    def test_basic(self):
+        self.do_basic(DataSpans)
+        self.do_scan(DataSpans)
+
+    def test_random(self):
+        # attempt to increase coverage of corner cases by comparing behavior
+        # of a simple-but-slow model implementation against the
+        # complex-but-fast actual implementation, in a large number of random
+        # operations
+        S1 = SimpleDataSpans
+        S2 = DataSpans
+        s1 = S1(); s2 = S2()
+        seed = ""
+        def _randstr(length, seed):
+            created = 0
+            pieces = []
+            while created < length:
+                piece = md5(seed + str(created)).hexdigest()
+                pieces.append(piece)
+                created += len(piece)
+            return "".join(pieces)[:length]
+        def _create(subseed):
+            ns1 = S1(); ns2 = S2()
+            for i in range(10):
+                what = md5(subseed+str(i)).hexdigest()
+                start = int(what[2:4], 16)
+                length = max(1,int(what[5:6], 16))
+                ns1.add(start, _randstr(length, what[7:9]));
+                ns2.add(start, _randstr(length, what[7:9]))
+            return ns1, ns2
+
+        #print
+        for i in range(1000):
+            what = md5(seed+str(i)).hexdigest()
+            op = what[0]
+            subop = what[1]
+            start = int(what[2:4], 16)
+            length = max(1,int(what[5:6], 16))
+            #print what
+            if op in "0":
+                if subop in "0123456":
+                    s1 = S1(); s2 = S2()
+                else:
+                    s1, s2 = _create(what[7:11])
+                #print "s2 = %s" % list(s2._dump())
+            elif op in "123456":
+                #print "s2.add(%d,%d)" % (start, length)
+                s1.add(start, _randstr(length, what[7:9]));
+                s2.add(start, _randstr(length, what[7:9]))
+            elif op in "789abc":
+                #print "s2.remove(%d,%d)" % (start, length)
+                s1.remove(start, length); s2.remove(start, length)
+            else:
+                #print "s2.pop(%d,%d)" % (start, length)
+                d1 = s1.pop(start, length); d2 = s2.pop(start, length)
+                self.failUnlessEqual(d1, d2)
+            #print "s1 now %s" % list(s1._dump())
+            #print "s2 now %s" % list(s2._dump())
+            self.failUnlessEqual(len(s1), len(s2))
+            self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
+            for j in range(100):
+                what = md5(what[12:14]+str(j)).hexdigest()
+                start = int(what[2:4], 16)
+                length = max(1, int(what[5:6], 16))
+                d1 = s1.get(start, length); d2 = s2.get(start, length)
+                self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))
diff --git a/src/allmydata/util/spans.py b/src/allmydata/util/spans.py
new file mode 100755
index 0000000..336fddf
--- /dev/null
+++ b/src/allmydata/util/spans.py
@@ -0,0 +1,414 @@
+
+class Spans:
+    """I represent a compressed list of booleans, one per index (an integer).
+    Typically, each index represents an offset into a large string, pointing
+    to a specific byte of a share. In this context, True means that byte has
+    been received, or has been requested.
+
+    Another way to look at this is maintaining a set of integers, optimized
+    for operations on spans like 'add range to set' and 'is range in set?'.
+
+    This is a python equivalent of perl's Set::IntSpan module, frequently
+    used to represent .newsrc contents.
+
+    Rather than storing an actual (large) list or dictionary, I represent my
+    internal state as a sorted list of spans, each with a start and a length.
+    My API is presented in terms of start+length pairs. I provide set
+    arithmetic operators, to efficiently answer questions like 'I want bytes
+    XYZ, I already requested bytes ABC, and I've already received bytes DEF:
+    what bytes should I request now?'.
+
+    The new downloader will use it to keep track of which bytes we've requested
+    or received already.
+    """
+
+    def __init__(self, _span_or_start=None, length=None):
+        self._spans = list()
+        if length is not None:
+            self._spans.append( (_span_or_start, length) )
+        elif _span_or_start:
+            for (start,length) in _span_or_start:
+                self.add(start, length)
+        self._check()
+
+    def _check(self):
+        assert sorted(self._spans) == self._spans
+        prev_end = None
+        try:
+            for (start,length) in self._spans:
+                if prev_end is not None:
+                    assert start > prev_end
+                prev_end = start+length
+        except AssertionError:
+            print "BAD:", self.dump()
+            raise
+
+    def add(self, start, length):
+        assert start >= 0
+        assert length > 0
+        #print " ADD [%d+%d -%d) to %s" % (start, length, start+length, self.dump())
+        first_overlap = last_overlap = None
+        for i,(s_start,s_length) in enumerate(self._spans):
+            #print "  (%d+%d)-> overlap=%s adjacent=%s" % (s_start,s_length, overlap(s_start, s_length, start, length), adjacent(s_start, s_length, start, length))
+            if (overlap(s_start, s_length, start, length)
+                or adjacent(s_start, s_length, start, length)):
+                last_overlap = i
+                if first_overlap is None:
+                    first_overlap = i
+                continue
+            # no overlap
+            if first_overlap is not None:
+                break
+        #print "  first_overlap", first_overlap, last_overlap
+        if first_overlap is None:
+            # no overlap, so just insert the span and sort by starting
+            # position.
+            self._spans.insert(0, (start,length))
+            self._spans.sort()
+        else:
+            # everything from [first_overlap] to [last_overlap] overlapped
+            first_start,first_length = self._spans[first_overlap]
+            last_start,last_length = self._spans[last_overlap]
+            newspan_start = min(start, first_start)
+            newspan_end = max(start+length, last_start+last_length)
+            newspan_length = newspan_end - newspan_start
+            newspan = (newspan_start, newspan_length)
+            self._spans[first_overlap:last_overlap+1] = [newspan]
+        #print "  ADD done: %s" % self.dump()
+        self._check()
+
+        return self
+
+    def remove(self, start, length):
+        assert start >= 0
+        assert length > 0
+        #print " REMOVE [%d+%d -%d) from %s" % (start, length, start+length, self.dump())
+        first_complete_overlap = last_complete_overlap = None
+        for i,(s_start,s_length) in enumerate(self._spans):
+            s_end = s_start + s_length
+            o = overlap(s_start, s_length, start, length)
+            if o:
+                o_start, o_length = o
+                o_end = o_start+o_length
+                if o_start == s_start and o_end == s_end:
+                    # delete this span altogether
+                    if first_complete_overlap is None:
+                        first_complete_overlap = i
+                    last_complete_overlap = i
+                elif o_start == s_start:
+                    # we only overlap the left side, so trim the start
+                    #    1111
+                    #  rrrr
+                    #    oo
+                    # ->   11
+                    new_start = o_end
+                    new_end = s_end
+                    assert new_start > s_start
+                    new_length = new_end - new_start
+                    self._spans[i] = (new_start, new_length)
+                elif o_end == s_end:
+                    # we only overlap the right side
+                    #    1111
+                    #      rrrr
+                    #      oo
+                    # -> 11
+                    new_start = s_start
+                    new_end = o_start
+                    assert new_end < s_end
+                    new_length = new_end - new_start
+                    self._spans[i] = (new_start, new_length)
+                else:
+                    # we overlap the middle, so create a new span. No need to
+                    # examine any other spans.
+                    #    111111
+                    #      rr
+                    #    LL  RR
+                    left_start = s_start
+                    left_end = o_start
+                    left_length = left_end - left_start
+                    right_start = o_end
+                    right_end = s_end
+                    right_length = right_end - right_start
+                    self._spans[i] = (left_start, left_length)
+                    self._spans.append( (right_start, right_length) )
+                    self._spans.sort()
+                    break
+        if first_complete_overlap is not None:
+            del self._spans[first_complete_overlap:last_complete_overlap+1]
+        #print "  REMOVE done: %s" % self.dump()
+        self._check()
+        return self
+
+    def dump(self):
+        return "len=%d: %s" % (len(self),
+                               ",".join(["[%d-%d]" % (start,start+l-1)
+                                         for (start,l) in self._spans]) )
+
+    def each(self):
+        for start, length in self._spans:
+            for i in range(start, start+length):
+                yield i
+
+    def __iter__(self):
+        for s in self._spans:
+            yield s
+
+    def __len__(self):
+        # this also gets us bool(s)
+        return sum([length for start,length in self._spans])
+
+    def __add__(self, other):
+        s = self.__class__(self)
+        for (start, length) in other:
+            s.add(start, length)
+        return s
+
+    def __sub__(self, other):
+        s = self.__class__(self)
+        for (start, length) in other:
+            s.remove(start, length)
+        return s
+
+    def __iadd__(self, other):
+        for (start, length) in other:
+            self.add(start, length)
+        return self
+
+    def __isub__(self, other):
+        for (start, length) in other:
+            self.remove(start, length)
+        return self
+
+    def __contains__(self, (start,length)):
+        for span_start,span_length in self._spans:
+            o = overlap(start, length, span_start, span_length)
+            if o:
+                o_start,o_length = o
+                if o_start == start and o_length == length:
+                    return True
+        return False
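+
+    # Illustrative sketch of the set-like operators and the containment test
+    # (hypothetical values, chosen for clarity):
+    #
+    #   s1 = Spans(); s1.add(0, 5)      # [0-4]
+    #   s2 = Spans(); s2.add(10, 5)     # [10-14]
+    #   s3 = s1 + s2                    # union: [0-4],[10-14]
+    #   (2, 3) in s3                    # True: [2-4] is entirely covered
+    #   (4, 3) in s3                    # False: [5-6] is not covered
+    #   len(s3)                         # 10 bytes in total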
+
+def overlap(start0, length0, start1, length1):
+    # return start2,length2 of the overlapping region, or None
+    #  00      00   000   0000  00  00 000  00   00  00      00
+    #     11    11   11    11   111 11 11  1111 111 11    11
+    left = max(start0, start1)
+    right = min(start0+length0, start1+length1)
+    # if there is overlap, 'left' will be its start, and right-1 will
+    # be its end
+    if left < right:
+        return (left, right-left)
+    return None
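+
+# Illustrative examples (hypothetical values):
+#   overlap(0, 10, 5, 10)   # -> (5, 5): bytes [5-9] are common to both
+#   overlap(0, 5, 5, 5)     # -> None: [0-4] and [5-9] merely touch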
+
+def adjacent(start0, length0, start1, length1):
+    if (start0 < start1) and start0+length0 == start1:
+        return True
+    elif (start1 < start0) and start1+length1 == start0:
+        return True
+    return False
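+
+# Illustrative examples (hypothetical values):
+#   adjacent(0, 5, 5, 3)    # -> True: [0-4] ends exactly where [5-7] begins
+#   adjacent(0, 5, 6, 3)    # -> False: byte 5 lies between the two ranges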
+
+class DataSpans:
+    """I represent portions of a large string. Equivalently, I can be said to
+    maintain a large array of characters (with gaps of empty elements). I can
+    be used to manage access to a remote share, where some pieces have been
+    retrieved, some have been requested, and others have not been read.
+    """
+
+    def __init__(self, other=None):
+        self.spans = [] # (start, data) tuples, non-overlapping, merged
+        if other:
+            for (start, data) in other.get_spans():
+                self.add(start, data)
+
+    def __len__(self):
+        # return number of bytes we're holding
+        return sum([len(data) for (start,data) in self.spans])
+
+    def _dump(self):
+        # return iterator of sorted list of offsets, one per byte
+        for (start,data) in self.spans:
+            for i in range(start, start+len(data)):
+                yield i
+
+    def get_spans(self):
+        return list(self.spans)
+
+    def assert_invariants(self):
+        if not self.spans:
+            return
+        prev_start = self.spans[0][0]
+        prev_end = prev_start + len(self.spans[0][1])
+        for start, data in self.spans[1:]:
+            if not start > prev_end:
+                # adjacent or overlapping: bad
+                print "ASSERTION FAILED", self.spans
+                raise AssertionError
+            prev_start = start
+            prev_end = start + len(data)
+
+    def get(self, start, length):
+        # return a string of exactly LENGTH bytes, or None if any part
+        # of the requested range is not held
+        #print "get", start, length, self.spans
+        end = start+length
+        for (s_start,s_data) in self.spans:
+            s_end = s_start+len(s_data)
+            #print " ",s_start,s_end
+            if s_start <= start < s_end:
+                # we want some data from this span. Because we maintain
+                # strictly merged and non-overlapping spans, everything we
+                # want must be in this span.
+                offset = start - s_start
+                if offset + length > len(s_data):
+                    #print " None, span falls short"
+                    return None # span falls short
+                #print " some", s_data[offset:offset+length]
+                return s_data[offset:offset+length]
+            if s_start >= end:
+                # we've gone too far: no further spans will overlap
+                #print " None, gone too far"
+                return None
+        #print " None, ran out of spans"
+        return None
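+
+    # Illustrative sketch: get() never returns partial data (hypothetical
+    # values):
+    #
+    #   d = DataSpans()
+    #   d.add(0, "abc")
+    #   d.get(1, 2)     # -> "bc"
+    #   d.get(1, 5)     # -> None: we hold [0-2] but were asked through byte 5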
+
+    def add(self, start, data):
+        # walk through the existing spans: overwrite any overlapping pieces
+        # in place, insert new spans for ranges we did not already hold,
+        # then merge adjacent spans so the list stays sorted, non-overlapping,
+        # and maximal
+        #print "add", start, data, self.spans
+        end = start + len(data)
+        i = 0
+        while len(data):
+            #print " loop", start, data, i, len(self.spans), self.spans
+            if i >= len(self.spans):
+                #print " append and done"
+                # append a last span
+                self.spans.append( (start, data) )
+                break
+            (s_start,s_data) = self.spans[i]
+            # five basic cases:
+            #  a: OLD  b:OLDD  c1:OLD  c2:OLD   d1:OLDD  d2:OLD  e: OLLDD
+            #    NEW     NEW      NEW     NEWW      NEW      NEW     NEW
+            #
+            # we handle A by inserting a new segment (with "N") and looping,
+            # turning it into B or C. We handle B by replacing a prefix and
+            # terminating. We handle C (both c1 and c2) by replacing the
+            # segment (and, for c2, looping, turning it into A). We handle D
+            # by replacing a suffix (and, for d2, looping, turning it into
+            # A). We handle E by replacing the middle and terminating.
+            if start < s_start:
+                # case A: insert a new span, then loop with the remainder
+                #print " insert new psan"
+                s_len = s_start-start
+                self.spans.insert(i, (start, data[:s_len]))
+                i += 1
+                start = s_start
+                data = data[s_len:]
+                continue
+            s_len = len(s_data)
+            s_end = s_start+s_len
+            if s_start <= start < s_end:
+                #print " modify this span", s_start, start, s_end
+                # we want to modify some data in this span: a prefix, a
+                # suffix, or the whole thing
+                if s_start == start:
+                    if s_end <= end:
+                        #print " replace whole segment"
+                        # case C: replace this segment
+                        self.spans[i] = (s_start, data[:s_len])
+                        i += 1
+                        start += s_len
+                        data = data[s_len:]
+                        # C2 is where len(data)>0
+                        continue
+                    # case B: modify the prefix, retain the suffix
+                    #print " modify prefix"
+                    self.spans[i] = (s_start, data + s_data[len(data):])
+                    break
+                if start > s_start and end < s_end:
+                    # case E: modify the middle
+                    #print " modify middle"
+                    prefix_len = start - s_start # we retain this much
+                    suffix_len = s_end - end # and retain this much
+                    newdata = s_data[:prefix_len] + data + s_data[-suffix_len:]
+                    self.spans[i] = (s_start, newdata)
+                    break
+                # case D: retain the prefix, modify the suffix
+                #print " modify suffix"
+                prefix_len = start - s_start # we retain this much
+                suffix_len = s_len - prefix_len # we replace this much
+                #print "  ", s_data, prefix_len, suffix_len, s_len, data
+                self.spans[i] = (s_start,
+                                 s_data[:prefix_len] + data[:suffix_len])
+                i += 1
+                start += suffix_len
+                data = data[suffix_len:]
+                #print "  now", start, data
+                # D2 is where len(data)>0
+                continue
+            # else we're not there yet
+            #print " still looking"
+            i += 1
+            continue
+        # now merge adjacent spans
+        #print " merging", self.spans
+        newspans = []
+        for (s_start,s_data) in self.spans:
+            if newspans and adjacent(newspans[-1][0], len(newspans[-1][1]),
+                                     s_start, len(s_data)):
+                newspans[-1] = (newspans[-1][0], newspans[-1][1] + s_data)
+            else:
+                newspans.append( (s_start, s_data) )
+        self.spans = newspans
+        self.assert_invariants()
+        #print " done", self.spans
+
+    def remove(self, start, length):
+        i = 0
+        end = start + length
+        #print "remove", start, length, self.spans
+        while i < len(self.spans):
+            (s_start,s_data) = self.spans[i]
+            if s_start >= end:
+                # this segment is entirely right of the removed region, and
+                # all further segments are even further right. We're done.
+                break
+            s_len = len(s_data)
+            s_end = s_start + s_len
+            o = overlap(start, length, s_start, s_len)
+            if not o:
+                i += 1
+                continue
+            o_start, o_len = o
+            o_end = o_start + o_len
+            if o_len == s_len:
+                # remove the whole segment
+                del self.spans[i]
+                continue
+            if o_start == s_start:
+                # remove a prefix, leaving the suffix from o_end to s_end
+                prefix_len = o_end - o_start
+                self.spans[i] = (o_end, s_data[prefix_len:])
+                i += 1
+                continue
+            elif o_end == s_end:
+                # remove a suffix, leaving the prefix from s_start to o_start
+                prefix_len = o_start - s_start
+                self.spans[i] = (s_start, s_data[:prefix_len])
+                i += 1
+                continue
+            # remove the middle, creating a new segment
+            # left is s_start:o_start, right is o_end:s_end
+            left_len = o_start - s_start
+            left = s_data[:left_len]
+            right_len = s_end - o_end
+            right = s_data[-right_len:]
+            self.spans[i] = (s_start, left)
+            self.spans.insert(i+1, (o_end, right))
+            break
+        #print " done", self.spans
+
+    def pop(self, start, length):
+        data = self.get(start, length)
+        if data:
+            self.remove(start, length)
+        return data
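+
+    # Illustrative sketch of pop(): fetch-and-forget in one call. If the full
+    # range is not held, get() returns None, so nothing is removed.
+    # (Hypothetical values, chosen for clarity.)
+    #
+    #   d = DataSpans()
+    #   d.add(0, "abcdef")
+    #   d.pop(0, 3)      # -> "abc", and [0-2] is no longer held
+    #   d.get(3, 3)      # -> "def"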