Removes BucketCounter
This commit is contained in:
parent
50a617f7c6
commit
b9f1d00fad
|
@ -423,46 +423,3 @@ class ShareCrawler(HookMixin, service.MultiService):
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class BucketCountingCrawler(ShareCrawler):
|
|
||||||
"""
|
|
||||||
I keep track of how many sharesets, each corresponding to a storage index,
|
|
||||||
are being managed by this server. This is equivalent to the number of
|
|
||||||
distributed files and directories for which I am providing storage. The
|
|
||||||
actual number of files and directories in the full grid is probably higher
|
|
||||||
(especially when there are more servers than 'N', the number of generated
|
|
||||||
shares), because some files and directories will have shares on other
|
|
||||||
servers instead of me. Also note that the number of sharesets will differ
|
|
||||||
from the number of shares in small grids, when more than one share is
|
|
||||||
placed on a single server.
|
|
||||||
"""
|
|
||||||
|
|
||||||
minimum_cycle_time = 60*60 # we don't need this more than once an hour
|
|
||||||
|
|
||||||
def add_initial_state(self):
|
|
||||||
# ["bucket-counts"][cyclenum][prefix] = number
|
|
||||||
# ["last-complete-cycle"] = cyclenum # maintained by base class
|
|
||||||
# ["last-complete-bucket-count"] = number
|
|
||||||
self.state.setdefault("bucket-counts", {})
|
|
||||||
self.state.setdefault("last-complete-bucket-count", None)
|
|
||||||
|
|
||||||
def process_prefix(self, cycle, prefix, start_slice):
|
|
||||||
# We don't need to look at the individual sharesets.
|
|
||||||
d = self.backend.get_sharesets_for_prefix(prefix)
|
|
||||||
def _got_sharesets(sharesets):
|
|
||||||
if cycle not in self.state["bucket-counts"]:
|
|
||||||
self.state["bucket-counts"][cycle] = {}
|
|
||||||
self.state["bucket-counts"][cycle][prefix] = len(sharesets)
|
|
||||||
d.addCallback(_got_sharesets)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def finished_cycle(self, cycle):
|
|
||||||
last_counts = self.state["bucket-counts"].get(cycle, [])
|
|
||||||
if len(last_counts) == len(self.prefixes):
|
|
||||||
# great, we have a whole cycle.
|
|
||||||
num_sharesets = sum(last_counts.values())
|
|
||||||
self.state["last-complete-bucket-count"] = num_sharesets
|
|
||||||
# get rid of old counts
|
|
||||||
for old_cycle in list(self.state["bucket-counts"].keys()):
|
|
||||||
if old_cycle != cycle:
|
|
||||||
del self.state["bucket-counts"][old_cycle]
|
|
||||||
|
|
|
@ -13,7 +13,6 @@ import allmydata # for __full_version__
|
||||||
from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
|
from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
|
||||||
_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
|
_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
|
||||||
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
|
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
|
||||||
from allmydata.storage.crawler import BucketCountingCrawler
|
|
||||||
from allmydata.storage.accountant import Accountant
|
from allmydata.storage.accountant import Accountant
|
||||||
from allmydata.storage.expiration import ExpirationPolicy
|
from allmydata.storage.expiration import ExpirationPolicy
|
||||||
|
|
||||||
|
@ -21,7 +20,6 @@ from allmydata.storage.expiration import ExpirationPolicy
|
||||||
class StorageServer(service.MultiService):
|
class StorageServer(service.MultiService):
|
||||||
implements(IStatsProducer)
|
implements(IStatsProducer)
|
||||||
name = 'storage'
|
name = 'storage'
|
||||||
BucketCounterClass = BucketCountingCrawler
|
|
||||||
DEFAULT_EXPIRATION_POLICY = ExpirationPolicy(enabled=False)
|
DEFAULT_EXPIRATION_POLICY = ExpirationPolicy(enabled=False)
|
||||||
|
|
||||||
def __init__(self, serverid, backend, statedir,
|
def __init__(self, serverid, backend, statedir,
|
||||||
|
@ -64,7 +62,6 @@ class StorageServer(service.MultiService):
|
||||||
"cancel": [],
|
"cancel": [],
|
||||||
}
|
}
|
||||||
|
|
||||||
self.init_bucket_counter()
|
|
||||||
self.init_accountant(expiration_policy or self.DEFAULT_EXPIRATION_POLICY)
|
self.init_accountant(expiration_policy or self.DEFAULT_EXPIRATION_POLICY)
|
||||||
|
|
||||||
def init_accountant(self, expiration_policy):
|
def init_accountant(self, expiration_policy):
|
||||||
|
@ -83,21 +80,12 @@ class StorageServer(service.MultiService):
|
||||||
def get_expiration_policy(self):
|
def get_expiration_policy(self):
|
||||||
return self.accountant.get_accounting_crawler().get_expiration_policy()
|
return self.accountant.get_accounting_crawler().get_expiration_policy()
|
||||||
|
|
||||||
def get_bucket_counter(self):
|
|
||||||
return self.bucket_counter
|
|
||||||
|
|
||||||
def get_serverid(self):
|
def get_serverid(self):
|
||||||
return self._serverid
|
return self._serverid
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.get_serverid()),)
|
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.get_serverid()),)
|
||||||
|
|
||||||
def init_bucket_counter(self):
|
|
||||||
statefile = os.path.join(self._statedir, "bucket_counter.state")
|
|
||||||
self.bucket_counter = self.BucketCounterClass(self.backend, statefile,
|
|
||||||
clock=self.clock)
|
|
||||||
self.bucket_counter.setServiceParent(self)
|
|
||||||
|
|
||||||
def count(self, name, delta=1):
|
def count(self, name, delta=1):
|
||||||
if self.stats_provider:
|
if self.stats_provider:
|
||||||
self.stats_provider.count("storage_server." + name, delta)
|
self.stats_provider.count("storage_server." + name, delta)
|
||||||
|
|
|
@ -4810,129 +4810,6 @@ def remove_tags(s):
|
||||||
return s
|
return s
|
||||||
|
|
||||||
|
|
||||||
class BucketCounterTest(WithDiskBackend, CrawlerTestMixin, ReallyEqualMixin, unittest.TestCase):
|
|
||||||
def test_bucket_counter(self):
|
|
||||||
server = self.create("test_bucket_counter", detached=True)
|
|
||||||
bucket_counter = server.bucket_counter
|
|
||||||
|
|
||||||
# finish as fast as possible
|
|
||||||
bucket_counter.slow_start = 0
|
|
||||||
bucket_counter.cpu_slice = 100.0
|
|
||||||
|
|
||||||
d = server.bucket_counter.set_hook('after_prefix')
|
|
||||||
|
|
||||||
server.setServiceParent(self.sparent)
|
|
||||||
|
|
||||||
w = StorageStatus(server)
|
|
||||||
|
|
||||||
# this sample is before the crawler has started doing anything
|
|
||||||
html = w.renderSynchronously()
|
|
||||||
self.failUnlessIn("<h1>Storage Server Status</h1>", html)
|
|
||||||
s = remove_tags(html)
|
|
||||||
self.failUnlessIn("Accepting new shares: Yes", s)
|
|
||||||
self.failUnlessIn("Reserved space: - 0 B (0)", s)
|
|
||||||
self.failUnlessIn("Total sharesets: Not computed yet", s)
|
|
||||||
self.failUnlessIn("Next crawl in", s)
|
|
||||||
|
|
||||||
def _after_first_prefix(prefix):
|
|
||||||
server.bucket_counter.save_state()
|
|
||||||
state = bucket_counter.get_state()
|
|
||||||
self.failUnlessEqual(prefix, state["last-complete-prefix"])
|
|
||||||
self.failUnlessEqual(prefix, bucket_counter.prefixes[0])
|
|
||||||
|
|
||||||
html = w.renderSynchronously()
|
|
||||||
s = remove_tags(html)
|
|
||||||
self.failUnlessIn(" Current crawl ", s)
|
|
||||||
self.failUnlessIn(" (next work in ", s)
|
|
||||||
|
|
||||||
return bucket_counter.set_hook('after_cycle')
|
|
||||||
d.addCallback(_after_first_prefix)
|
|
||||||
|
|
||||||
def _after_first_cycle(cycle):
|
|
||||||
self.failUnlessEqual(cycle, 0)
|
|
||||||
progress = bucket_counter.get_progress()
|
|
||||||
self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
|
|
||||||
d.addCallback(_after_first_cycle)
|
|
||||||
d.addBoth(self._wait_for_yield, bucket_counter)
|
|
||||||
|
|
||||||
def _after_yield(ign):
|
|
||||||
html = w.renderSynchronously()
|
|
||||||
s = remove_tags(html)
|
|
||||||
self.failUnlessIn("Total sharesets: 0 (the number of", s)
|
|
||||||
self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
|
|
||||||
d.addCallback(_after_yield)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def test_bucket_counter_cleanup(self):
|
|
||||||
server = self.create("test_bucket_counter_cleanup", detached=True)
|
|
||||||
bucket_counter = server.bucket_counter
|
|
||||||
|
|
||||||
# finish as fast as possible
|
|
||||||
bucket_counter.slow_start = 0
|
|
||||||
bucket_counter.cpu_slice = 100.0
|
|
||||||
|
|
||||||
d = bucket_counter.set_hook('after_prefix')
|
|
||||||
|
|
||||||
server.setServiceParent(self.sparent)
|
|
||||||
|
|
||||||
def _after_first_prefix(prefix):
|
|
||||||
bucket_counter.save_state()
|
|
||||||
state = bucket_counter.state
|
|
||||||
self.failUnlessEqual(prefix, state["last-complete-prefix"])
|
|
||||||
self.failUnlessEqual(prefix, bucket_counter.prefixes[0])
|
|
||||||
|
|
||||||
# now sneak in and mess with its state, to make sure it cleans up
|
|
||||||
# properly at the end of the cycle
|
|
||||||
state["bucket-counts"][-12] = {}
|
|
||||||
bucket_counter.save_state()
|
|
||||||
|
|
||||||
return bucket_counter.set_hook('after_cycle')
|
|
||||||
d.addCallback(_after_first_prefix)
|
|
||||||
|
|
||||||
def _after_first_cycle(cycle):
|
|
||||||
self.failUnlessEqual(cycle, 0)
|
|
||||||
progress = bucket_counter.get_progress()
|
|
||||||
self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
|
|
||||||
|
|
||||||
s = bucket_counter.get_state()
|
|
||||||
self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
|
|
||||||
d.addCallback(_after_first_cycle)
|
|
||||||
d.addBoth(self._wait_for_yield, bucket_counter)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def test_bucket_counter_eta(self):
|
|
||||||
server = self.create("test_bucket_counter_eta", detached=True)
|
|
||||||
bucket_counter = server.bucket_counter
|
|
||||||
|
|
||||||
# finish as fast as possible
|
|
||||||
bucket_counter.slow_start = 0
|
|
||||||
bucket_counter.cpu_slice = 100.0
|
|
||||||
|
|
||||||
d = bucket_counter.set_hook('after_prefix')
|
|
||||||
|
|
||||||
server.setServiceParent(self.sparent)
|
|
||||||
|
|
||||||
w = StorageStatus(server)
|
|
||||||
|
|
||||||
def _check_1(prefix1):
|
|
||||||
# no ETA is available yet
|
|
||||||
html = w.renderSynchronously()
|
|
||||||
s = remove_tags(html)
|
|
||||||
self.failUnlessIn("complete (next work", s)
|
|
||||||
|
|
||||||
return bucket_counter.set_hook('after_prefix')
|
|
||||||
d.addCallback(_check_1)
|
|
||||||
|
|
||||||
def _check_2(prefix2):
|
|
||||||
# an ETA based upon elapsed time should be available.
|
|
||||||
html = w.renderSynchronously()
|
|
||||||
s = remove_tags(html)
|
|
||||||
self.failUnlessIn("complete (ETA ", s)
|
|
||||||
d.addCallback(_check_2)
|
|
||||||
d.addBoth(self._wait_for_yield, bucket_counter)
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
class AccountingCrawlerTest(CrawlerTestMixin, WebRenderingMixin, ReallyEqualMixin):
|
class AccountingCrawlerTest(CrawlerTestMixin, WebRenderingMixin, ReallyEqualMixin):
|
||||||
def make_shares(self, server):
|
def make_shares(self, server):
|
||||||
aa = server.get_accountant().get_anonymous_account()
|
aa = server.get_accountant().get_anonymous_account()
|
||||||
|
|
|
@ -29,9 +29,8 @@ class StorageStatus(rend.Page):
|
||||||
def render_JSON(self, req):
|
def render_JSON(self, req):
|
||||||
req.setHeader("content-type", "text/plain")
|
req.setHeader("content-type", "text/plain")
|
||||||
accounting_crawler = self.storage.get_accounting_crawler()
|
accounting_crawler = self.storage.get_accounting_crawler()
|
||||||
bucket_counter = self.storage.get_bucket_counter()
|
|
||||||
d = {"stats": self.storage.get_stats(),
|
d = {"stats": self.storage.get_stats(),
|
||||||
"bucket-counter": bucket_counter.get_state(),
|
"bucket-counter": None,
|
||||||
"lease-checker": accounting_crawler.get_state(),
|
"lease-checker": accounting_crawler.get_state(),
|
||||||
"lease-checker-progress": accounting_crawler.get_progress(),
|
"lease-checker-progress": accounting_crawler.get_progress(),
|
||||||
}
|
}
|
||||||
|
@ -95,15 +94,13 @@ class StorageStatus(rend.Page):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def data_last_complete_bucket_count(self, ctx, data):
|
def data_last_complete_bucket_count(self, ctx, data):
|
||||||
s = self.storage.get_bucket_counter().get_state()
|
s = self.storage.get_stats()
|
||||||
count = s.get("last-complete-bucket-count")
|
if "storage_server.total_bucket_count" not in s:
|
||||||
if count is None:
|
|
||||||
return "Not computed yet"
|
return "Not computed yet"
|
||||||
return count
|
return s['storage_server.total_bucket_count']
|
||||||
|
|
||||||
def render_count_crawler_status(self, ctx, storage):
|
def render_count_crawler_status(self, ctx, storage):
|
||||||
p = self.storage.get_bucket_counter().get_progress()
|
return ctx.tag
|
||||||
return ctx.tag[self.format_crawler_progress(p)]
|
|
||||||
|
|
||||||
def format_crawler_progress(self, p):
|
def format_crawler_progress(self, p):
|
||||||
cycletime = p["estimated-time-per-cycle"]
|
cycletime = p["estimated-time-per-cycle"]
|
||||||
|
|
Loading…
Reference in New Issue