allow the introducer to set default encoding parameters. Closes #84.
By writing something like "25 75 100" into a file named 'encoding_parameters' in the central Introducer's base directory, all clients which use that introducer will be advised to use 25-out-of-100 encoding for files (i.e. 100 shares will be produced, 25 are required to reconstruct, and the upload process will be happy if it can find homes for at least 75 shares). The default values are "3 7 10". For small meshes, the defaults are probably good, but for larger ones it may be appropriate to increase the number of shares.
This commit is contained in:
parent
def63d193e
commit
5399395c27
|
@ -150,6 +150,11 @@ class Client(node.Node, Referenceable):
|
||||||
results.sort()
|
results.sort()
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def get_encoding_parameters(self):
|
||||||
|
if not self.introducer_client:
|
||||||
|
return None
|
||||||
|
return self.introducer_client.encoding_parameters
|
||||||
|
|
||||||
def connected_to_introducer(self):
|
def connected_to_introducer(self):
|
||||||
if self.introducer_client:
|
if self.introducer_client:
|
||||||
return self.introducer_client.connected_to_introducer()
|
return self.introducer_client.connected_to_introducer()
|
||||||
|
|
|
@ -113,9 +113,13 @@ class CRSDecoder(object):
|
||||||
return self.required_shares
|
return self.required_shares
|
||||||
|
|
||||||
def decode(self, some_shares, their_shareids):
|
def decode(self, some_shares, their_shareids):
|
||||||
precondition(len(some_shares) == len(their_shareids), len(some_shares), len(their_shareids))
|
precondition(len(some_shares) == len(their_shareids),
|
||||||
precondition(len(some_shares) == self.required_shares, len(some_shares), self.required_shares)
|
len(some_shares), len(their_shareids))
|
||||||
return defer.succeed(self.decoder.decode(some_shares, [int(s) for s in their_shareids]))
|
precondition(len(some_shares) == self.required_shares,
|
||||||
|
len(some_shares), self.required_shares)
|
||||||
|
data = self.decoder.decode(some_shares,
|
||||||
|
[int(s) for s in their_shareids])
|
||||||
|
return defer.succeed(data)
|
||||||
|
|
||||||
|
|
||||||
all_encoders = {
|
all_encoders = {
|
||||||
|
|
|
@ -88,6 +88,12 @@ class Encoder(object):
|
||||||
self.TOTAL_SHARES = n
|
self.TOTAL_SHARES = n
|
||||||
self.uri_extension_data = {}
|
self.uri_extension_data = {}
|
||||||
|
|
||||||
|
def set_params(self, encoding_parameters):
|
||||||
|
k,d,n = encoding_parameters
|
||||||
|
self.NEEDED_SHARES = k
|
||||||
|
self.SHARES_OF_HAPPINESS = d
|
||||||
|
self.TOTAL_SHARES = n
|
||||||
|
|
||||||
def setup(self, infile, encryption_key):
|
def setup(self, infile, encryption_key):
|
||||||
self.infile = infile
|
self.infile = infile
|
||||||
assert isinstance(encryption_key, str)
|
assert isinstance(encryption_key, str)
|
||||||
|
|
|
@ -20,6 +20,22 @@ URIExtensionData = StringConstraint(1000)
|
||||||
class RIIntroducerClient(RemoteInterface):
|
class RIIntroducerClient(RemoteInterface):
|
||||||
def new_peers(furls=SetOf(FURL)):
|
def new_peers(furls=SetOf(FURL)):
|
||||||
return None
|
return None
|
||||||
|
def set_encoding_parameters(parameters=(int, int, int)):
|
||||||
|
"""Advise the client of the recommended k-of-n encoding parameters
|
||||||
|
for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
|
||||||
|
is the total number of shares that will be created for any given
|
||||||
|
file, while 'k' is the number of shares that must be retrieved to
|
||||||
|
recover that file, and 'desired' is the minimum number of shares that
|
||||||
|
must be placed before the uploader will consider its job a success.
|
||||||
|
n/k is the expansion ratio, while k determines the robustness.
|
||||||
|
|
||||||
|
Introducers should specify 'n' according to the expected size of the
|
||||||
|
grid (there is no point to producing more shares than there are
|
||||||
|
peers), and k according to the desired reliability-vs-overhead goals.
|
||||||
|
|
||||||
|
Note that the current encoding technology requires k>=2.
|
||||||
|
"""
|
||||||
|
return None
|
||||||
|
|
||||||
class RIIntroducer(RemoteInterface):
|
class RIIntroducer(RemoteInterface):
|
||||||
def hello(node=RIIntroducerClient, furl=FURL):
|
def hello(node=RIIntroducerClient, furl=FURL):
|
||||||
|
|
|
@ -9,11 +9,16 @@ from allmydata.util import idlib, observer
|
||||||
|
|
||||||
class Introducer(service.MultiService, Referenceable):
|
class Introducer(service.MultiService, Referenceable):
|
||||||
implements(RIIntroducer)
|
implements(RIIntroducer)
|
||||||
|
name = "introducer"
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
service.MultiService.__init__(self)
|
service.MultiService.__init__(self)
|
||||||
self.nodes = set()
|
self.nodes = set()
|
||||||
self.furls = set()
|
self.furls = set()
|
||||||
|
self._encoding_parameters = None
|
||||||
|
|
||||||
|
def set_encoding_parameters(self, parameters):
|
||||||
|
self._encoding_parameters = parameters
|
||||||
|
|
||||||
def remote_hello(self, node, furl):
|
def remote_hello(self, node, furl):
|
||||||
log.msg("introducer: new contact at %s, node is %s" % (furl, node))
|
log.msg("introducer: new contact at %s, node is %s" % (furl, node))
|
||||||
|
@ -24,6 +29,9 @@ class Introducer(service.MultiService, Referenceable):
|
||||||
node.notifyOnDisconnect(_remove)
|
node.notifyOnDisconnect(_remove)
|
||||||
self.furls.add(furl)
|
self.furls.add(furl)
|
||||||
node.callRemote("new_peers", self.furls)
|
node.callRemote("new_peers", self.furls)
|
||||||
|
if self._encoding_parameters is not None:
|
||||||
|
node.callRemote("set_encoding_parameters",
|
||||||
|
self._encoding_parameters)
|
||||||
for othernode in self.nodes:
|
for othernode in self.nodes:
|
||||||
othernode.callRemote("new_peers", set([furl]))
|
othernode.callRemote("new_peers", set([furl]))
|
||||||
self.nodes.add(node)
|
self.nodes.add(node)
|
||||||
|
@ -42,6 +50,7 @@ class IntroducerClient(service.Service, Referenceable):
|
||||||
self._connected = False
|
self._connected = False
|
||||||
|
|
||||||
self.connection_observers = observer.ObserverList()
|
self.connection_observers = observer.ObserverList()
|
||||||
|
self.encoding_parameters = None
|
||||||
|
|
||||||
def startService(self):
|
def startService(self):
|
||||||
service.Service.startService(self)
|
service.Service.startService(self)
|
||||||
|
@ -60,6 +69,9 @@ class IntroducerClient(service.Service, Referenceable):
|
||||||
for furl in furls:
|
for furl in furls:
|
||||||
self._new_peer(furl)
|
self._new_peer(furl)
|
||||||
|
|
||||||
|
def remote_set_encoding_parameters(self, parameters):
|
||||||
|
self.encoding_parameters = parameters
|
||||||
|
|
||||||
def stopService(self):
|
def stopService(self):
|
||||||
service.Service.stopService(self)
|
service.Service.stopService(self)
|
||||||
self.introducer_reconnector.stopConnecting()
|
self.introducer_reconnector.stopConnecting()
|
||||||
|
|
|
@ -9,13 +9,17 @@ class IntroducerAndVdrive(node.Node):
|
||||||
PORTNUMFILE = "introducer.port"
|
PORTNUMFILE = "introducer.port"
|
||||||
NODETYPE = "introducer"
|
NODETYPE = "introducer"
|
||||||
VDRIVEDIR = "vdrive"
|
VDRIVEDIR = "vdrive"
|
||||||
|
ENCODING_PARAMETERS_FILE = "encoding_parameters"
|
||||||
|
DEFAULT_K, DEFAULT_DESIRED, DEFAULT_N = 3, 7, 10
|
||||||
|
|
||||||
def __init__(self, basedir="."):
|
def __init__(self, basedir="."):
|
||||||
node.Node.__init__(self, basedir)
|
node.Node.__init__(self, basedir)
|
||||||
self.urls = {}
|
self.urls = {}
|
||||||
|
self.read_encoding_parameters()
|
||||||
|
|
||||||
def tub_ready(self):
|
def tub_ready(self):
|
||||||
r = self.add_service(Introducer())
|
i = Introducer()
|
||||||
|
r = self.add_service(i)
|
||||||
self.urls["introducer"] = self.tub.registerReference(r, "introducer")
|
self.urls["introducer"] = self.tub.registerReference(r, "introducer")
|
||||||
self.log(" introducer is at %s" % self.urls["introducer"])
|
self.log(" introducer is at %s" % self.urls["introducer"])
|
||||||
f = open(os.path.join(self.basedir, "introducer.furl"), "w")
|
f = open(os.path.join(self.basedir, "introducer.furl"), "w")
|
||||||
|
@ -32,3 +36,17 @@ class IntroducerAndVdrive(node.Node):
|
||||||
f.write(self.urls["vdrive"] + "\n")
|
f.write(self.urls["vdrive"] + "\n")
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
|
encoding_parameters = self.read_encoding_parameters()
|
||||||
|
i.set_encoding_parameters(encoding_parameters)
|
||||||
|
|
||||||
|
def read_encoding_parameters(self):
|
||||||
|
k, desired, n = self.DEFAULT_K, self.DEFAULT_DESIRED, self.DEFAULT_N
|
||||||
|
PARAM_FILE = os.path.join(self.basedir, self.ENCODING_PARAMETERS_FILE)
|
||||||
|
if os.path.exists(PARAM_FILE):
|
||||||
|
f = open(PARAM_FILE, "r")
|
||||||
|
data = f.read().strip()
|
||||||
|
f.close()
|
||||||
|
k,desired,n = data.split()
|
||||||
|
k = int(k); desired = int(desired); n = int(n)
|
||||||
|
return k, desired, n
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
|
|
||||||
|
import os
|
||||||
from twisted.trial import unittest
|
from twisted.trial import unittest
|
||||||
from foolscap.eventual import fireEventually, flushEventualQueue
|
from foolscap.eventual import fireEventually, flushEventualQueue
|
||||||
|
|
||||||
|
@ -7,9 +8,34 @@ from allmydata.util import testutil
|
||||||
|
|
||||||
class Basic(testutil.SignalMixin, unittest.TestCase):
|
class Basic(testutil.SignalMixin, unittest.TestCase):
|
||||||
def test_loadable(self):
|
def test_loadable(self):
|
||||||
q = introducer_and_vdrive.IntroducerAndVdrive()
|
basedir = "introducer_and_vdrive.Basic.test_loadable"
|
||||||
|
os.mkdir(basedir)
|
||||||
|
q = introducer_and_vdrive.IntroducerAndVdrive(basedir)
|
||||||
d = fireEventually(None)
|
d = fireEventually(None)
|
||||||
d.addCallback(lambda res: q.startService())
|
d.addCallback(lambda res: q.startService())
|
||||||
|
d.addCallback(lambda res: q.when_tub_ready())
|
||||||
|
def _check_parameters(res):
|
||||||
|
i = q.getServiceNamed("introducer")
|
||||||
|
self.failUnlessEqual(i._encoding_parameters, (3, 7, 10))
|
||||||
|
d.addCallback(_check_parameters)
|
||||||
|
d.addCallback(lambda res: q.stopService())
|
||||||
|
d.addCallback(flushEventualQueue)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def test_set_parameters(self):
|
||||||
|
basedir = "introducer_and_vdrive.Basic.test_set_parameters"
|
||||||
|
os.mkdir(basedir)
|
||||||
|
f = open(os.path.join(basedir, "encoding_parameters"), "w")
|
||||||
|
f.write("25 75 100")
|
||||||
|
f.close()
|
||||||
|
q = introducer_and_vdrive.IntroducerAndVdrive(basedir)
|
||||||
|
d = fireEventually(None)
|
||||||
|
d.addCallback(lambda res: q.startService())
|
||||||
|
d.addCallback(lambda res: q.when_tub_ready())
|
||||||
|
def _check_parameters(res):
|
||||||
|
i = q.getServiceNamed("introducer")
|
||||||
|
self.failUnlessEqual(i._encoding_parameters, (25, 75, 100))
|
||||||
|
d.addCallback(_check_parameters)
|
||||||
d.addCallback(lambda res: q.stopService())
|
d.addCallback(lambda res: q.stopService())
|
||||||
d.addCallback(flushEventualQueue)
|
d.addCallback(flushEventualQueue)
|
||||||
return d
|
return d
|
||||||
|
|
|
@ -573,8 +573,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
||||||
self.failUnless("size: %d\n" % len(self.data) in output)
|
self.failUnless("size: %d\n" % len(self.data) in output)
|
||||||
self.failUnless("num_segments: 1\n" in output)
|
self.failUnless("num_segments: 1\n" in output)
|
||||||
# segment_size is always a multiple of needed_shares
|
# segment_size is always a multiple of needed_shares
|
||||||
self.failUnless("segment_size: 125\n" in output)
|
self.failUnless("segment_size: 114\n" in output)
|
||||||
self.failUnless("total_shares: 100\n" in output)
|
self.failUnless("total_shares: 10\n" in output)
|
||||||
# keys which are supposed to be present
|
# keys which are supposed to be present
|
||||||
for key in ("size", "num_segments", "segment_size",
|
for key in ("size", "num_segments", "segment_size",
|
||||||
"needed_shares", "total_shares",
|
"needed_shares", "total_shares",
|
||||||
|
|
|
@ -14,6 +14,8 @@ class FakeClient:
|
||||||
def get_permuted_peers(self, storage_index):
|
def get_permuted_peers(self, storage_index):
|
||||||
return [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
|
return [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
|
||||||
for fakeid in range(50) ]
|
for fakeid in range(50) ]
|
||||||
|
def get_encoding_parameters(self):
|
||||||
|
return None
|
||||||
|
|
||||||
DATA = """
|
DATA = """
|
||||||
Once upon a time, there was a beautiful princess named Buttercup. She lived
|
Once upon a time, there was a beautiful princess named Buttercup. She lived
|
||||||
|
|
|
@ -65,7 +65,10 @@ class FileUploader:
|
||||||
self._client = client
|
self._client = client
|
||||||
self._options = options
|
self._options = options
|
||||||
|
|
||||||
def set_params(self, needed_shares, shares_of_happiness, total_shares):
|
def set_params(self, encoding_parameters):
|
||||||
|
self._encoding_parameters = encoding_parameters
|
||||||
|
|
||||||
|
needed_shares, shares_of_happiness, total_shares = encoding_parameters
|
||||||
self.needed_shares = needed_shares
|
self.needed_shares = needed_shares
|
||||||
self.shares_of_happiness = shares_of_happiness
|
self.shares_of_happiness = shares_of_happiness
|
||||||
self.total_shares = total_shares
|
self.total_shares = total_shares
|
||||||
|
@ -111,6 +114,7 @@ class FileUploader:
|
||||||
|
|
||||||
def setup_encoder(self):
|
def setup_encoder(self):
|
||||||
self._encoder = encode.Encoder(self._options)
|
self._encoder = encode.Encoder(self._options)
|
||||||
|
self._encoder.set_params(self._encoding_parameters)
|
||||||
self._encoder.setup(self._filehandle, self._encryption_key)
|
self._encoder.setup(self._filehandle, self._encryption_key)
|
||||||
share_size = self._encoder.get_share_size()
|
share_size = self._encoder.get_share_size()
|
||||||
block_size = self._encoder.get_block_size()
|
block_size = self._encoder.get_block_size()
|
||||||
|
@ -313,9 +317,12 @@ class Uploader(service.MultiService):
|
||||||
uploader_class = FileUploader
|
uploader_class = FileUploader
|
||||||
URI_LIT_SIZE_THRESHOLD = 55
|
URI_LIT_SIZE_THRESHOLD = 55
|
||||||
|
|
||||||
needed_shares = 25 # Number of shares required to reconstruct a file.
|
DEFAULT_ENCODING_PARAMETERS = (25, 75, 100)
|
||||||
desired_shares = 75 # We will abort an upload unless we can allocate space for at least this many.
|
# this is a tuple of (needed, desired, total). 'needed' is the number of
|
||||||
total_shares = 100 # Total number of shares created by encoding. If everybody has room then this is is how many we will upload.
|
# shares required to reconstruct a file. 'desired' means that we will
|
||||||
|
# abort an upload unless we can allocate space for at least this many.
|
||||||
|
# 'total' is the total number of shares created by encoding. If everybody
|
||||||
|
# has room then this is is how many we will upload.
|
||||||
|
|
||||||
def compute_id_strings(self, f):
|
def compute_id_strings(self, f):
|
||||||
# return a list of (plaintext_hash, encryptionkey, crypttext_hash)
|
# return a list of (plaintext_hash, encryptionkey, crypttext_hash)
|
||||||
|
@ -366,8 +373,10 @@ class Uploader(service.MultiService):
|
||||||
else:
|
else:
|
||||||
u = self.uploader_class(self.parent, options)
|
u = self.uploader_class(self.parent, options)
|
||||||
u.set_filehandle(fh)
|
u.set_filehandle(fh)
|
||||||
u.set_params(self.needed_shares, self.desired_shares,
|
encoding_parameters = self.parent.get_encoding_parameters()
|
||||||
self.total_shares)
|
if not encoding_parameters:
|
||||||
|
encoding_parameters = self.DEFAULT_ENCODING_PARAMETERS
|
||||||
|
u.set_params(encoding_parameters)
|
||||||
plaintext_hash, key, crypttext_hash = self.compute_id_strings(fh)
|
plaintext_hash, key, crypttext_hash = self.compute_id_strings(fh)
|
||||||
u.set_encryption_key(key)
|
u.set_encryption_key(key)
|
||||||
u.set_id_strings(crypttext_hash, plaintext_hash)
|
u.set_id_strings(crypttext_hash, plaintext_hash)
|
||||||
|
|
Loading…
Reference in New Issue