Merge branch '4072-no-more-blocking-part-2' into 4080-larger-chunks

This commit is contained in:
Itamar Turner-Trauring 2023-12-08 11:46:22 -05:00
commit 4bf1a643d5
6 changed files with 40 additions and 21 deletions

View File

@ -101,7 +101,7 @@ def client_node(request, grid, storage_nodes, number_of_nodes) -> Client:
"client_node", "client_node",
needed=number_of_nodes, needed=number_of_nodes,
happy=number_of_nodes, happy=number_of_nodes,
total=number_of_nodes, total=number_of_nodes + 3, # Make sure FEC does some work
) )
) )
print(f"Client node pid: {client_node.process.transport.pid}") print(f"Client node pid: {client_node.process.transport.pid}")

View File

@ -0,0 +1 @@
Continued work to make Tahoe-LAFS take advantage of multiple CPUs.

View File

@ -87,8 +87,8 @@ def encrypt_data(encryptor, plaintext):
""" """
_validate_cryptor(encryptor, encrypt=True) _validate_cryptor(encryptor, encrypt=True)
if not isinstance(plaintext, six.binary_type): if not isinstance(plaintext, (six.binary_type, memoryview)):
raise ValueError('Plaintext must be bytes') raise ValueError(f'Plaintext must be bytes or memoryview: {type(plaintext)}')
return encryptor.update(plaintext) return encryptor.update(plaintext)
@ -126,8 +126,8 @@ def decrypt_data(decryptor, plaintext):
""" """
_validate_cryptor(decryptor, encrypt=False) _validate_cryptor(decryptor, encrypt=False)
if not isinstance(plaintext, six.binary_type): if not isinstance(plaintext, (six.binary_type, memoryview)):
raise ValueError('Plaintext must be bytes') raise ValueError(f'Plaintext must be bytes or memoryview: {type(plaintext)}')
return decryptor.update(plaintext) return decryptor.update(plaintext)

View File

@ -419,7 +419,7 @@ class DownloadNode(object):
def process_blocks(self, segnum, blocks): def process_blocks(self, segnum, blocks):
start = now() start = now()
d = defer.maybeDeferred(self._decode_blocks, segnum, blocks) d = self._decode_blocks(segnum, blocks)
d.addCallback(self._check_ciphertext_hash, segnum) d.addCallback(self._check_ciphertext_hash, segnum)
def _deliver(result): def _deliver(result):
log.msg(format="delivering segment(%(segnum)d)", log.msg(format="delivering segment(%(segnum)d)",

View File

@ -14,6 +14,7 @@ from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
IMutableFileVersion, IWriteable IMutableFileVersion, IWriteable
from allmydata.util import hashutil, log, consumer, deferredutil, mathutil from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \ from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
WriteableMDMFFileURI, ReadonlyMDMFFileURI WriteableMDMFFileURI, ReadonlyMDMFFileURI
from allmydata.monitor import Monitor from allmydata.monitor import Monitor
@ -128,7 +129,8 @@ class MutableFileNode(object):
return self return self
def create_with_keys(self, keypair, contents, @deferredutil.async_to_deferred
async def create_with_keys(self, keypair, contents,
version=SDMF_VERSION): version=SDMF_VERSION):
"""Call this to create a brand-new mutable file. It will create the """Call this to create a brand-new mutable file. It will create the
shares, find homes for them, and upload the initial contents (created shares, find homes for them, and upload the initial contents (created
@ -137,8 +139,8 @@ class MutableFileNode(object):
use) when it completes. use) when it completes.
""" """
self._pubkey, self._privkey = keypair self._pubkey, self._privkey = keypair
self._writekey, self._encprivkey, self._fingerprint = derive_mutable_keys( self._writekey, self._encprivkey, self._fingerprint = await defer_to_thread(
keypair, derive_mutable_keys, keypair
) )
if version == MDMF_VERSION: if version == MDMF_VERSION:
self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint) self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint)
@ -149,7 +151,7 @@ class MutableFileNode(object):
self._readkey = self._uri.readkey self._readkey = self._uri.readkey
self._storage_index = self._uri.storage_index self._storage_index = self._uri.storage_index
initial_contents = self._get_initial_contents(contents) initial_contents = self._get_initial_contents(contents)
return self._upload(initial_contents, None) return await self._upload(initial_contents, None)
def _get_initial_contents(self, contents): def _get_initial_contents(self, contents):
if contents is None: if contents is None:

View File

@ -4,8 +4,8 @@ Ported to Python 3.
from __future__ import annotations from __future__ import annotations
import time import time
from itertools import count from itertools import count
from zope.interface import implementer from zope.interface import implementer
from twisted.internet import defer from twisted.internet import defer
from twisted.python import failure from twisted.python import failure
@ -873,11 +873,20 @@ class Retrieve(object):
shares = shares[:self._required_shares] shares = shares[:self._required_shares]
self.log("decoding segment %d" % segnum) self.log("decoding segment %d" % segnum)
if segnum == self._num_segments - 1: if segnum == self._num_segments - 1:
d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids) d = self._tail_decoder.decode(shares, shareids)
else: else:
d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids) d = self._segment_decoder.decode(shares, shareids)
def _process(buffers):
segment = b"".join(buffers) # For larger shares, this can take a few milliseconds. As such, we want
# to unblock the event loop. In newer Python b"".join() will release
# the GIL: https://github.com/python/cpython/issues/80232
@deferredutil.async_to_deferred
async def _got_buffers(buffers):
return await defer_to_thread(lambda: b"".join(buffers))
d.addCallback(_got_buffers)
def _process(segment):
self.log(format="now decoding segment %(segnum)s of %(numsegs)s", self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
segnum=segnum, segnum=segnum,
numsegs=self._num_segments, numsegs=self._num_segments,
@ -928,12 +937,20 @@ class Retrieve(object):
reason, reason,
) )
@deferredutil.async_to_deferred
def _try_to_validate_privkey(self, enc_privkey, reader, server): async def _try_to_validate_privkey(self, enc_privkey, reader, server):
node_writekey = self._node.get_writekey() node_writekey = self._node.get_writekey()
alleged_privkey_s = decrypt_privkey(node_writekey, enc_privkey)
alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s) def get_privkey():
if alleged_writekey != node_writekey: alleged_privkey_s = decrypt_privkey(node_writekey, enc_privkey)
alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
if alleged_writekey != node_writekey:
return None
privkey, _ = rsa.create_signing_keypair_from_string(alleged_privkey_s)
return privkey
privkey = await defer_to_thread(get_privkey)
if privkey is None:
self.log("invalid privkey from %s shnum %d" % self.log("invalid privkey from %s shnum %d" %
(reader, reader.shnum), (reader, reader.shnum),
level=log.WEIRD, umid="YIw4tA") level=log.WEIRD, umid="YIw4tA")
@ -950,7 +967,6 @@ class Retrieve(object):
# it's good # it's good
self.log("got valid privkey from shnum %d on reader %s" % self.log("got valid privkey from shnum %d on reader %s" %
(reader.shnum, reader)) (reader.shnum, reader))
privkey, _ = rsa.create_signing_keypair_from_string(alleged_privkey_s)
self._node._populate_encprivkey(enc_privkey) self._node._populate_encprivkey(enc_privkey)
self._node._populate_privkey(privkey) self._node._populate_privkey(privkey)
self._need_privkey = False self._need_privkey = False