From f2968aab2234fde9e08cebcf9aeac619186d0761 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 1 Feb 2019 22:59:41 -0500 Subject: [PATCH] add re-reflect task to stream manager, add concurrent_reflector_uploads to config --- lbrynet/conf.py | 3 +++ lbrynet/stream/reflector/client.py | 2 +- lbrynet/stream/stream_manager.py | 33 ++++++++++++++++++++---------- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 61e0127f4..2f2eab69e 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -505,6 +505,9 @@ class Config(CLIConfig): "Upload completed streams (published and downloaded) reflector in order to re-host them", True, previous_names=['reflect_uploads'] ) + concurrent_reflector_uploads = Integer( + "Maximum number of streams to upload to a reflector server at a time", 10 + ) # servers reflector_servers = Servers("Reflector re-hosting servers", [ diff --git a/lbrynet/stream/reflector/client.py b/lbrynet/stream/reflector/client.py index 908f0608f..117afa53e 100644 --- a/lbrynet/stream/reflector/client.py +++ b/lbrynet/stream/reflector/client.py @@ -27,7 +27,7 @@ class StreamReflectorClient(asyncio.Protocol): def connection_made(self, transport): self.transport = transport - log.info("Connected to reflector") + log.debug("Connected to reflector") self.connected.set() def connection_lost(self, exc: typing.Optional[Exception]): diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 360eaf4fc..a29976e39 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -62,6 +62,7 @@ class StreamManager: self.streams: typing.Set[ManagedStream] = set() self.starting_streams: typing.Dict[str, asyncio.Future] = {} self.resume_downloading_task: asyncio.Task = None + self.re_reflect_task: asyncio.Task = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] async def _update_content_claim(self, stream: ManagedStream): @@ -147,26 +148,36 @@ class StreamManager: await asyncio.gather(*t, loop=self.loop) async def reflect_streams(self): - streams = list(self.streams) - batch = [] - while streams: - stream = streams.pop() - if not stream.fully_reflected.is_set(): - host, port = random.choice(self.reflector_servers) - batch.append(stream.upload_to_reflector(host, port)) - if len(batch) >= 10: - await asyncio.gather(*batch) + while True: + if self.config.reflector_servers: + sd_hashes = await self.storage.get_streams_to_re_reflect() + streams = list(filter(lambda s: s.sd_hash in sd_hashes, self.streams)) batch = [] - if batch: - await asyncio.gather(*batch) + total = len(streams) + while streams: + stream = streams.pop() + if not stream.fully_reflected.is_set(): + host, port = random.choice(self.config.reflector_servers) + batch.append(stream.upload_to_reflector(host, port)) + if len(batch) >= self.config.concurrent_reflector_uploads: + await asyncio.gather(*batch) + batch = [] + if batch: + await asyncio.gather(*batch) + if total: + log.info("uploaded %i streams to reflector", total) + await asyncio.sleep(300, loop=self.loop) async def start(self): await self.load_streams_from_database() self.resume_downloading_task = self.loop.create_task(self.resume()) + self.re_reflect_task = self.loop.create_task(self.reflect_streams()) def stop(self): if self.resume_downloading_task and not self.resume_downloading_task.done(): self.resume_downloading_task.cancel() + if self.re_reflect_task and not self.re_reflect_task.done(): + self.re_reflect_task.cancel() while self.streams: stream = self.streams.pop() stream.stop_download()