leasedb/accounting crawler: only treat stable shares as disappeared or unleased.
fixes #1921 Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
This commit is contained in:
parent
76e7c5b97a
commit
9f6d12691e
|
@ -113,9 +113,9 @@ The accounting crawler may perform the following functions (but see ticket
|
||||||
corrupted. This is handled in the same way as upgrading from a previous
|
corrupted. This is handled in the same way as upgrading from a previous
|
||||||
version.
|
version.
|
||||||
|
|
||||||
- Detect shares that have unexpectedly disappeared from storage. The
|
- Detect shares with stable entries in the leasedb that have unexpectedly
|
||||||
disappearance of a share is logged, and its entry and leases are removed
|
disappeared from storage. The disappearance of a share is logged, and its
|
||||||
from the leasedb.
|
entry and leases are removed from the leasedb.
|
||||||
|
|
||||||
|
|
||||||
Accounts
|
Accounts
|
||||||
|
|
|
@ -4,10 +4,12 @@ import time
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from allmydata.util.deferredutil import for_items
|
from allmydata.util.deferredutil import for_items
|
||||||
|
from allmydata.util.assertutil import _assert
|
||||||
from allmydata.util import log
|
from allmydata.util import log
|
||||||
from allmydata.storage.crawler import ShareCrawler
|
from allmydata.storage.crawler import ShareCrawler
|
||||||
from allmydata.storage.common import si_a2b
|
from allmydata.storage.common import si_a2b
|
||||||
from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN, SHARETYPE_CORRUPTED
|
from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN, SHARETYPE_CORRUPTED, \
|
||||||
|
STATE_STABLE
|
||||||
|
|
||||||
|
|
||||||
class AccountingCrawler(ShareCrawler):
|
class AccountingCrawler(ShareCrawler):
|
||||||
|
@ -73,7 +75,7 @@ class AccountingCrawler(ShareCrawler):
|
||||||
# updated, and removed.
|
# updated, and removed.
|
||||||
for key, value in db_sharemap.iteritems():
|
for key, value in db_sharemap.iteritems():
|
||||||
(si_s, shnum) = key
|
(si_s, shnum) = key
|
||||||
(used_space, sharetype) = value
|
(used_space, sharetype, state) = value
|
||||||
|
|
||||||
examined_sharesets[sharetype].add(si_s)
|
examined_sharesets[sharetype].add(si_s)
|
||||||
|
|
||||||
|
@ -92,30 +94,35 @@ class AccountingCrawler(ShareCrawler):
|
||||||
stored_shares = set(stored_sharemap)
|
stored_shares = set(stored_sharemap)
|
||||||
db_shares = set(db_sharemap)
|
db_shares = set(db_sharemap)
|
||||||
|
|
||||||
# add new shares to the DB
|
# Add new shares to the DB.
|
||||||
new_shares = stored_shares - db_shares
|
new_shares = stored_shares - db_shares
|
||||||
for shareid in new_shares:
|
for shareid in new_shares:
|
||||||
(si_s, shnum) = shareid
|
(si_s, shnum) = shareid
|
||||||
(used_space, sharetype) = stored_sharemap[shareid]
|
(used_space, sharetype, state) = stored_sharemap[shareid]
|
||||||
|
|
||||||
self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
|
self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
|
||||||
self._leasedb.add_starter_lease(si_s, shnum)
|
self._leasedb.add_starter_lease(si_s, shnum)
|
||||||
|
|
||||||
# remove disappeared shares from DB
|
# Remove disappeared shares from the DB. Note that only shares in STATE_STABLE
|
||||||
disappeared_shares = db_shares - stored_shares
|
# should be considered "disappeared", since otherwise it would be possible for
|
||||||
for (si_s, shnum) in disappeared_shares:
|
# this to delete shares that are in the process of being created (see ticket #1921).
|
||||||
|
potentially_disappeared_shares = db_shares - stored_shares
|
||||||
|
for shareid in potentially_disappeared_shares:
|
||||||
|
(used_space, sharetype, state) = db_sharemap[shareid]
|
||||||
|
if state == STATE_STABLE:
|
||||||
|
(si_s, shnum) = shareid
|
||||||
log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
|
log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
|
||||||
si_s=si_s, shnum=shnum, level=log.WEIRD)
|
si_s=si_s, shnum=shnum, level=log.WEIRD)
|
||||||
# This is temporarily disabled, because it results in failures if we're examining
|
self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
|
||||||
# a prefix while a share is created in it (ticket #1921).
|
|
||||||
#self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
|
|
||||||
|
|
||||||
recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
|
recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
|
||||||
|
|
||||||
def _delete_share(ign, key, value):
|
def _delete_share(ign, key, value):
|
||||||
(si_s, shnum) = key
|
(si_s, shnum) = key
|
||||||
(used_space, sharetype) = value
|
(used_space, sharetype, state) = value
|
||||||
|
_assert(state == STATE_STABLE, state=state)
|
||||||
storage_index = si_a2b(si_s)
|
storage_index = si_a2b(si_s)
|
||||||
|
|
||||||
d3 = defer.succeed(None)
|
d3 = defer.succeed(None)
|
||||||
def _mark_and_delete(ign):
|
def _mark_and_delete(ign):
|
||||||
self._leasedb.mark_share_as_going(storage_index, shnum)
|
self._leasedb.mark_share_as_going(storage_index, shnum)
|
||||||
|
@ -141,6 +148,7 @@ class AccountingCrawler(ShareCrawler):
|
||||||
d3.addCallbacks(_deleted, _not_deleted)
|
d3.addCallbacks(_deleted, _not_deleted)
|
||||||
return d3
|
return d3
|
||||||
|
|
||||||
|
# This only includes stable unleased shares (see ticket #1921).
|
||||||
unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
|
unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
|
||||||
d2 = for_items(_delete_share, unleased_sharemap)
|
d2 = for_items(_delete_share, unleased_sharemap)
|
||||||
|
|
||||||
|
|
|
@ -137,14 +137,15 @@ class LeaseDB:
|
||||||
|
|
||||||
def get_shares_for_prefix(self, prefix):
|
def get_shares_for_prefix(self, prefix):
|
||||||
"""
|
"""
|
||||||
Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype) pairs.
|
Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype, state) triples
|
||||||
|
for shares with this prefix.
|
||||||
"""
|
"""
|
||||||
self._cursor.execute("SELECT `storage_index`,`shnum`, `used_space`, `sharetype`"
|
self._cursor.execute("SELECT `storage_index`,`shnum`, `used_space`, `sharetype`, `state`"
|
||||||
" FROM `shares`"
|
" FROM `shares`"
|
||||||
" WHERE `prefix` == ?",
|
" WHERE `prefix` == ?",
|
||||||
(prefix,))
|
(prefix,))
|
||||||
db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype)))
|
db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype), int(state)))
|
||||||
for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()])
|
for (si_s, shnum, used_space, sharetype, state) in self._cursor.fetchall()])
|
||||||
return db_sharemap
|
return db_sharemap
|
||||||
|
|
||||||
def add_new_share(self, storage_index, shnum, used_space, sharetype):
|
def add_new_share(self, storage_index, shnum, used_space, sharetype):
|
||||||
|
@ -284,18 +285,24 @@ class LeaseDB:
|
||||||
return map(_to_age, rows)
|
return map(_to_age, rows)
|
||||||
|
|
||||||
def get_unleased_shares_for_prefix(self, prefix):
|
def get_unleased_shares_for_prefix(self, prefix):
|
||||||
|
"""
|
||||||
|
Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype, state) triples
|
||||||
|
for stable, unleased shares with this prefix.
|
||||||
|
"""
|
||||||
if self.debug: print "GET_UNLEASED_SHARES_FOR_PREFIX", prefix
|
if self.debug: print "GET_UNLEASED_SHARES_FOR_PREFIX", prefix
|
||||||
# This would be simpler, but it doesn't work because 'NOT IN' doesn't support multiple columns.
|
# This would be simpler, but it doesn't work because 'NOT IN' doesn't support multiple columns.
|
||||||
#query = ("SELECT `storage_index`, `shnum`, `used_space`, `sharetype` FROM `shares`"
|
#query = ("SELECT `storage_index`, `shnum`, `used_space`, `sharetype`, `state` FROM `shares`"
|
||||||
# " WHERE (`storage_index`, `shnum`) NOT IN (SELECT DISTINCT `storage_index`, `shnum` FROM `leases`)")
|
# " WHERE `state` = STATE_STABLE "
|
||||||
|
# " AND (`storage_index`, `shnum`) NOT IN (SELECT DISTINCT `storage_index`, `shnum` FROM `leases`)")
|
||||||
|
|
||||||
# This "negative join" should be equivalent.
|
# This "negative join" should be equivalent.
|
||||||
self._cursor.execute("SELECT DISTINCT s.storage_index, s.shnum, s.used_space, s.sharetype FROM `shares` s LEFT JOIN `leases` l"
|
self._cursor.execute("SELECT DISTINCT s.storage_index, s.shnum, s.used_space, s.sharetype, s.state"
|
||||||
|
" FROM `shares` s LEFT JOIN `leases` l"
|
||||||
" ON (s.storage_index = l.storage_index AND s.shnum = l.shnum)"
|
" ON (s.storage_index = l.storage_index AND s.shnum = l.shnum)"
|
||||||
" WHERE s.prefix = ? AND l.storage_index IS NULL",
|
" WHERE s.prefix = ? AND s.state = ? AND l.storage_index IS NULL",
|
||||||
(prefix,))
|
(prefix, STATE_STABLE))
|
||||||
db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype)))
|
db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype), int(state)))
|
||||||
for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()])
|
for (si_s, shnum, used_space, sharetype, state) in self._cursor.fetchall()])
|
||||||
return db_sharemap
|
return db_sharemap
|
||||||
|
|
||||||
def remove_leases_by_renewal_time(self, renewal_cutoff_time):
|
def remove_leases_by_renewal_time(self, renewal_cutoff_time):
|
||||||
|
|
Loading…
Reference in New Issue