prepare for viz: improve DownloadStatus events

consolidate IDownloadStatusHandlingConsumer stuff into DownloadNode
This commit is contained in:
Brian Warner 2011-06-29 15:22:04 -07:00
parent ed836157aa
commit fc5c2208fb
7 changed files with 239 additions and 185 deletions

View File

@ -135,7 +135,7 @@ class ShareFinder:
lp = self.log(format="sending DYHB to [%(name)s]", name=server.name(), lp = self.log(format="sending DYHB to [%(name)s]", name=server.name(),
level=log.NOISY, umid="Io7pyg") level=log.NOISY, umid="Io7pyg")
time_sent = now() time_sent = now()
d_ev = self._download_status.add_dyhb_sent(server.get_serverid(), d_ev = self._download_status.add_dyhb_request(server.get_serverid(),
time_sent) time_sent)
# TODO: get the timer from a Server object, it knows best # TODO: get the timer from a Server object, it knows best
self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
@ -218,7 +218,7 @@ class ShareFinder:
eventually(self.share_consumer.got_shares, shares) eventually(self.share_consumer.got_shares, shares)
def _got_error(self, f, server, req, d_ev, lp): def _got_error(self, f, server, req, d_ev, lp):
d_ev.finished("error", now()) d_ev.error(now())
self.log(format="got error from [%(name)s]", self.log(format="got error from [%(name)s]",
name=server.name(), failure=f, name=server.name(), failure=f,
level=log.UNUSUAL, parent=lp, umid="zUKdCw") level=log.UNUSUAL, parent=lp, umid="zUKdCw")

View File

