crawler: modify API to support upcoming bucket-counting crawler

Brian Warner 2009-02-19 19:31:42 -07:00
parent 9bc08158c6
commit ef4ff21ae7
2 changed files with 137 additions and 50 deletions

src/allmydata/storage/crawler.py

@@ -1,5 +1,6 @@
-import os, time, struct, pickle
+import os, time, struct
+import cPickle as pickle
 from twisted.internet import reactor
 from twisted.application import service
 from allmydata.storage.server import si_b2a
 from allmydata.util import fileutil
@@ -25,7 +26,10 @@ class ShareCrawler(service.MultiService):
     To use this, create a subclass which implements the process_bucket()
     method. It will be called with a prefixdir and a base32 storage index
-    string. process_bucket() should run synchronously.
+    string. process_bucket() should run synchronously. Any keys added to
+    self.state will be preserved. Override add_initial_state() to set up
+    initial state keys. Override finished_cycle() to perform additional
+    processing when the cycle is complete.
 
     Then create an instance, with a reference to a StorageServer and a
     filename where it can store persistent state. The statefile is used to
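
The docstring now spells out the subclass contract: process_bucket() does the per-bucket work, add_initial_state() seeds any extra self.state keys, and finished_cycle() performs summary work. A minimal sketch of a subclass using all three hooks (the class name, the "bucket-count" keys, and the import path are illustrative, not part of this commit):

    from allmydata.storage.crawler import ShareCrawler

    class ExampleCountingCrawler(ShareCrawler):
        def add_initial_state(self):
            # statefiles written by older code will not have our key yet
            self.state.setdefault("bucket-count", 0)

        def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
            # called synchronously, once per bucket per cycle
            self.state["bucket-count"] += 1

        def finished_cycle(self, cycle):
            # publish a per-cycle summary, then reset for the next cycle
            self.state["last-cycle-count"] = self.state["bucket-count"]
            self.state["bucket-count"] = 0
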
@@ -39,15 +43,15 @@ class ShareCrawler(service.MultiService):
     the Deferred that it returns.
     """
 
-    # use up to 10% of the CPU, on average. This can be changed at any time.
-    allowed_cpu_percentage = .10
-    # use up to 1.0 seconds before yielding. This can be changed at any time.
-    cpu_slice = 1.0
-    # don't run a cycle faster than this
-    minimum_cycle_time = 300
+    # all three of these can be changed at any time
+    allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
+    cpu_slice = 1.0 # use up to 1.0 seconds before yielding
+    minimum_cycle_time = 300 # don't run a cycle faster than this
 
-    def __init__(self, server, statefile):
+    def __init__(self, server, statefile, allowed_cpu_percentage=None):
         service.MultiService.__init__(self)
+        if allowed_cpu_percentage is not None:
+            self.allowed_cpu_percentage = allowed_cpu_percentage
         self.server = server
         self.sharedir = server.sharedir
         self.statefile = statefile
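
The three throttling knobs remain class attributes, so a subclass can still override them wholesale, while the new optional constructor argument lets a caller tune allowed_cpu_percentage per instance. A sketch (QuietCrawler is a hypothetical subclass; ss and statefile are assumed to be a StorageServer and a writable path):

    class QuietCrawler(ShareCrawler):
        cpu_slice = 0.5            # yield after half a second of work
        minimum_cycle_time = 3600  # start at most one cycle per hour

    # per-instance override of the class-level 10% default:
    c = QuietCrawler(ss, statefile, allowed_cpu_percentage=.02)
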
@@ -56,24 +60,73 @@ class ShareCrawler(service.MultiService):
         self.prefixes.sort()
         self.timer = None
         self.bucket_cache = (None, [])
-        self.first_cycle_finished = False
+        self.current_sleep_time = None
+        self.next_wake_time = None
+
+    def get_state(self):
+        """I return the current state of the crawler. This is a copy of my
+        state dictionary, plus the following keys::
+
+         current-sleep-time: float, duration of our current sleep
+         next-wake-time: float, seconds-since-epoch of when we will next wake
+
+        If we are not currently sleeping (i.e. get_state() was called from
+        inside the process_prefixdir, process_bucket, or finished_cycle()
+        methods, or if startService has not yet been called on this crawler),
+        these two keys will be None.
+        """
+        state = self.state.copy() # it isn't a deepcopy, so don't go crazy
+        state["current-sleep-time"] = self.current_sleep_time
+        state["next-wake-time"] = self.next_wake_time
+        return state
 
     def load_state(self):
+        # we use this to store state for both the crawler's internals and
+        # anything the subclass-specific code needs. The state is stored
+        # after each bucket is processed, after each prefixdir is processed,
+        # and after a cycle is complete. The internal keys we use are:
+        #  ["version"]: int, always 1
+        #  ["last-cycle-finished"]: int, or None if we have not yet finished
+        #                           any cycle
+        #  ["current-cycle"]: int, or None if we are sleeping between cycles
+        #  ["last-complete-prefix"]: str, two-letter name of the last prefixdir
+        #                            that was fully processed, or None if we
+        #                            are sleeping between cycles, or if we
+        #                            have not yet finished any prefixdir since
+        #                            a cycle was started
+        #  ["last-complete-bucket"]: str, base32 storage index bucket name
+        #                            of the last bucket to be processed, or
+        #                            None if we are sleeping between cycles
         try:
             f = open(self.statefile, "rb")
             state = pickle.load(f)
+            f.close()
+        except EnvironmentError:
+            state = {"version": 1,
+                     "last-cycle-finished": None,
+                     "current-cycle": 0,
+                     "last-complete-prefix": None,
+                     "last-complete-bucket": None,
+                     }
+        self.state = state
         lcp = state["last-complete-prefix"]
         if lcp == None:
             self.last_complete_prefix_index = -1
         else:
             self.last_complete_prefix_index = self.prefixes.index(lcp)
-            self.last_complete_bucket = state["last-complete-bucket"]
-            self.first_cycle_finished = state["first-cycle-finished"]
-            f.close()
-        except EnvironmentError:
-            self.last_complete_prefix_index = -1
-            self.last_complete_bucket = None
-            self.first_cycle_finished = False
+        self.add_initial_state()
+
+    def add_initial_state(self):
+        """Hook method to add extra keys to self.state when first loaded.
+
+        The first time this Crawler is used, or when the code has been
+        upgraded, the saved state file may not contain all the keys you
+        expect. Use this method to add any missing keys. Simply modify
+        self.state as needed.
+
+        This method is for subclasses to override. No upcall is necessary.
+        """
+        pass
 
     def save_state(self):
         lcpi = self.last_complete_prefix_index
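
Together, load_state() and get_state() give status-display code a read-only snapshot of the crawler. For a freshly created crawler with no statefile on disk, the snapshot holds the documented keys at their initial values (the literal below simply restates those defaults):

    state = c.get_state()
    # {"version": 1,
    #  "last-cycle-finished": None,   # no cycle has completed yet
    #  "current-cycle": 0,            # first cycle, not yet finished
    #  "last-complete-prefix": None,
    #  "last-complete-bucket": None,
    #  "current-sleep-time": None,    # None unless we are asleep
    #  "next-wake-time": None}
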
@@ -81,14 +134,10 @@ class ShareCrawler(service.MultiService):
         if lcpi == -1:
             last_complete_prefix = None
         else:
             last_complete_prefix = self.prefixes[lcpi]
-        state = {"version": 1,
-                 "last-complete-prefix": last_complete_prefix,
-                 "last-complete-bucket": self.last_complete_bucket,
-                 "first-cycle-finished": self.first_cycle_finished,
-                 }
+        self.state["last-complete-prefix"] = last_complete_prefix
         tmpfile = self.statefile + ".tmp"
         f = open(tmpfile, "wb")
-        pickle.dump(state, f)
+        pickle.dump(self.state, f)
         f.close()
         fileutil.move_into_place(tmpfile, self.statefile)
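
save_state() keeps the statefile crash-safe by writing a sibling tempfile and renaming it over the old copy, so a crash mid-dump can never leave a truncated statefile behind. The same pattern in isolation (assuming fileutil.move_into_place is essentially an atomic rename with a Windows workaround):

    import os, pickle

    def save_atomically(path, obj):
        tmpfile = path + ".tmp"
        f = open(tmpfile, "wb")
        pickle.dump(obj, f)
        f.close()
        os.rename(tmpfile, path)  # atomic on POSIX filesystems
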
@@ -105,6 +154,8 @@ class ShareCrawler(service.MultiService):
     def start_slice(self):
         self.timer = None
+        self.current_sleep_time = None
+        self.next_wake_time = None
         start_slice = time.time()
         try:
             self.start_current_prefix(start_slice)
@@ -112,7 +163,8 @@ class ShareCrawler(service.MultiService):
         except TimeSliceExceeded:
             finished_cycle = False
         # either we finished a whole cycle, or we ran out of time
-        this_slice = time.time() - start_slice
+        now = time.time()
+        this_slice = now - start_slice
         # this_slice/(this_slice+sleep_time) = percentage
         # this_slice/percentage = this_slice+sleep_time
         # sleep_time = (this_slice/percentage) - this_slice
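
Solving this_slice/(this_slice+sleep_time) = percentage for sleep_time gives the sleep needed to stay within the CPU budget. With the defaults (cpu_slice = 1.0, allowed_cpu_percentage = .10), a full one-second slice is followed by nine seconds of sleep: one second of work out of every ten.

    # worked example with the defaults:
    this_slice = 1.0    # seconds of work in the slice just finished
    percentage = .10    # allowed_cpu_percentage
    sleep_time = (this_slice / percentage) - this_slice  # 9.0 seconds
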
@@ -128,10 +180,16 @@ class ShareCrawler(service.MultiService):
         else:
             self.sleeping_between_cycles = False
+        self.current_sleep_time = sleep_time # for status page
+        self.next_wake_time = now + sleep_time
         self.yielding(sleep_time)
         self.timer = reactor.callLater(sleep_time, self.start_slice)
 
     def start_current_prefix(self, start_slice):
+        if self.state["current-cycle"] is None:
+            assert self.state["last-cycle-finished"] is not None
+            self.state["current-cycle"] = self.state["last-cycle-finished"] + 1
+        cycle = self.state["current-cycle"]
         for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
             if time.time() > start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()
@@ -147,17 +205,19 @@ class ShareCrawler(service.MultiService):
             except EnvironmentError:
                 buckets = []
             self.bucket_cache = (i, buckets)
-            self.process_prefixdir(prefixdir, buckets, start_slice)
+            self.process_prefixdir(cycle, prefix, prefixdir,
+                                   buckets, start_slice)
             self.last_complete_prefix_index = i
             self.save_state()
         # yay! we finished the whole cycle
         self.last_complete_prefix_index = -1
-        self.last_complete_bucket = None
-        self.first_cycle_finished = True
+        self.state["last-complete-bucket"] = None
+        self.state["last-cycle-finished"] = cycle
+        self.state["current-cycle"] = None
+        self.finished_cycle(cycle)
         self.save_state()
-        self.finished_cycle()
 
-    def process_prefixdir(self, prefixdir, buckets, start_slice):
+    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
         """This gets a list of bucket names (i.e. storage index strings,
         base32-encoded) in sorted order.
@@ -166,20 +226,43 @@ class ShareCrawler(service.MultiService):
         are being managed by this server.
         """
         for bucket in buckets:
-            if bucket <= self.last_complete_bucket:
+            if bucket <= self.state["last-complete-bucket"]:
                 continue
             if time.time() > start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()
-            self.process_bucket(prefixdir, bucket)
-            self.last_complete_bucket = bucket
+            self.process_bucket(cycle, prefix, prefixdir, bucket)
+            self.state["last-complete-bucket"] = bucket
             self.save_state()
 
-    def process_bucket(self, prefixdir, storage_index_b32):
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+        """Examine a single bucket. Subclasses should do whatever they want
+        to do to the shares therein, then update self.state as necessary.
+
+        This method will be called exactly once per share (per cycle), unless
+        the crawler was interrupted (by node restart, for example), in which
+        case it might be called a second time on a bucket which was processed
+        during the previous node's incarnation. However, in that case, no
+        changes to self.state will have been recorded.
+
+        This method is for subclasses to override. No upcall is necessary.
+        """
         pass
 
-    def finished_cycle(self):
+    def finished_cycle(self, cycle):
+        """Notify subclass that a cycle (one complete traversal of all
+        prefixdirs) has just finished. 'cycle' is the number of the cycle
+        that just finished. This method should perform summary work and
+        update self.state to publish information to status displays.
+
+        This method is for subclasses to override. No upcall is necessary.
+        """
         pass
 
     def yielding(self, sleep_time):
+        """The crawler is about to sleep for 'sleep_time' seconds. This
+        method is mostly for the convenience of unit tests.
+
+        This method is for subclasses to override. No upcall is necessary.
+        """
         pass
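
One subtlety of the new state handling: self.state is only persisted after each bucket, so if the node restarts between process_bucket() and save_state(), that bucket is revisited on the next slice with its state changes rolled back (the "no changes to self.state will have been recorded" case in the docstring). Counters kept in self.state therefore stay effectively exactly-once, and a subclass that wants full idempotence can record buckets in a set instead; a sketch of such an override inside a subclass (the "seen-buckets" key is hypothetical):

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        # adding to a set is idempotent, so a replayed bucket is harmless
        seen = self.state.setdefault("seen-buckets", set())
        seen.add(storage_index_b32)
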

src/allmydata/test/test_crawler.py

@@ -15,23 +15,23 @@ from common_util import StallMixin
 
 class BucketEnumeratingCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
-    def __init__(self, server, statefile):
-        ShareCrawler.__init__(self, server, statefile)
+    def __init__(self, *args, **kwargs):
+        ShareCrawler.__init__(self, *args, **kwargs)
         self.all_buckets = []
         self.finished_d = defer.Deferred()
-    def process_bucket(self, prefixdir, storage_index_b32):
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         self.all_buckets.append(storage_index_b32)
-    def finished_cycle(self):
+    def finished_cycle(self, cycle):
         eventually(self.finished_d.callback, None)
 
 class PacedCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
-    def __init__(self, server, statefile):
-        ShareCrawler.__init__(self, server, statefile)
+    def __init__(self, *args, **kwargs):
+        ShareCrawler.__init__(self, *args, **kwargs)
         self.countdown = 6
         self.all_buckets = []
         self.finished_d = defer.Deferred()
-    def process_bucket(self, prefixdir, storage_index_b32):
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         self.all_buckets.append(storage_index_b32)
         self.countdown -= 1
         if self.countdown == 0:
@@ -39,7 +39,7 @@ class PacedCrawler(ShareCrawler):
             self.cpu_slice = -1.0
     def yielding(self, sleep_time):
         self.cpu_slice = 500
-    def finished_cycle(self):
+    def finished_cycle(self, cycle):
         eventually(self.finished_d.callback, None)
 
 class ConsumingCrawler(ShareCrawler):
@@ -47,18 +47,18 @@ class ConsumingCrawler(ShareCrawler):
     allowed_cpu_percentage = 0.5
     minimum_cycle_time = 0
-    def __init__(self, server, statefile):
-        ShareCrawler.__init__(self, server, statefile)
+    def __init__(self, *args, **kwargs):
+        ShareCrawler.__init__(self, *args, **kwargs)
         self.accumulated = 0.0
         self.cycles = 0
         self.last_yield = 0.0
-    def process_bucket(self, prefixdir, storage_index_b32):
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         start = time.time()
         time.sleep(0.05)
         elapsed = time.time() - start
         self.accumulated += elapsed
         self.last_yield += elapsed
-    def finished_cycle(self):
+    def finished_cycle(self, cycle):
         self.cycles += 1
     def yielding(self, sleep_time):
         self.last_yield = 0.0
@@ -99,7 +99,7 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
         sis = [self.write(i, ss, serverid) for i in range(10)]
         statefile = os.path.join(self.basedir, "statefile")
 
-        c = BucketEnumeratingCrawler(ss, statefile)
+        c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1)
         c.load_state()
 
         c.start_current_prefix(time.time())
@@ -322,7 +322,11 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
         # empty methods in the base class
         def _check():
-            return c.first_cycle_finished
+            return bool(c.state["last-cycle-finished"] is not None)
         d = self.poll(_check)
+        def _done(ignored):
+            state = c.get_state()
+            self.failUnless(state["last-cycle-finished"] is not None)
+        d.addCallback(_done)
         return d