diff --git a/lbry/lbry/blob/blob_manager.py b/lbry/lbry/blob/blob_manager.py index 71c646484..49a67b18a 100644 --- a/lbry/lbry/blob/blob_manager.py +++ b/lbry/lbry/blob/blob_manager.py @@ -37,18 +37,14 @@ class BlobManager: self.connection_manager = ConnectionManager(loop) def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None): - if self.config.save_blobs: + if self.config.save_blobs or ( + is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash))): return BlobFile( self.loop, blob_hash, length, self.blob_completed, self.blob_dir ) - else: - if is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash)): - return BlobFile( - self.loop, blob_hash, length, self.blob_completed, self.blob_dir - ) - return BlobBuffer( - self.loop, blob_hash, length, self.blob_completed, self.blob_dir - ) + return BlobBuffer( + self.loop, blob_hash, length, self.blob_completed, self.blob_dir + ) def get_blob(self, blob_hash, length: typing.Optional[int] = None): if blob_hash in self.blobs: @@ -82,6 +78,7 @@ class BlobManager: return { item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name) } + in_blobfiles_dir = await self.loop.run_in_executor(None, get_files_in_blob_dir) to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir) if to_add: diff --git a/lbry/lbry/dht/peer.py b/lbry/lbry/dht/peer.py index ce22a1f31..39d13342f 100644 --- a/lbry/lbry/dht/peer.py +++ b/lbry/lbry/dht/peer.py @@ -112,10 +112,10 @@ class PeerManager: delay = self._loop.time() - constants.check_refresh_interval # fixme: find a way to re-enable that without breaking other parts - #if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping: + # if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping: # return - #addr_tup = (address, udp_port) - #if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id: + # addr_tup = (address, udp_port) + # if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id: # return previous_failure, most_recent_failure = self._rpc_failures.get((address, udp_port), (None, None)) last_requested = self._last_requested.get((address, udp_port)) diff --git a/lbry/lbry/stream/descriptor.py b/lbry/lbry/stream/descriptor.py index 1b7856d85..b23df15b2 100644 --- a/lbry/lbry/stream/descriptor.py +++ b/lbry/lbry/stream/descriptor.py @@ -71,7 +71,7 @@ class StreamDescriptor: self.sd_hash = sd_hash @property - def length(self): + def length(self) -> int: return len(self.as_json()) def get_stream_hash(self) -> str: @@ -114,7 +114,7 @@ class StreamDescriptor: ]) ).encode() - def calculate_old_sort_sd_hash(self): + def calculate_old_sort_sd_hash(self) -> str: h = get_lbry_hash_obj() h.update(self.old_sort_json()) return h.hexdigest() diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py index 38d461165..187084081 100644 --- a/lbry/lbry/stream/managed_stream.py +++ b/lbry/lbry/stream/managed_stream.py @@ -11,6 +11,8 @@ from lbry.stream.downloader import StreamDownloader from lbry.stream.descriptor import StreamDescriptor from lbry.stream.reflector.client import StreamReflectorClient from lbry.extras.daemon.storage import StoredStreamClaim +from lbry.blob import MAX_BLOB_SIZE + if typing.TYPE_CHECKING: from lbry.conf import Config from lbry.schema.claim import Claim @@ -215,6 +217,9 @@ class ManagedStream: async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream': + """ + Generate a stream from a file and save it to the db + """ descriptor = await StreamDescriptor.create_stream( loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator, blob_completed_callback=blob_manager.blob_completed @@ -295,7 +300,7 @@ class ManagedStream: if not wrote: decrypted = decrypted[first_blob_start_offset:] if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size): - decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151))) + decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * (MAX_BLOB_SIZE - 1)))) log.debug("sending browser final blob (%i/%i)", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1) await response.write_eof(decrypted) @@ -486,8 +491,8 @@ class ManagedStream: if end >= size: raise HTTPRequestRangeNotSatisfiable() - skip_blobs = start // 2097150 - skip = skip_blobs * 2097151 + skip_blobs = start // (MAX_BLOB_SIZE - 2) # -2 because ... dont remember + skip = skip_blobs * (MAX_BLOB_SIZE - 1) # -1 because skip_first_blob = start - skip start = skip_first_blob + skip final_size = end - start + 1 diff --git a/lbry/lbry/utils.py b/lbry/lbry/utils.py index d23fe9bc3..ed00e7024 100644 --- a/lbry/lbry/utils.py +++ b/lbry/lbry/utils.py @@ -19,7 +19,6 @@ import collections from lbry.schema.claim import Claim from lbry.cryptoutils import get_lbry_hash_obj - log = logging.getLogger(__name__)