@ -1,6 +1,7 @@
import time import time
now = time.time now = time.time
from zope.interface import Interface
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.internet import defer from twisted.internet import defer
from foolscap.api import eventually from foolscap.api import eventually
@ -17,6 +18,11 @@ from fetcher import SegmentFetcher
from segmentation import Segmentation from segmentation import Segmentation
from common import BadCiphertextHashError from common import BadCiphertextHashError
class IDownloadStatusHandlingConsumer(Interface):
def set_download_status_read_event(read_ev):
"""Record the DownloadStatus 'read event', to be updated with the
time it takes to decrypt each chunk of data."""
class Cancel: class Cancel:
def __init__(self, f): def __init__(self, f):
self._f = f self._f = f
@ -72,7 +78,7 @@ class DownloadNode:
# things to track callers that want data # things to track callers that want data
# _segment_requests can have duplicates # _segment_requests can have duplicates
self._segment_requests = [] # (segnum, d, cancel_handle, logparent) self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
self._active_segment = None # a SegmentFetcher, with .segnum self._active_segment = None # a SegmentFetcher, with .segnum
self._segsize_observers = observer.OneShotObserverList() self._segsize_observers = observer.OneShotObserverList()
@ -119,22 +125,25 @@ class DownloadNode:
# things called by outside callers, via CiphertextFileNode. get_segment() # things called by outside callers, via CiphertextFileNode. get_segment()
# may also be called by Segmentation. # may also be called by Segmentation.
def read(self, consumer, offset=0, size=None, read_ev=None): def read(self, consumer, offset, size):
"""I am the main entry point, from which FileNode.read() can get """I am the main entry point, from which FileNode.read() can get
data. I feed the consumer with the desired range of ciphertext. I data. I feed the consumer with the desired range of ciphertext. I
return a Deferred that fires (with the consumer) when the read is return a Deferred that fires (with the consumer) when the read is
finished. finished.
Note that there is no notion of a 'file pointer': each call to read() Note that there is no notion of a 'file pointer': each call to read()
uses an independent offset= value.""" uses an independent offset= value.
"""
# for concurrent operations: each gets its own Segmentation manager # for concurrent operations: each gets its own Segmentation manager
if size is None: if size is None:
size = self._verifycap.size size = self._verifycap.size
# ignore overruns: clip size so offset+size does not go past EOF, and # ignore overruns: clip size so offset+size does not go past EOF, and
# so size is not negative (which indicates that offset >= EOF) # so size is not negative (which indicates that offset >= EOF)
size = max(0, min(size, self._verifycap.size-offset)) size = max(0, min(size, self._verifycap.size-offset))
if read_ev is None:
read_ev = self._download_status.add_read_event(offset, size, now()) read_ev = self._download_status.add_read_event(offset, size, now())
if IDownloadStatusHandlingConsumer.providedBy(consumer):
consumer.set_download_status_read_event(read_ev)
lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)", lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
si=base32.b2a(self._verifycap.storage_index)[:8], si=base32.b2a(self._verifycap.storage_index)[:8],
@ -148,7 +157,11 @@ class DownloadNode:
read_ev.finished(now()) read_ev.finished(now())
# no data, so no producer, so no register/unregisterProducer # no data, so no producer, so no register/unregisterProducer
return defer.succeed(consumer) return defer.succeed(consumer)
# for concurrent operations, each read() gets its own Segmentation
# manager
s = Segmentation(self, offset, size, consumer, read_ev, lp) s = Segmentation(self, offset, size, consumer, read_ev, lp)
# this raises an interesting question: what segments to fetch? if # this raises an interesting question: what segments to fetch? if
# offset=0, always fetch the first segment, and then allow # offset=0, always fetch the first segment, and then allow
# Segmentation to be responsible for pulling the subsequent ones if # Segmentation to be responsible for pulling the subsequent ones if
@ -186,10 +199,10 @@ class DownloadNode:
si=base32.b2a(self._verifycap.storage_index)[:8], si=base32.b2a(self._verifycap.storage_index)[:8],
segnum=segnum, segnum=segnum,
level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
self._download_status.add_segment_request(segnum, now()) seg_ev = self._download_status.add_segment_request(segnum, now())
d = defer.Deferred() d = defer.Deferred()
c = Cancel(self._cancel_request) c = Cancel(self._cancel_request)
self._segment_requests.append( (segnum, d, c, lp) ) self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
self._start_new_segment() self._start_new_segment()
return (d, c) return (d, c)
@ -213,13 +226,13 @@ class DownloadNode:
def _start_new_segment(self): def _start_new_segment(self):
if self._active_segment is None and self._segment_requests: if self._active_segment is None and self._segment_requests:
segnum = self._segment_requests[0][0] (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
k = self._verifycap.needed_shares k = self._verifycap.needed_shares
lp = self._segment_requests[0][3]
log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d", log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
node=repr(self), segnum=segnum, node=repr(self), segnum=segnum,
level=log.NOISY, parent=lp, umid="wAlnHQ") level=log.NOISY, parent=lp, umid="wAlnHQ")
self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp) self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
seg_ev.activate(now())
active_shares = [s for s in self._shares if s.is_alive()] active_shares = [s for s in self._shares if s.is_alive()]
fetcher.add_shares(active_shares) # this triggers the loop fetcher.add_shares(active_shares) # this triggers the loop
@ -383,7 +396,8 @@ class DownloadNode:
def fetch_failed(self, sf, f): def fetch_failed(self, sf, f):
assert sf is self._active_segment assert sf is self._active_segment
# deliver error upwards # deliver error upwards
for (d,c) in self._extract_requests(sf.segnum): for (d,c,seg_ev) in self._extract_requests(sf.segnum):
seg_ev.error(now())
eventually(self._deliver, d, c, f) eventually(self._deliver, d, c, f)
self._active_segment = None self._active_segment = None
self._start_new_segment() self._start_new_segment()
@ -392,26 +406,34 @@ class DownloadNode:
d = defer.maybeDeferred(self._decode_blocks, segnum, blocks) d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
d.addCallback(self._check_ciphertext_hash, segnum) d.addCallback(self._check_ciphertext_hash, segnum)
def _deliver(result): def _deliver(result):
ds = self._download_status
if isinstance(result, Failure):
ds.add_segment_error(segnum, now())
else:
(offset, segment, decodetime) = result
ds.add_segment_delivery(segnum, now(),
offset, len(segment), decodetime)
log.msg(format="delivering segment(%(segnum)d)", log.msg(format="delivering segment(%(segnum)d)",
segnum=segnum, segnum=segnum,
level=log.OPERATIONAL, parent=self._lp, level=log.OPERATIONAL, parent=self._lp,
umid="j60Ojg") umid="j60Ojg")
for (d,c) in self._extract_requests(segnum): when = now()
if isinstance(result, Failure):
# this catches failures in decode or ciphertext hash
for (d,c,seg_ev) in self._extract_requests(segnum):
seg_ev.error(when)
eventually(self._deliver, d, c, result)
else:
(offset, segment, decodetime) = result
for (d,c,seg_ev) in self._extract_requests(segnum):
# when we have two requests for the same segment, the
# second one will not be "activated" before the data is
# delivered, so to allow the status-reporting code to see
# consistent behavior, we activate them all now. The
# SegmentEvent will ignore duplicate activate() calls.
# Note that this will result in an inaccurate "receive
# speed" for the second request.
seg_ev.activate(when)
seg_ev.deliver(when, offset, len(segment), decodetime)
eventually(self._deliver, d, c, result) eventually(self._deliver, d, c, result)
self._active_segment = None self._active_segment = None
self._start_new_segment() self._start_new_segment()
d.addBoth(_deliver) d.addBoth(_deliver)
d.addErrback(lambda f: d.addErrback(log.err, "unhandled error during process_blocks",
log.err("unhandled error during process_blocks", level=log.WEIRD, parent=self._lp, umid="MkEsCg")
failure=f, level=log.WEIRD,
parent=self._lp, umid="MkEsCg"))
def _decode_blocks(self, segnum, blocks): def _decode_blocks(self, segnum, blocks):
tail = (segnum == self.num_segments-1) tail = (segnum == self.num_segments-1)
@ -479,7 +501,8 @@ class DownloadNode:
def _extract_requests(self, segnum): def _extract_requests(self, segnum):
"""Remove matching requests and return their (d,c) tuples so that the """Remove matching requests and return their (d,c) tuples so that the
caller can retire them.""" caller can retire them."""
retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests retire = [(d,c,seg_ev)
for (segnum0,d,c,seg_ev,lp) in self._segment_requests
if segnum0 == segnum] if segnum0 == segnum]
self._segment_requests = [t for t in self._segment_requests self._segment_requests = [t for t in self._segment_requests
if t[0] != segnum] if t[0] != segnum]
@ -488,7 +511,7 @@ class DownloadNode:
def _cancel_request(self, c): def _cancel_request(self, c):
self._segment_requests = [t for t in self._segment_requests self._segment_requests = [t for t in self._segment_requests
if t[2] != c] if t[2] != c]
segnums = [segnum for (segnum,d,c,lp) in self._segment_requests] segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
# self._active_segment might be None in rare circumstances, so make # self._active_segment might be None in rare circumstances, so make
# sure we tolerate it # sure we tolerate it
if self._active_segment and self._active_segment.segnum not in segnums: if self._active_segment and self._active_segment.segnum not in segnums:

