This commit is contained in:
Victor Shyba 2020-02-14 13:23:33 -03:00
parent e9623fae13
commit f6b4fee814
5 changed files with 12 additions and 40 deletions

View file

@ -238,7 +238,7 @@ class FileManager:
# TODO: analytics for torrents # TODO: analytics for torrents
pass pass
elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))): stream.downloader.time_to_first_bytes))):
server = self.wallet_manager.ledger.network.client.server server = self.wallet_manager.ledger.network.client.server
self.loop.create_task( self.loop.create_task(
self.analytics_manager.send_time_to_first_bytes( self.analytics_manager.send_time_to_first_bytes(

View file

@ -3,7 +3,6 @@ import asyncio
import time import time
import typing import typing
import logging import logging
import binascii
from typing import Optional from typing import Optional
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.error import DownloadSDTimeoutError from lbry.error import DownloadSDTimeoutError

View file

@ -230,42 +230,17 @@ class StreamManager(SourceManager):
self.reflect_stream(stream) self.reflect_stream(stream)
return stream return stream
async def delete(self, stream: ManagedStream, delete_file: Optional[bool] = False): async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
if stream.sd_hash in self.running_reflector_uploads: if source.sd_hash in self.running_reflector_uploads:
self.running_reflector_uploads[stream.sd_hash].cancel() self.running_reflector_uploads[source.sd_hash].cancel()
stream.stop_tasks() source.stop_tasks()
if stream.sd_hash in self.streams: if source.sd_hash in self.streams:
del self.streams[stream.sd_hash] del self.streams[source.sd_hash]
blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]] blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
await self.storage.delete_stream(stream.descriptor) await self.storage.delete_stream(source.descriptor)
if delete_file and stream.output_file_exists: if delete_file and source.output_file_exists:
os.remove(stream.full_path) os.remove(source.full_path)
# =======
# self.running_reflector_uploads.pop().cancel()
# super().stop()
# log.info("finished stopping the stream manager")
#
# def _upload_stream_to_reflector(self, stream: ManagedStream):
# if self.config.reflector_servers:
# host, port = random.choice(self.config.reflector_servers)
# task = self.loop.create_task(stream.upload_to_reflector(host, port))
# self.running_reflector_uploads.append(task)
# task.add_done_callback(
# lambda _: None
# if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task)
# )
#
# async def create(self, file_path: str, key: Optional[bytes] = None,
# iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
# self.add(source)
# if self.config.reflect_streams:
# self._upload_stream_to_reflector(source)
# return source
#
# async def _delete(self, stream: ManagedStream, delete_file: Optional[bool] = False):
# >>>>>>> ManagedDownloadSource and SourceManager refactor
async def stream_partial_content(self, request: Request, sd_hash: str): async def stream_partial_content(self, request: Request, sd_hash: str):
return await self._sources[sd_hash].stream_file(request, self.node) return await self._sources[sd_hash].stream_file(request, self.node)

View file

@ -82,9 +82,6 @@ class TorrentManager(SourceManager):
super().__init__(loop, config, storage, analytics_manager) super().__init__(loop, config, storage, analytics_manager)
self.torrent_session: 'TorrentSession' = torrent_session self.torrent_session: 'TorrentSession' = torrent_session
def add(self, source: ManagedDownloadSource):
super().add(source)
async def recover_streams(self, file_infos: typing.List[typing.Dict]): async def recover_streams(self, file_infos: typing.List[typing.Dict]):
raise NotImplementedError raise NotImplementedError

View file

@ -12,6 +12,7 @@ ignore_missing_imports = True
[pylint] [pylint]
jobs=8 jobs=8
ignore=words,server,rpc,schema,winpaths.py,migrator,undecorated.py ignore=words,server,rpc,schema,winpaths.py,migrator,undecorated.py
extension-pkg-whitelist=libtorrent
max-parents=10 max-parents=10
max-args=10 max-args=10
max-line-length=120 max-line-length=120