storage: measure latency-per-operation, calculate mean/median/percentiles
This commit is contained in:
parent
a28b40f790
commit
6b7ff02e36
|
@ -203,13 +203,16 @@ class BucketWriter(Referenceable):
|
|||
return self._size
|
||||
|
||||
def remote_write(self, offset, data):
|
||||
start = time.time()
|
||||
precondition(not self.closed)
|
||||
if self.throw_out_all_data:
|
||||
return
|
||||
self._sharefile.write_share_data(offset, data)
|
||||
self.ss.add_latency("write", time.time() - start)
|
||||
|
||||
def remote_close(self):
|
||||
precondition(not self.closed)
|
||||
start = time.time()
|
||||
|
||||
fileutil.make_dirs(os.path.dirname(self.finalhome))
|
||||
fileutil.rename(self.incominghome, self.finalhome)
|
||||
|
@ -225,6 +228,7 @@ class BucketWriter(Referenceable):
|
|||
|
||||
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
|
||||
self.ss.bucket_writer_closed(self, filelen)
|
||||
self.ss.add_latency("close", time.time() - start)
|
||||
|
||||
def _disconnected(self):
|
||||
if not self.closed:
|
||||
|
@ -252,11 +256,15 @@ class BucketWriter(Referenceable):
|
|||
class BucketReader(Referenceable):
|
||||
implements(RIBucketReader)
|
||||
|
||||
def __init__(self, sharefname):
|
||||
def __init__(self, ss, sharefname):
|
||||
self.ss = ss
|
||||
self._share_file = ShareFile(sharefname)
|
||||
|
||||
def remote_read(self, offset, length):
|
||||
return self._share_file.read_share_data(offset, length)
|
||||
start = time.time()
|
||||
data = self._share_file.read_share_data(offset, length)
|
||||
self.ss.add_latency("read", time.time() - start)
|
||||
return data
|
||||
|
||||
|
||||
# the MutableShareFile is like the ShareFile, but used for mutable data. It
|
||||
|
@ -704,6 +712,47 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
log.msg(format="space measurement done, consumed=%(consumed)d bytes",
|
||||
consumed=self.consumed,
|
||||
parent=lp, facility="tahoe.storage")
|
||||
self.latencies = {"allocate": [], # immutable
|
||||
"write": [],
|
||||
"close": [],
|
||||
"read": [],
|
||||
"renew": [],
|
||||
"cancel": [],
|
||||
"get": [],
|
||||
"writev": [], # mutable
|
||||
"readv": [],
|
||||
}
|
||||
|
||||
def add_latency(self, category, latency):
|
||||
a = self.latencies[category]
|
||||
a.append(latency)
|
||||
if len(a) > 1000:
|
||||
self.latencies[category] = a[-1000:]
|
||||
|
||||
def get_latencies(self):
|
||||
"""Return a dict, indexed by category, that contains a dict of
|
||||
latency numbers for each category. Each dict will contain the
|
||||
following keys: mean, median, 90_percentile, 95_percentile,
|
||||
99_percentile). If no samples have been collected for the given
|
||||
category, then that category name will not be present in the return
|
||||
value."""
|
||||
# note that Amazon's Dynamo paper says they use 99.9% percentile.
|
||||
output = {}
|
||||
for category in self.latencies:
|
||||
if not self.latencies[category]:
|
||||
continue
|
||||
stats = {}
|
||||
samples = self.latencies[category][:]
|
||||
samples.sort()
|
||||
count = len(samples)
|
||||
stats["mean"] = sum(samples) / count
|
||||
stats["median"] = samples[int(0.5 * count)]
|
||||
stats["90_percentile"] = samples[int(0.9 * count)]
|
||||
stats["95_percentile"] = samples[int(0.95 * count)]
|
||||
stats["99_percentile"] = samples[int(0.99 * count)]
|
||||
stats["999_percentile"] = samples[int(0.999 * count)]
|
||||
output[category] = stats
|
||||
return output
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
if "facility" not in kwargs:
|
||||
|
@ -747,6 +796,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
# owner_num is not for clients to set, but rather it should be
|
||||
# curried into the PersonalStorageServer instance that is dedicated
|
||||
# to a particular owner.
|
||||
start = time.time()
|
||||
alreadygot = set()
|
||||
bucketwriters = {} # k: shnum, v: BucketWriter
|
||||
si_dir = storage_index_to_dir(storage_index)
|
||||
|
@ -778,6 +828,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
|
||||
if self.readonly_storage:
|
||||
# we won't accept new shares
|
||||
self.add_latency("allocate", time.time() - start)
|
||||
return alreadygot, bucketwriters
|
||||
|
||||
for shnum in sharenums:
|
||||
|
@ -809,9 +860,11 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
if bucketwriters:
|
||||
fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
|
||||
|
||||
self.add_latency("allocate", time.time() - start)
|
||||
return alreadygot, bucketwriters
|
||||
|
||||
def remote_renew_lease(self, storage_index, renew_secret):
|
||||
start = time.time()
|
||||
new_expire_time = time.time() + 31*24*60*60
|
||||
found_buckets = False
|
||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||
|
@ -829,10 +882,12 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
else:
|
||||
pass # non-sharefile
|
||||
sf.renew_lease(renew_secret, new_expire_time)
|
||||
self.add_latency("renew", time.time() - start)
|
||||
if not found_buckets:
|
||||
raise IndexError("no such lease to renew")
|
||||
|
||||
def remote_cancel_lease(self, storage_index, cancel_secret):
|
||||
start = time.time()
|
||||
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
|
||||
|
||||
remaining_files = 0
|
||||
|
@ -873,6 +928,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
self.consumed -= total_space_freed
|
||||
if self.stats_provider:
|
||||
self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
|
||||
self.add_latency("cancel", time.time() - start)
|
||||
if not found_buckets:
|
||||
raise IndexError("no such lease to cancel")
|
||||
|
||||
|
@ -908,11 +964,13 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
pass
|
||||
|
||||
def remote_get_buckets(self, storage_index):
|
||||
start = time.time()
|
||||
si_s = si_b2a(storage_index)
|
||||
log.msg("storage: get_buckets %s" % si_s)
|
||||
bucketreaders = {} # k: sharenum, v: BucketReader
|
||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||
bucketreaders[shnum] = BucketReader(filename)
|
||||
bucketreaders[shnum] = BucketReader(self, filename)
|
||||
self.add_latency("get", time.time() - start)
|
||||
return bucketreaders
|
||||
|
||||
def get_leases(self, storage_index):
|
||||
|
@ -936,6 +994,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
secrets,
|
||||
test_and_write_vectors,
|
||||
read_vector):
|
||||
start = time.time()
|
||||
si_s = si_b2a(storage_index)
|
||||
lp = log.msg("storage: slot_writev %s" % si_s)
|
||||
si_dir = storage_index_to_dir(storage_index)
|
||||
|
@ -1002,6 +1061,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
share.add_or_renew_lease(lease_info)
|
||||
|
||||
# all done
|
||||
self.add_latency("writev", time.time() - start)
|
||||
return (testv_is_good, read_data)
|
||||
|
||||
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
|
||||
|
@ -1015,6 +1075,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
return share
|
||||
|
||||
def remote_slot_readv(self, storage_index, shares, readv):
|
||||
start = time.time()
|
||||
si_s = si_b2a(storage_index)
|
||||
lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
|
||||
facility="tahoe.storage", level=log.OPERATIONAL)
|
||||
|
@ -1022,6 +1083,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
# shares exist if there is a file for them
|
||||
bucketdir = os.path.join(self.sharedir, si_dir)
|
||||
if not os.path.isdir(bucketdir):
|
||||
self.add_latency("readv", time.time() - start)
|
||||
return {}
|
||||
datavs = {}
|
||||
for sharenum_s in os.listdir(bucketdir):
|
||||
|
@ -1035,6 +1097,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||
datavs[sharenum] = msf.readv(readv)
|
||||
log.msg("returning shares %s" % (datavs.keys(),),
|
||||
facility="tahoe.storage", level=log.NOISY, parent=lp)
|
||||
self.add_latency("readv", time.time() - start)
|
||||
return datavs
|
||||
|
||||
|
||||
|
|
|
@ -29,6 +29,8 @@ class Bucket(unittest.TestCase):
|
|||
|
||||
def bucket_writer_closed(self, bw, consumed):
|
||||
pass
|
||||
def add_latency(self, category, latency):
|
||||
pass
|
||||
|
||||
def make_lease(self):
|
||||
owner_num = 0
|
||||
|
@ -57,7 +59,7 @@ class Bucket(unittest.TestCase):
|
|||
bw.remote_close()
|
||||
|
||||
# now read from it
|
||||
br = BucketReader(bw.finalhome)
|
||||
br = BucketReader(self, bw.finalhome)
|
||||
self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
|
||||
self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
|
||||
self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
|
||||
|
@ -92,6 +94,8 @@ class BucketProxy(unittest.TestCase):
|
|||
|
||||
def bucket_writer_closed(self, bw, consumed):
|
||||
pass
|
||||
def add_latency(self, category, latency):
|
||||
pass
|
||||
|
||||
def test_create(self):
|
||||
bw, rb, sharefname = self.make_bucket("test_create", 500)
|
||||
|
@ -147,7 +151,7 @@ class BucketProxy(unittest.TestCase):
|
|||
|
||||
# now read everything back
|
||||
def _start_reading(res):
|
||||
br = BucketReader(sharefname)
|
||||
br = BucketReader(self, sharefname)
|
||||
rb = RemoteBucket()
|
||||
rb.target = br
|
||||
rbp = ReadBucketProxy(rb)
|
||||
|
@ -910,3 +914,66 @@ class MutableServer(unittest.TestCase):
|
|||
no_shares = read("si1", [], [(0,10)])
|
||||
self.failUnlessEqual(no_shares, {})
|
||||
|
||||
class Stats(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.sparent = LoggingServiceParent()
|
||||
self._lease_secret = itertools.count()
|
||||
def tearDown(self):
|
||||
return self.sparent.stopService()
|
||||
|
||||
def workdir(self, name):
|
||||
basedir = os.path.join("storage", "Server", name)
|
||||
return basedir
|
||||
|
||||
def create(self, name, sizelimit=None):
|
||||
workdir = self.workdir(name)
|
||||
ss = StorageServer(workdir, sizelimit)
|
||||
ss.setServiceParent(self.sparent)
|
||||
return ss
|
||||
|
||||
def test_latencies(self):
|
||||
ss = self.create("test_latencies")
|
||||
for i in range(10000):
|
||||
ss.add_latency("allocate", 1.0 * i)
|
||||
for i in range(1000):
|
||||
ss.add_latency("renew", 1.0 * i)
|
||||
for i in range(10):
|
||||
ss.add_latency("cancel", 2.0 * i)
|
||||
ss.add_latency("get", 5.0)
|
||||
|
||||
output = ss.get_latencies()
|
||||
|
||||
self.failUnlessEqual(sorted(output.keys()),
|
||||
sorted(["allocate", "renew", "cancel", "get"]))
|
||||
self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
|
||||
self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
|
||||
self.failUnless(abs(output["allocate"]["median"] - 9500) < 1)
|
||||
self.failUnless(abs(output["allocate"]["90_percentile"] - 9900) < 1)
|
||||
self.failUnless(abs(output["allocate"]["95_percentile"] - 9950) < 1)
|
||||
self.failUnless(abs(output["allocate"]["99_percentile"] - 9990) < 1)
|
||||
self.failUnless(abs(output["allocate"]["999_percentile"] - 9999) < 1)
|
||||
|
||||
self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
|
||||
self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
|
||||
self.failUnless(abs(output["renew"]["median"] - 500) < 1)
|
||||
self.failUnless(abs(output["renew"]["90_percentile"] - 900) < 1)
|
||||
self.failUnless(abs(output["renew"]["95_percentile"] - 950) < 1)
|
||||
self.failUnless(abs(output["renew"]["99_percentile"] - 990) < 1)
|
||||
self.failUnless(abs(output["renew"]["999_percentile"] - 999) < 1)
|
||||
|
||||
self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
|
||||
self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
|
||||
self.failUnless(abs(output["cancel"]["median"] - 10) < 1)
|
||||
self.failUnless(abs(output["cancel"]["90_percentile"] - 18) < 1)
|
||||
self.failUnless(abs(output["cancel"]["95_percentile"] - 18) < 1)
|
||||
self.failUnless(abs(output["cancel"]["99_percentile"] - 18) < 1)
|
||||
self.failUnless(abs(output["cancel"]["999_percentile"] - 18) < 1)
|
||||
|
||||
self.failUnlessEqual(len(ss.latencies["get"]), 1)
|
||||
self.failUnless(abs(output["get"]["mean"] - 5) < 1)
|
||||
self.failUnless(abs(output["get"]["median"] - 5) < 1)
|
||||
self.failUnless(abs(output["get"]["90_percentile"] - 5) < 1)
|
||||
self.failUnless(abs(output["get"]["95_percentile"] - 5) < 1)
|
||||
self.failUnless(abs(output["get"]["99_percentile"] - 5) < 1)
|
||||
self.failUnless(abs(output["get"]["999_percentile"] - 5) < 1)
|
||||
|
|
Loading…
Reference in New Issue