View File

@ -123,6 +123,8 @@ class Segmentation:
# the consumer might call our .pauseProducing() inside that write() # the consumer might call our .pauseProducing() inside that write()
# call, setting self._hungry=False # call, setting self._hungry=False
self._read_ev.update(len(desired_data), 0, 0) self._read_ev.update(len(desired_data), 0, 0)
# note: filenode.DecryptingConsumer is responsible for calling
# _read_ev.update with how much decrypt_time was consumed
self._maybe_fetch_next() self._maybe_fetch_next()
def _retry_bad_segment(self, f): def _retry_bad_segment(self, f):

View File

@ -726,12 +726,12 @@ class Share:
share=repr(self), share=repr(self),
start=start, length=length, start=start, length=length,
level=log.NOISY, parent=self._lp, umid="sgVAyA") level=log.NOISY, parent=self._lp, umid="sgVAyA")
req_ev = ds.add_request_sent(self._server.get_serverid(), block_ev = ds.add_block_request(self._server.get_serverid(),
self._shnum, self._shnum,
start, length, now()) start, length, now())
d = self._send_request(start, length) d = self._send_request(start, length)
d.addCallback(self._got_data, start, length, req_ev, lp) d.addCallback(self._got_data, start, length, block_ev, lp)
d.addErrback(self._got_error, start, length, req_ev, lp) d.addErrback(self._got_error, start, length, block_ev, lp)
d.addCallback(self._trigger_loop) d.addCallback(self._trigger_loop)
d.addErrback(lambda f: d.addErrback(lambda f:
log.err(format="unhandled error during send_request", log.err(format="unhandled error during send_request",
@ -741,8 +741,8 @@ class Share:
def _send_request(self, start, length): def _send_request(self, start, length):
return self._rref.callRemote("read", start, length) return self._rref.callRemote("read", start, length)
def _got_data(self, data, start, length, req_ev, lp): def _got_data(self, data, start, length, block_ev, lp):
req_ev.finished(len(data), now()) block_ev.finished(len(data), now())
if not self._alive: if not self._alive:
return return
log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d", log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
@ -784,8 +784,8 @@ class Share:
# the wanted/needed span is only "wanted" for the first pass. Once # the wanted/needed span is only "wanted" for the first pass. Once
# the offset table arrives, it's all "needed". # the offset table arrives, it's all "needed".
def _got_error(self, f, start, length, req_ev, lp): def _got_error(self, f, start, length, block_ev, lp):
req_ev.finished("error", now()) block_ev.error(now())
log.msg(format="error requesting %(start)d+%(length)d" log.msg(format="error requesting %(start)d+%(length)d"
" from %(server)s for si %(si)s", " from %(server)s for si %(si)s",
start=start, length=length, start=start, length=length,

View File

@ -3,29 +3,66 @@ import itertools
from zope.interface import implements from zope.interface import implements
from allmydata.interfaces import IDownloadStatus from allmydata.interfaces import IDownloadStatus
class RequestEvent: class ReadEvent:
def __init__(self, download_status, tag): def __init__(self, ev, ds):
self._download_status = download_status self._ev = ev
self._tag = tag self._ds = ds
def finished(self, received, when): def update(self, bytes, decrypttime, pausetime):
self._download_status.add_request_finished(self._tag, received, when) self._ev["bytes_returned"] += bytes
self._ev["decrypt_time"] += decrypttime
self._ev["paused_time"] += pausetime
def finished(self, finishtime):
self._ev["finish_time"] = finishtime
self._ds.update_last_timestamp(finishtime)
class SegmentEvent:
def __init__(self, ev, ds):
self._ev = ev
self._ds = ds
def activate(self, when):
if self._ev["active_time"] is None:
self._ev["active_time"] = when
def deliver(self, when, start, length, decodetime):
assert self._ev["active_time"] is not None
self._ev["finish_time"] = when
self._ev["success"] = True
self._ev["decode_time"] = decodetime
self._ev["segment_start"] = start
self._ev["segment_length"] = length
self._ds.update_last_timestamp(when)
def error(self, when):
self._ev["finish_time"] = when
self._ev["success"] = False
self._ds.update_last_timestamp(when)
class DYHBEvent: class DYHBEvent:
def __init__(self, download_status, tag): def __init__(self, ev, ds):
self._download_status = download_status self._ev = ev
self._tag = tag self._ds = ds
def error(self, when):
self._ev["finish_time"] = when
self._ev["success"] = False
self._ds.update_last_timestamp(when)
def finished(self, shnums, when): def finished(self, shnums, when):
self._download_status.add_dyhb_finished(self._tag, shnums, when) self._ev["finish_time"] = when
self._ev["success"] = True
self._ev["response_shnums"] = shnums
self._ds.update_last_timestamp(when)
class BlockRequestEvent:
def __init__(self, ev, ds):
self._ev = ev
self._ds = ds
def finished(self, received, when):
self._ev["finish_time"] = when
self._ev["success"] = True
self._ev["response_length"] = received
self._ds.update_last_timestamp(when)
def error(self, when):
self._ev["finish_time"] = when
self._ev["success"] = False
self._ds.update_last_timestamp(when)
class ReadEvent:
def __init__(self, download_status, tag):
self._download_status = download_status
self._tag = tag
def update(self, bytes, decrypttime, pausetime):
self._download_status.update_read_event(self._tag, bytes,
decrypttime, pausetime)
def finished(self, finishtime):
self._download_status.finish_read_event(self._tag, finishtime)
class DownloadStatus: class DownloadStatus:
# There is one DownloadStatus for each CiphertextFileNode. The status # There is one DownloadStatus for each CiphertextFileNode. The status
@ -38,110 +75,115 @@ class DownloadStatus:
self.size = size self.size = size
self.counter = self.statusid_counter.next() self.counter = self.statusid_counter.next()
self.helper = False self.helper = False
self.started = None
# self.dyhb_requests tracks "do you have a share" requests and
# responses. It maps serverid to a tuple of:
# send time
# tuple of response shnums (None if response hasn't arrived, "error")
# response time (None if response hasn't arrived yet)
self.dyhb_requests = {}
# self.requests tracks share-data requests and responses. It maps self.first_timestamp = None
# serverid to a tuple of: self.last_timestamp = None
# shnum,
# start,length, (of data requested)
# send time
# response length (None if reponse hasn't arrived yet, or "error")
# response time (None if response hasn't arrived)
self.requests = {}
# self.segment_events tracks segment requests and delivery. It is a # all four of these _events lists are sorted by start_time, because
# list of: # they are strictly append-only (some elements are later mutated in
# type ("request", "delivery", "error") # place, but none are removed or inserted in the middle).
# segment number
# event time # self.read_events tracks read() requests. It is a list of dicts,
# segment start (file offset of first byte, None except in "delivery") # each with the following keys:
# segment length (only in "delivery") # start,length (of data requested)
# time spent in decode (only in "delivery") # start_time
# finish_time (None until finished)
# bytes_returned (starts at 0, grows as segments are delivered)
# decrypt_time (time spent in decrypt, None for ciphertext-only reads)
# paused_time (time spent paused by client via pauseProducing)
self.read_events = []
# self.segment_events tracks segment requests and their resolution.
# It is a list of dicts:
# segment_number
# start_time
# active_time (None until work has begun)
# decode_time (time spent in decode, None until delievered)
# finish_time (None until resolved)
# success (None until resolved, then boolean)
# segment_start (file offset of first byte, None until delivered)
# segment_length (None until delivered)
self.segment_events = [] self.segment_events = []
# self.read_events tracks read() requests. It is a list of: # self.dyhb_requests tracks "do you have a share" requests and
# start,length (of data requested) # responses. It is a list of dicts:
# request time # serverid (binary)
# finish time (None until finished) # start_time
# bytes returned (starts at 0, grows as segments are delivered) # success (None until resolved, then boolean)
# time spent in decrypt (None for ciphertext-only reads) # response_shnums (tuple, None until successful)
# time spent paused # finish_time (None until resolved)
self.read_events = [] self.dyhb_requests = []
# self.block_requests tracks share-data requests and responses. It is
# a list of dicts:
# serverid (binary),
# shnum,
# start,length, (of data requested)
# start_time
# finish_time (None until resolved)
# success (None until resolved, then bool)
# response_length (None until success)
self.block_requests = []
self.known_shares = [] # (serverid, shnum) self.known_shares = [] # (serverid, shnum)
self.problems = [] self.problems = []
def add_dyhb_sent(self, serverid, when): def add_read_event(self, start, length, when):
r = (when, None, None) if self.first_timestamp is None:
if serverid not in self.dyhb_requests: self.first_timestamp = when
self.dyhb_requests[serverid] = [] r = { "start": start,
self.dyhb_requests[serverid].append(r) "length": length,
tag = (serverid, len(self.dyhb_requests[serverid])-1) "start_time": when,
return DYHBEvent(self, tag) "finish_time": None,
"bytes_returned": 0,
def add_dyhb_finished(self, tag, shnums, when): "decrypt_time": 0,
# received="error" on error, else tuple(shnums) "paused_time": 0,
(serverid, index) = tag }
r = self.dyhb_requests[serverid][index] self.read_events.append(r)
(sent, _, _) = r return ReadEvent(r, self)
r = (sent, shnums, when)
self.dyhb_requests[serverid][index] = r
def add_request_sent(self, serverid, shnum, start, length, when):
r = (shnum, start, length, when, None, None)
if serverid not in self.requests:
self.requests[serverid] = []
self.requests[serverid].append(r)
tag = (serverid, len(self.requests[serverid])-1)
return RequestEvent(self, tag)
def add_request_finished(self, tag, received, when):
# received="error" on error, else len(data)
(serverid, index) = tag
r = self.requests[serverid][index]
(shnum, start, length, sent, _, _) = r
r = (shnum, start, length, sent, received, when)
self.requests[serverid][index] = r
def add_segment_request(self, segnum, when): def add_segment_request(self, segnum, when):
if self.started is None: if self.first_timestamp is None:
self.started = when self.first_timestamp = when
r = ("request", segnum, when, None, None, None) r = { "segment_number": segnum,
self.segment_events.append(r) "start_time": when,
def add_segment_delivery(self, segnum, when, start, length, decodetime): "active_time": None,
r = ("delivery", segnum, when, start, length, decodetime) "finish_time": None,
self.segment_events.append(r) "success": None,
def add_segment_error(self, segnum, when): "decode_time": None,
r = ("error", segnum, when, None, None, None) "segment_start": None,
"segment_length": None,
}
self.segment_events.append(r) self.segment_events.append(r)
return SegmentEvent(r, self)
def add_read_event(self, start, length, when): def add_dyhb_request(self, serverid, when):
if self.started is None: r = { "serverid": serverid,
self.started = when "start_time": when,
r = (start, length, when, None, 0, 0, 0) "success": None,
self.read_events.append(r) "response_shnums": None,
tag = len(self.read_events)-1 "finish_time": None,
return ReadEvent(self, tag) }
def update_read_event(self, tag, bytes_d, decrypt_d, paused_d): self.dyhb_requests.append(r)
r = self.read_events[tag] return DYHBEvent(r, self)
(start, length, requesttime, finishtime, bytes, decrypt, paused) = r
bytes += bytes_d def add_block_request(self, serverid, shnum, start, length, when):
decrypt += decrypt_d r = { "serverid": serverid,
paused += paused_d "shnum": shnum,
r = (start, length, requesttime, finishtime, bytes, decrypt, paused) "start": start,
self.read_events[tag] = r "length": length,
def finish_read_event(self, tag, finishtime): "start_time": when,
r = self.read_events[tag] "finish_time": None,
(start, length, requesttime, _, bytes, decrypt, paused) = r "success": None,
r = (start, length, requesttime, finishtime, bytes, decrypt, paused) "response_length": None,
self.read_events[tag] = r }
self.block_requests.append(r)
return BlockRequestEvent(r, self)
def update_last_timestamp(self, when):
if self.last_timestamp is None or when > self.last_timestamp:
self.last_timestamp = when
def add_known_share(self, serverid, shnum): def add_known_share(self, serverid, shnum):
self.known_shares.append( (serverid, shnum) ) self.known_shares.append( (serverid, shnum) )
@ -160,15 +202,12 @@ class DownloadStatus:
# mention all outstanding segment requests # mention all outstanding segment requests
outstanding = set() outstanding = set()
errorful = set() errorful = set()
for s_ev in self.segment_events: outstanding = set([s_ev["segment_number"]
(etype, segnum, when, segstart, seglen, decodetime) = s_ev for s_ev in self.segment_events
if etype == "request": if s_ev["finish_time"] is None])
outstanding.add(segnum) errorful = set([s_ev["segment_number"]
elif etype == "delivery": for s_ev in self.segment_events
outstanding.remove(segnum) if s_ev["success"] is False])
else: # "error"
outstanding.remove(segnum)
errorful.add(segnum)
def join(segnums): def join(segnums):
if len(segnums) == 1: if len(segnums) == 1:
return "segment %s" % list(segnums)[0] return "segment %s" % list(segnums)[0]
@ -191,10 +230,9 @@ class DownloadStatus:
return 0.0 return 0.0
total_outstanding, total_received = 0, 0 total_outstanding, total_received = 0, 0
for r_ev in self.read_events: for r_ev in self.read_events:
(start, length, ign1, finishtime, bytes, ign2, ign3) = r_ev if r_ev["finish_time"] is None:
if finishtime is None: total_outstanding += r_ev["length"]
total_outstanding += length total_received += r_ev["bytes_returned"]
total_received += bytes
# else ignore completed requests # else ignore completed requests
if not total_outstanding: if not total_outstanding:
return 1.0 return 1.0
@ -213,6 +251,6 @@ class DownloadStatus:
return False return False
def get_started(self): def get_started(self):
return self.started return self.first_timestamp
def get_results(self): def get_results(self):
return None # TODO return None # TODO

View File

@ -3,7 +3,7 @@ import binascii
import copy import copy
import time import time
now = time.time now = time.time
from zope.interface import implements, Interface from zope.interface import implements
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.interfaces import IConsumer from twisted.internet.interfaces import IConsumer
@ -16,14 +16,10 @@ from pycryptopp.cipher.aes import AES
# local imports # local imports
from allmydata.immutable.checker import Checker from allmydata.immutable.checker import Checker
from allmydata.immutable.repairer import Repairer from allmydata.immutable.repairer import Repairer
from allmydata.immutable.downloader.node import DownloadNode from allmydata.immutable.downloader.node import DownloadNode, \
IDownloadStatusHandlingConsumer
from allmydata.immutable.downloader.status import DownloadStatus from allmydata.immutable.downloader.status import DownloadStatus
class IDownloadStatusHandlingConsumer(Interface):
def set_download_status_read_event(read_ev):
"""Record the DownloadStatus 'read event', to be updated with the
time it takes to decrypt each chunk of data."""
class CiphertextFileNode: class CiphertextFileNode:
def __init__(self, verifycap, storage_broker, secret_holder, def __init__(self, verifycap, storage_broker, secret_holder,
terminator, history): terminator, history):
@ -55,14 +51,7 @@ class CiphertextFileNode:
return a Deferred that fires (with the consumer) when the read is return a Deferred that fires (with the consumer) when the read is
finished.""" finished."""
self._maybe_create_download_node() self._maybe_create_download_node()
actual_size = size return self._node.read(consumer, offset, size)
if actual_size is None:
actual_size = self._verifycap.size - offset
read_ev = self._download_status.add_read_event(offset, actual_size,
now())
if IDownloadStatusHandlingConsumer.providedBy(consumer):
consumer.set_download_status_read_event(read_ev)
return self._node.read(consumer, offset, size, read_ev)
def get_segment(self, segnum): def get_segment(self, segnum):
"""Begin downloading a segment. I return a tuple (d, c): 'd' is a """Begin downloading a segment. I return a tuple (d, c): 'd' is a
@ -177,7 +166,7 @@ class DecryptingConsumer:
def __init__(self, consumer, readkey, offset): def __init__(self, consumer, readkey, offset):
self._consumer = consumer self._consumer = consumer
self._read_event = None self._read_ev = None
# TODO: pycryptopp CTR-mode needs random-access operations: I want # TODO: pycryptopp CTR-mode needs random-access operations: I want
# either a=AES(readkey, offset) or better yet both of: # either a=AES(readkey, offset) or better yet both of:
# a=AES(readkey, offset=0) # a=AES(readkey, offset=0)
@ -190,7 +179,7 @@ class DecryptingConsumer:
self._decryptor.process("\x00"*offset_small) self._decryptor.process("\x00"*offset_small)
def set_download_status_read_event(self, read_ev): def set_download_status_read_event(self, read_ev):
self._read_event = read_ev self._read_ev = read_ev
def registerProducer(self, producer, streaming): def registerProducer(self, producer, streaming):
# this passes through, so the real consumer can flow-control the real # this passes through, so the real consumer can flow-control the real
@ -203,9 +192,9 @@ class DecryptingConsumer:
def write(self, ciphertext): def write(self, ciphertext):
started = now() started = now()
plaintext = self._decryptor.process(ciphertext) plaintext = self._decryptor.process(ciphertext)
if self._read_event: if self._read_ev:
elapsed = now() - started elapsed = now() - started
self._read_event.update(0, elapsed, 0) self._read_ev.update(0, elapsed, 0)
self._consumer.write(plaintext) self._consumer.write(plaintext)
class ImmutableFileNode: class ImmutableFileNode:

View File

@ -1217,14 +1217,16 @@ class Status(unittest.TestCase):
now = 12345.1 now = 12345.1
ds = DownloadStatus("si-1", 123) ds = DownloadStatus("si-1", 123)
self.failUnlessEqual(ds.get_status(), "idle") self.failUnlessEqual(ds.get_status(), "idle")
ds.add_segment_request(0, now) ev0 = ds.add_segment_request(0, now)
self.failUnlessEqual(ds.get_status(), "fetching segment 0") self.failUnlessEqual(ds.get_status(), "fetching segment 0")
ds.add_segment_delivery(0, now+1, 0, 1000, 2.0) ev0.activate(now+0.5)
ev0.deliver(now+1, 0, 1000, 2.0)
self.failUnlessEqual(ds.get_status(), "idle") self.failUnlessEqual(ds.get_status(), "idle")
ds.add_segment_request(2, now+2) ev2 = ds.add_segment_request(2, now+2)
ds.add_segment_request(1, now+2) del ev2 # hush pyflakes
ev1 = ds.add_segment_request(1, now+2)
self.failUnlessEqual(ds.get_status(), "fetching segments 1,2") self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
ds.add_segment_error(1, now+3) ev1.error(now+3)
self.failUnlessEqual(ds.get_status(), self.failUnlessEqual(ds.get_status(),
"fetching segment 2; errors on segment 1") "fetching segment 2; errors on segment 1")