diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index 4816ccd..3309286 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -137,7 +137,7 @@ class ShareFinder: peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, umid="Io7pyg") time_sent = now() - d_ev = self._download_status.add_dyhb_sent(peerid, time_sent) + d_ev = self._download_status.add_dyhb_request(peerid, time_sent) # TODO: get the timer from a Server object, it knows best self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue, req) @@ -223,7 +223,7 @@ class ShareFinder: eventually(self.share_consumer.got_shares, shares) def _got_error(self, f, peerid, req, d_ev, lp): - d_ev.finished("error", now()) + d_ev.error(now()) self.log(format="got error from [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), failure=f, level=log.UNUSUAL, parent=lp, umid="zUKdCw") diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py index 04482e6..f0a73eb 100644 --- a/src/allmydata/immutable/downloader/node.py +++ b/src/allmydata/immutable/downloader/node.py @@ -72,7 +72,7 @@ class DownloadNode: # things to track callers that want data # _segment_requests can have duplicates - self._segment_requests = [] # (segnum, d, cancel_handle, logparent) + self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp) self._active_segment = None # a SegmentFetcher, with .segnum self._segsize_observers = observer.OneShotObserverList() @@ -119,14 +119,15 @@ class DownloadNode: # things called by outside callers, via CiphertextFileNode. get_segment() # may also be called by Segmentation. - def read(self, consumer, offset=0, size=None, read_ev=None): + def read(self, consumer, offset, size, read_ev): """I am the main entry point, from which FileNode.read() can get data. I feed the consumer with the desired range of ciphertext. 
I return a Deferred that fires (with the consumer) when the read is finished. Note that there is no notion of a 'file pointer': each call to read() - uses an independent offset= value.""" + uses an independent offset= value. + """ # for concurrent operations: each gets its own Segmentation manager if size is None: size = self._verifycap.size @@ -148,7 +149,11 @@ class DownloadNode: read_ev.finished(now()) # no data, so no producer, so no register/unregisterProducer return defer.succeed(consumer) + + # for concurrent operations, each read() gets its own Segmentation + # manager s = Segmentation(self, offset, size, consumer, read_ev, lp) + # this raises an interesting question: what segments to fetch? if # offset=0, always fetch the first segment, and then allow # Segmentation to be responsible for pulling the subsequent ones if @@ -186,10 +191,10 @@ class DownloadNode: si=base32.b2a(self._verifycap.storage_index)[:8], segnum=segnum, level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") - self._download_status.add_segment_request(segnum, now()) + seg_ev = self._download_status.add_segment_request(segnum, now()) d = defer.Deferred() c = Cancel(self._cancel_request) - self._segment_requests.append( (segnum, d, c, lp) ) + self._segment_requests.append( (segnum, d, c, seg_ev, lp) ) self._start_new_segment() return (d, c) @@ -213,13 +218,13 @@ class DownloadNode: def _start_new_segment(self): if self._active_segment is None and self._segment_requests: - segnum = self._segment_requests[0][0] + (segnum, d, c, seg_ev, lp) = self._segment_requests[0] k = self._verifycap.needed_shares - lp = self._segment_requests[0][3] log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d", node=repr(self), segnum=segnum, level=log.NOISY, parent=lp, umid="wAlnHQ") self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp) + seg_ev.activate(now()) active_shares = [s for s in self._shares if s.is_alive()] fetcher.add_shares(active_shares) # this triggers the loop @@ -383,37 
+388,49 @@ class DownloadNode: def fetch_failed(self, sf, f): assert sf is self._active_segment # deliver error upwards - for (d,c) in self._extract_requests(sf.segnum): + for (d,c,seg_ev) in self._extract_requests(sf.segnum): + seg_ev.error(now()) eventually(self._deliver, d, c, f) self._active_segment = None self._start_new_segment() def process_blocks(self, segnum, blocks): + start = now() d = defer.maybeDeferred(self._decode_blocks, segnum, blocks) d.addCallback(self._check_ciphertext_hash, segnum) def _deliver(result): - ds = self._download_status - if isinstance(result, Failure): - ds.add_segment_error(segnum, now()) - else: - (offset, segment, decodetime) = result - ds.add_segment_delivery(segnum, now(), - offset, len(segment), decodetime) log.msg(format="delivering segment(%(segnum)d)", segnum=segnum, level=log.OPERATIONAL, parent=self._lp, umid="j60Ojg") - for (d,c) in self._extract_requests(segnum): - eventually(self._deliver, d, c, result) + when = now() + if isinstance(result, Failure): + # this catches failures in decode or ciphertext hash + for (d,c,seg_ev) in self._extract_requests(segnum): + seg_ev.error(when) + eventually(self._deliver, d, c, result) + else: + (offset, segment, decodetime) = result + for (d,c,seg_ev) in self._extract_requests(segnum): + # when we have two requests for the same segment, the + # second one will not be "activated" before the data is + # delivered, so to allow the status-reporting code to see + # consistent behavior, we activate them all now. The + # SegmentEvent will ignore duplicate activate() calls. + # Note that this will result in an infinite "receive + # speed" for the second request. 
+ seg_ev.activate(when) + seg_ev.deliver(when, offset, len(segment), decodetime) + eventually(self._deliver, d, c, result) + self._download_status.add_misc_event("process_block", start, now()) self._active_segment = None self._start_new_segment() d.addBoth(_deliver) - d.addErrback(lambda f: - log.err("unhandled error during process_blocks", - failure=f, level=log.WEIRD, - parent=self._lp, umid="MkEsCg")) + d.addErrback(log.err, "unhandled error during process_blocks", + level=log.WEIRD, parent=self._lp, umid="MkEsCg") def _decode_blocks(self, segnum, blocks): + start = now() tail = (segnum == self.num_segments-1) codec = self._codec block_size = self.block_size @@ -434,7 +451,6 @@ class DownloadNode: shares.append(share) del blocks - start = now() d = codec.decode(shares, shareids) # segment del shares def _process(buffers): @@ -444,11 +460,13 @@ class DownloadNode: del buffers if tail: segment = segment[:self.tail_segment_size] + self._download_status.add_misc_event("decode", start, now()) return (segment, decodetime) d.addCallback(_process) return d def _check_ciphertext_hash(self, (segment, decodetime), segnum): + start = now() assert self._active_segment.segnum == segnum assert self.segment_size is not None offset = segnum * self.segment_size @@ -456,6 +474,7 @@ class DownloadNode: h = hashutil.crypttext_segment_hash(segment) try: self.ciphertext_hash_tree.set_hashes(leaves={segnum: h}) + self._download_status.add_misc_event("CThash", start, now()) return (offset, segment, decodetime) except (BadHashError, NotEnoughHashesError): format = ("hash failure in ciphertext_hash_tree:" @@ -479,7 +498,8 @@ class DownloadNode: def _extract_requests(self, segnum): """Remove matching requests and return their (d,c) tuples so that the caller can retire them.""" - retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests + retire = [(d,c,seg_ev) + for (segnum0,d,c,seg_ev,lp) in self._segment_requests if segnum0 == segnum] self._segment_requests = [t for t in 
self._segment_requests if t[0] != segnum] @@ -488,7 +508,7 @@ class DownloadNode: def _cancel_request(self, c): self._segment_requests = [t for t in self._segment_requests if t[2] != c] - segnums = [segnum for (segnum,d,c,lp) in self._segment_requests] + segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests] # self._active_segment might be None in rare circumstances, so make # sure we tolerate it if self._active_segment and self._active_segment.segnum not in segnums: diff --git a/src/allmydata/immutable/downloader/segmentation.py b/src/allmydata/immutable/downloader/segmentation.py index 7c9f5cf..84dddbe 100644 --- a/src/allmydata/immutable/downloader/segmentation.py +++ b/src/allmydata/immutable/downloader/segmentation.py @@ -123,6 +123,8 @@ class Segmentation: # the consumer might call our .pauseProducing() inside that write() # call, setting self._hungry=False self._read_ev.update(len(desired_data), 0, 0) + # note: filenode.DecryptingConsumer is responsible for calling + # _read_ev.update with how much decrypt_time was consumed self._maybe_fetch_next() def _retry_bad_segment(self, f): diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py index 78cce8e..c4e46c3 100644 --- a/src/allmydata/immutable/downloader/share.py +++ b/src/allmydata/immutable/downloader/share.py @@ -244,14 +244,18 @@ class Share: # First, consume all of the information that we currently have, for # all the segments people currently want. + start = now() while self._get_satisfaction(): pass + self._download_status.add_misc_event("satisfy", start, now()) # When we get no satisfaction (from the data we've received so far), # we determine what data we desire (to satisfy more requests). The # number of segments is finite, so I can't get no satisfaction # forever. 
+ start = now() wanted, needed = self._desire() + self._download_status.add_misc_event("desire", start, now()) # Finally, send out requests for whatever we need (desire minus # have). You can't always get what you want, but if you try @@ -259,11 +263,13 @@ class Share: self._send_requests(wanted + needed) # and sometimes you can't even get what you need + start = now() disappointment = needed & self._unavailable if disappointment.len(): self.had_corruption = True raise DataUnavailable("need %s but will never get it" % disappointment.dump()) + self._download_status.add_misc_event("checkdis", start, now()) def _get_satisfaction(self): # return True if we retired a data block, and should therefore be @@ -727,11 +733,11 @@ class Share: share=repr(self), start=start, length=length, level=log.NOISY, parent=self._lp, umid="sgVAyA") - req_ev = ds.add_request_sent(self._peerid, self._shnum, - start, length, now()) + block_ev = ds.add_block_request(self._peerid, self._shnum, + start, length, now()) d = self._send_request(start, length) - d.addCallback(self._got_data, start, length, req_ev, lp) - d.addErrback(self._got_error, start, length, req_ev, lp) + d.addCallback(self._got_data, start, length, block_ev, lp) + d.addErrback(self._got_error, start, length, block_ev, lp) d.addCallback(self._trigger_loop) d.addErrback(lambda f: log.err(format="unhandled error during send_request", @@ -741,8 +747,8 @@ class Share: def _send_request(self, start, length): return self._rref.callRemote("read", start, length) - def _got_data(self, data, start, length, req_ev, lp): - req_ev.finished(len(data), now()) + def _got_data(self, data, start, length, block_ev, lp): + block_ev.finished(len(data), now()) if not self._alive: return log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d", @@ -784,8 +790,8 @@ class Share: # the wanted/needed span is only "wanted" for the first pass. Once # the offset table arrives, it's all "needed". 
- def _got_error(self, f, start, length, req_ev, lp): - req_ev.finished("error", now()) + def _got_error(self, f, start, length, block_ev, lp): + block_ev.error(now()) log.msg(format="error requesting %(start)d+%(length)d" " from %(server)s for si %(si)s", start=start, length=length, diff --git a/src/allmydata/immutable/downloader/status.py b/src/allmydata/immutable/downloader/status.py index 4576d92..ed1fdbc 100644 --- a/src/allmydata/immutable/downloader/status.py +++ b/src/allmydata/immutable/downloader/status.py @@ -3,29 +3,66 @@ import itertools from zope.interface import implements from allmydata.interfaces import IDownloadStatus -class RequestEvent: - def __init__(self, download_status, tag): - self._download_status = download_status - self._tag = tag - def finished(self, received, when): - self._download_status.add_request_finished(self._tag, received, when) +class ReadEvent: + def __init__(self, ev, ds): + self._ev = ev + self._ds = ds + def update(self, bytes, decrypttime, pausetime): + self._ev["bytes_returned"] += bytes + self._ev["decrypt_time"] += decrypttime + self._ev["paused_time"] += pausetime + def finished(self, finishtime): + self._ev["finish_time"] = finishtime + self._ds.update_last_timestamp(finishtime) + +class SegmentEvent: + def __init__(self, ev, ds): + self._ev = ev + self._ds = ds + def activate(self, when): + if self._ev["active_time"] is None: + self._ev["active_time"] = when + def deliver(self, when, start, length, decodetime): + assert self._ev["active_time"] is not None + self._ev["finish_time"] = when + self._ev["success"] = True + self._ev["decode_time"] = decodetime + self._ev["segment_start"] = start + self._ev["segment_length"] = length + self._ds.update_last_timestamp(when) + def error(self, when): + self._ev["finish_time"] = when + self._ev["success"] = False + self._ds.update_last_timestamp(when) class DYHBEvent: - def __init__(self, download_status, tag): - self._download_status = download_status - self._tag = tag + def 
__init__(self, ev, ds): + self._ev = ev + self._ds = ds + def error(self, when): + self._ev["finish_time"] = when + self._ev["success"] = False + self._ds.update_last_timestamp(when) def finished(self, shnums, when): - self._download_status.add_dyhb_finished(self._tag, shnums, when) + self._ev["finish_time"] = when + self._ev["success"] = True + self._ev["response_shnums"] = shnums + self._ds.update_last_timestamp(when) + +class BlockRequestEvent: + def __init__(self, ev, ds): + self._ev = ev + self._ds = ds + def finished(self, received, when): + self._ev["finish_time"] = when + self._ev["success"] = True + self._ev["response_length"] = received + self._ds.update_last_timestamp(when) + def error(self, when): + self._ev["finish_time"] = when + self._ev["success"] = False + self._ds.update_last_timestamp(when) -class ReadEvent: - def __init__(self, download_status, tag): - self._download_status = download_status - self._tag = tag - def update(self, bytes, decrypttime, pausetime): - self._download_status.update_read_event(self._tag, bytes, - decrypttime, pausetime) - def finished(self, finishtime): - self._download_status.finish_read_event(self._tag, finishtime) class DownloadStatus: # There is one DownloadStatus for each CiphertextFileNode. The status @@ -38,110 +75,122 @@ class DownloadStatus: self.size = size self.counter = self.statusid_counter.next() self.helper = False - self.started = None - # self.dyhb_requests tracks "do you have a share" requests and - # responses. It maps serverid to a tuple of: - # send time - # tuple of response shnums (None if response hasn't arrived, "error") - # response time (None if response hasn't arrived yet) - self.dyhb_requests = {} - - # self.requests tracks share-data requests and responses. 
It maps - # serverid to a tuple of: - # shnum, - # start,length, (of data requested) - # send time - # response length (None if reponse hasn't arrived yet, or "error") - # response time (None if response hasn't arrived) - self.requests = {} - - # self.segment_events tracks segment requests and delivery. It is a - # list of: - # type ("request", "delivery", "error") - # segment number - # event time - # segment start (file offset of first byte, None except in "delivery") - # segment length (only in "delivery") - # time spent in decode (only in "delivery") - self.segment_events = [] - # self.read_events tracks read() requests. It is a list of: + self.first_timestamp = None + self.last_timestamp = None + + # all four of these _events lists are sorted by start_time, because + # they are strictly append-only (some elements are later mutated in + # place, but none are removed or inserted in the middle). + + # self.read_events tracks read() requests. It is a list of dicts, + # each with the following keys: # start,length (of data requested) - # request time - # finish time (None until finished) - # bytes returned (starts at 0, grows as segments are delivered) - # time spent in decrypt (None for ciphertext-only reads) - # time spent paused + # start_time + # finish_time (None until finished) + # bytes_returned (starts at 0, grows as segments are delivered) + # decrypt_time (time spent in decrypt, None for ciphertext-only reads) + # paused_time (time spent paused by client via pauseProducing) self.read_events = [] + # self.segment_events tracks segment requests and their resolution. 
+ # It is a list of dicts: + # segment_number + # start_time + # active_time (None until work has begun) + # decode_time (time spent in decode, None until delivered) + # finish_time (None until resolved) + # success (None until resolved, then boolean) + # segment_start (file offset of first byte, None until delivered) + # segment_length (None until delivered) + self.segment_events = [] + + # self.dyhb_requests tracks "do you have a share" requests and + # responses. It is a list of dicts: + # serverid (binary) + # start_time + # success (None until resolved, then boolean) + # response_shnums (tuple, None until successful) + # finish_time (None until resolved) + self.dyhb_requests = [] + + # self.block_requests tracks share-data requests and responses. It is + # a list of dicts: + # serverid (binary), + # shnum, + # start,length, (of data requested) + # start_time + # finish_time (None until resolved) + # success (None until resolved, then bool) + # response_length (None until success) + self.block_requests = [] + self.known_shares = [] # (serverid, shnum) self.problems = [] + self.misc_events = [] + + def add_misc_event(self, what, start, finish=None): + self.misc_events.append( {"what": what, + "start_time": start, + "finish_time": finish, + } ) - def add_dyhb_sent(self, serverid, when): - r = (when, None, None) - if serverid not in self.dyhb_requests: - self.dyhb_requests[serverid] = [] - self.dyhb_requests[serverid].append(r) - tag = (serverid, len(self.dyhb_requests[serverid])-1) - return DYHBEvent(self, tag) - - def add_dyhb_finished(self, tag, shnums, when): - # received="error" on error, else tuple(shnums) - (serverid, index) = tag - r = self.dyhb_requests[serverid][index] - (sent, _, _) = r - r = (sent, shnums, when) - self.dyhb_requests[serverid][index] = r - - def add_request_sent(self, serverid, shnum, start, length, when): - r = (shnum, start, length, when, None, None) - if serverid not in self.requests: - self.requests[serverid] = [] - 
self.requests[serverid].append(r) - tag = (serverid, len(self.requests[serverid])-1) - return RequestEvent(self, tag) - - def add_request_finished(self, tag, received, when): - # received="error" on error, else len(data) - (serverid, index) = tag - r = self.requests[serverid][index] - (shnum, start, length, sent, _, _) = r - r = (shnum, start, length, sent, received, when) - self.requests[serverid][index] = r + def add_read_event(self, start, length, when): + if self.first_timestamp is None: + self.first_timestamp = when + r = { "start": start, + "length": length, + "start_time": when, + "finish_time": None, + "bytes_returned": 0, + "decrypt_time": 0, + "paused_time": 0, + } + self.read_events.append(r) + return ReadEvent(r, self) def add_segment_request(self, segnum, when): - if self.started is None: - self.started = when - r = ("request", segnum, when, None, None, None) - self.segment_events.append(r) - def add_segment_delivery(self, segnum, when, start, length, decodetime): - r = ("delivery", segnum, when, start, length, decodetime) - self.segment_events.append(r) - def add_segment_error(self, segnum, when): - r = ("error", segnum, when, None, None, None) + if self.first_timestamp is None: + self.first_timestamp = when + r = { "segment_number": segnum, + "start_time": when, + "active_time": None, + "finish_time": None, + "success": None, + "decode_time": None, + "segment_start": None, + "segment_length": None, + } self.segment_events.append(r) + return SegmentEvent(r, self) - def add_read_event(self, start, length, when): - if self.started is None: - self.started = when - r = (start, length, when, None, 0, 0, 0) - self.read_events.append(r) - tag = len(self.read_events)-1 - return ReadEvent(self, tag) - def update_read_event(self, tag, bytes_d, decrypt_d, paused_d): - r = self.read_events[tag] - (start, length, requesttime, finishtime, bytes, decrypt, paused) = r - bytes += bytes_d - decrypt += decrypt_d - paused += paused_d - r = (start, length, requesttime, 
finishtime, bytes, decrypt, paused) - self.read_events[tag] = r - def finish_read_event(self, tag, finishtime): - r = self.read_events[tag] - (start, length, requesttime, _, bytes, decrypt, paused) = r - r = (start, length, requesttime, finishtime, bytes, decrypt, paused) - self.read_events[tag] = r + def add_dyhb_request(self, serverid, when): + r = { "serverid": serverid, + "start_time": when, + "success": None, + "response_shnums": None, + "finish_time": None, + } + self.dyhb_requests.append(r) + return DYHBEvent(r, self) + + def add_block_request(self, serverid, shnum, start, length, when): + r = { "serverid": serverid, + "shnum": shnum, + "start": start, + "length": length, + "start_time": when, + "finish_time": None, + "success": None, + "response_length": None, + } + self.block_requests.append(r) + return BlockRequestEvent(r, self) + + def update_last_timestamp(self, when): + if self.last_timestamp is None or when > self.last_timestamp: + self.last_timestamp = when def add_known_share(self, serverid, shnum): self.known_shares.append( (serverid, shnum) ) @@ -160,15 +209,12 @@ class DownloadStatus: # mention all outstanding segment requests outstanding = set() errorful = set() - for s_ev in self.segment_events: - (etype, segnum, when, segstart, seglen, decodetime) = s_ev - if etype == "request": - outstanding.add(segnum) - elif etype == "delivery": - outstanding.remove(segnum) - else: # "error" - outstanding.remove(segnum) - errorful.add(segnum) + outstanding = set([s_ev["segment_number"] + for s_ev in self.segment_events + if s_ev["finish_time"] is None]) + errorful = set([s_ev["segment_number"] + for s_ev in self.segment_events + if s_ev["success"] is False]) def join(segnums): if len(segnums) == 1: return "segment %s" % list(segnums)[0] @@ -191,10 +237,9 @@ class DownloadStatus: return 0.0 total_outstanding, total_received = 0, 0 for r_ev in self.read_events: - (start, length, ign1, finishtime, bytes, ign2, ign3) = r_ev - if finishtime is None: - 
total_outstanding += length - total_received += bytes + if r_ev["finish_time"] is None: + total_outstanding += r_ev["length"] + total_received += r_ev["bytes_returned"] # else ignore completed requests if not total_outstanding: return 1.0 @@ -207,12 +252,11 @@ class DownloadStatus: # a download is considered active if it has at least one outstanding # read() call for r_ev in self.read_events: - (ign1, ign2, ign3, finishtime, ign4, ign5, ign6) = r_ev - if finishtime is None: + if r_ev["finish_time"] is None: return True return False def get_started(self): - return self.started + return self.first_timestamp def get_results(self): return None # TODO diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index ed3785b..2adc69b 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -55,13 +55,14 @@ class CiphertextFileNode: return a Deferred that fires (with the consumer) when the read is finished.""" self._maybe_create_download_node() - actual_size = size - if actual_size is None: - actual_size = self._verifycap.size - offset - read_ev = self._download_status.add_read_event(offset, actual_size, - now()) + if size is None: + size = self._verifycap.size + # clip size so offset+size does not go past EOF + size = min(size, self._verifycap.size-offset) + read_ev = self._download_status.add_read_event(offset, size, now()) if IDownloadStatusHandlingConsumer.providedBy(consumer): consumer.set_download_status_read_event(read_ev) + consumer.set_download_status(self._download_status) return self._node.read(consumer, offset, size, read_ev) def get_segment(self, segnum): @@ -177,7 +178,8 @@ class DecryptingConsumer: def __init__(self, consumer, readkey, offset): self._consumer = consumer - self._read_event = None + self._read_ev = None + self._download_status = None # TODO: pycryptopp CTR-mode needs random-access operations: I want # either a=AES(readkey, offset) or better yet both of: # a=AES(readkey, offset=0) 
@@ -190,7 +192,9 @@ class DecryptingConsumer: self._decryptor.process("\x00"*offset_small) def set_download_status_read_event(self, read_ev): - self._read_event = read_ev + self._read_ev = read_ev + def set_download_status(self, ds): + self._download_status = ds def registerProducer(self, producer, streaming): # this passes through, so the real consumer can flow-control the real @@ -203,9 +207,11 @@ class DecryptingConsumer: def write(self, ciphertext): started = now() plaintext = self._decryptor.process(ciphertext) - if self._read_event: + if self._read_ev: elapsed = now() - started - self._read_event.update(0, elapsed, 0) + self._read_ev.update(0, elapsed, 0) + if self._download_status: + self._download_status.add_misc_event("AES", started, now()) self._consumer.write(plaintext) class ImmutableFileNode: diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index 3a69220..4d0e364 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -1214,14 +1214,16 @@ class Status(unittest.TestCase): now = 12345.1 ds = DownloadStatus("si-1", 123) self.failUnlessEqual(ds.get_status(), "idle") - ds.add_segment_request(0, now) + ev0 = ds.add_segment_request(0, now) self.failUnlessEqual(ds.get_status(), "fetching segment 0") - ds.add_segment_delivery(0, now+1, 0, 1000, 2.0) + ev0.activate(now+0.5) + ev0.deliver(now+1, 0, 1000, 2.0) self.failUnlessEqual(ds.get_status(), "idle") - ds.add_segment_request(2, now+2) - ds.add_segment_request(1, now+2) + ev2 = ds.add_segment_request(2, now+2) + del ev2 # hush pyflakes + ev1 = ds.add_segment_request(1, now+2) self.failUnlessEqual(ds.get_status(), "fetching segments 1,2") - ds.add_segment_error(1, now+3) + ev1.error(now+3) self.failUnlessEqual(ds.get_status(), "fetching segment 2; errors on segment 1") diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index f68e98d..b4b8bdc 100644 --- a/src/allmydata/test/test_web.py +++ 
b/src/allmydata/test/test_web.py @@ -19,7 +19,7 @@ from allmydata.nodemaker import NodeMaker from allmydata.unknown import UnknownNode from allmydata.web import status, common from allmydata.scripts.debug import CorruptShareOptions, corrupt_share -from allmydata.util import fileutil, base32 +from allmydata.util import fileutil, base32, hashutil from allmydata.util.consumer import download_to_data from allmydata.util.netstring import split_netstring from allmydata.util.encodingutil import to_str @@ -78,34 +78,38 @@ def build_one_ds(): ds = DownloadStatus("storage_index", 1234) now = time.time() - ds.add_segment_request(0, now) - # segnum, when, start,len, decodetime - ds.add_segment_delivery(0, now+1, 0, 100, 0.5) - ds.add_segment_request(1, now+2) - ds.add_segment_error(1, now+3) + serverid_a = hashutil.tagged_hash("foo", "serverid_a")[:20] + serverid_b = hashutil.tagged_hash("foo", "serverid_b")[:20] + storage_index = hashutil.storage_index_hash("SI") + e0 = ds.add_segment_request(0, now) + e0.activate(now+0.5) + e0.deliver(now+1, 0, 100, 0.5) # when, start,len, decodetime + e1 = ds.add_segment_request(1, now+2) + e1.error(now+3) # two outstanding requests - ds.add_segment_request(2, now+4) - ds.add_segment_request(3, now+5) + e2 = ds.add_segment_request(2, now+4) + e3 = ds.add_segment_request(3, now+5) # simulate a segment which gets delivered faster than a system clock tick (ticket #1166) - ds.add_segment_request(4, now) - ds.add_segment_delivery(4, now, 0, 140, 0.5) + e = ds.add_segment_request(4, now) + e.activate(now) + e.deliver(now, 0, 140, 0.5) - e = ds.add_dyhb_sent("serverid_a", now) + e = ds.add_dyhb_request(serverid_a, now) e.finished([1,2], now+1) - e = ds.add_dyhb_sent("serverid_b", now+2) # left unfinished + e = ds.add_dyhb_request(serverid_b, now+2) # left unfinished e = ds.add_read_event(0, 120, now) e.update(60, 0.5, 0.1) # bytes, decrypttime, pausetime e.finished(now+1) e = ds.add_read_event(120, 30, now+2) # left unfinished - e = 
ds.add_request_sent("serverid_a", 1, 100, 20, now) + e = ds.add_block_request(serverid_a, 1, 100, 20, now) e.finished(20, now+1) - e = ds.add_request_sent("serverid_a", 1, 120, 30, now+1) # left unfinished + e = ds.add_block_request(serverid_a, 1, 120, 30, now+1) # left unfinished # make sure that add_read_event() can come first too - ds1 = DownloadStatus("storage_index", 1234) + ds1 = DownloadStatus(storage_index, 1234) e = ds1.add_read_event(0, 120, now) e.update(60, 0.5, 0.1) # bytes, decrypttime, pausetime e.finished(now+1) @@ -554,10 +558,25 @@ class Web(WebMixin, WebErrorMixin, testutil.StallMixin, testutil.ReallyEqualMixi def _check_dl(res): self.failUnless("File Download Status" in res, res) d.addCallback(_check_dl) - d.addCallback(lambda res: self.GET("/status/down-%d?t=json" % dl_num)) + d.addCallback(lambda res: self.GET("/status/down-%d/event_json" % dl_num)) def _check_dl_json(res): data = simplejson.loads(res) self.failUnless(isinstance(data, dict)) + self.failUnless("read" in data) + self.failUnlessEqual(data["read"][0]["length"], 120) + self.failUnlessEqual(data["segment"][0]["segment_length"], 100) + self.failUnlessEqual(data["segment"][2]["segment_number"], 2) + self.failUnlessEqual(data["segment"][2]["finish_time"], None) + phwr_id = base32.b2a(hashutil.tagged_hash("foo", "serverid_a")[:20]) + cmpu_id = base32.b2a(hashutil.tagged_hash("foo", "serverid_b")[:20]) + # serverids[] keys are strings, since that's what JSON does, but + # we'd really like them to be ints + self.failUnlessEqual(data["serverids"]["0"], "phwr") + self.failUnlessEqual(data["serverids"]["1"], "cmpu") + self.failUnlessEqual(data["server_info"][phwr_id]["short"], "phwr") + self.failUnlessEqual(data["server_info"][cmpu_id]["short"], "cmpu") + self.failUnless("dyhb" in data) + self.failUnless("misc" in data) d.addCallback(_check_dl_json) d.addCallback(lambda res: self.GET("/status/up-%d" % ul_num)) def _check_ul(res): diff --git a/src/allmydata/web/download-status-timeline.xhtml 
b/src/allmydata/web/download-status-timeline.xhtml new file mode 100644 index 0000000..ff893db --- /dev/null +++ b/src/allmydata/web/download-status-timeline.xhtml @@ -0,0 +1,34 @@ + +
+