From 64dffa306f28b88fc3d22b7c70b8806f61ddf6f4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 28 Jan 2019 17:29:08 -0500 Subject: [PATCH] logging, cleanup --- lbrynet/blob_exchange/client.py | 40 ++++++++++++++++---------------- lbrynet/stream/downloader.py | 16 ++++++------- lbrynet/stream/managed_stream.py | 1 - 3 files changed, 27 insertions(+), 30 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 90b9a88a2..37ad056c3 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -19,15 +19,14 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.writer: 'HashBlobWriter' = None self.blob: 'BlobFile' = None - self.download_running = asyncio.Event(loop=self.loop) self._blob_bytes_received = 0 self._response_fut: asyncio.Future = None self._request_lock = asyncio.Lock(loop=self.loop) - def handle_data_received(self, data: bytes): - if self.transport.is_closing(): - if self._response_fut and not (self._response_fut.done() or self._response_fut.cancelled()): + def data_received(self, data: bytes): + if self.transport.is_closing(): # TODO: is this needed? + if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() return @@ -36,27 +35,36 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if response.responses and self.blob: blob_response = response.get_blob_response() if blob_response and not blob_response.error and blob_response.blob_hash == self.blob.blob_hash: + # set the expected length for the incoming blob if we didn't know it self.blob.set_length(blob_response.length) elif blob_response and not blob_response.error and self.blob.blob_hash != blob_response.blob_hash: + # the server started sending a blob we didn't request log.warning("mismatch with self.blob %s", self.blob.blob_hash) return if response.responses: + log.debug("got response from %s:%i <- %s", self.peer_address, self.peer_port, response.to_dict()) + # fire the Future with the response to our request self._response_fut.set_result(response) if response.blob_data and self.writer and not self.writer.closed(): + log.debug("got %i blob bytes from %s:%i", len(response.blob_data), self.peer_address, self.peer_port) + # write blob bytes if we're writing a blob and have blob bytes to write self._blob_bytes_received += len(response.blob_data) try: self.writer.write(response.blob_data) + return except IOError as err: - log.error("error downloading blob: %s", err) - - def data_received(self, data): - try: - return self.handle_data_received(data) - except (asyncio.CancelledError, asyncio.TimeoutError) as err: - if self._response_fut and not self._response_fut.done(): - self._response_fut.set_exception(err) + log.error("error downloading blob from %s:%i: %s", self.peer_address, self.peer_port, err) + if self._response_fut and not self._response_fut.done(): + self._response_fut.set_exception(err) + except (asyncio.CancelledError, asyncio.TimeoutError) as err: # TODO: is this needed? + log.error("%s downloading blob from %s:%i", str(err), self.peer_address, self.peer_port) + if self._response_fut and not self._response_fut.done(): + self._response_fut.set_exception(err) async def _download_blob(self) -> typing.Tuple[bool, bool]: + """ + :return: download success (bool), keep connection (bool) + """ request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash) try: msg = request.serialize() @@ -109,7 +117,6 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.writer.close_handle() if self.blob: await self.blob.close() - self.download_running.clear() self._response_fut = None self.writer = None self.blob = None @@ -122,11 +129,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return False, True async with self._request_lock: try: - if self.download_running.is_set(): - log.info("wait for download already running") - await self.download_running.wait() self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0 - self.download_running.set() self._response_fut = asyncio.Future(loop=self.loop) return await self._download_blob() except OSError: @@ -161,9 +164,6 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', protocol: """ if blob.get_is_verified(): return False, True - if blob.get_is_verified(): - log.info("already verified") - return False, True try: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index 113eb90a2..b6ec2ff78 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -51,7 +51,7 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t async def _request_blob(self, peer: 'KademliaPeer'): if self.current_blob.get_is_verified(): - log.info("already verified") + log.debug("already verified") return if peer not in self.active_connections: log.warning("not active, adding: %s", str(peer)) @@ -61,12 +61,12 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t peer.address, peer.tcp_port, self.peer_connect_timeout) await protocol.close() if not keep_connection: - log.info("drop peer %s:%i", peer.address, peer.tcp_port) + log.debug("drop peer %s:%i", peer.address, peer.tcp_port) if peer in self.active_connections: async with self._lock: del self.active_connections[peer] return - log.info("keep peer %s:%i", peer.address, peer.tcp_port) + log.debug("keep peer %s:%i", peer.address, peer.tcp_port) def _update_requests(self): self.new_peer_event.clear() @@ -77,9 +77,9 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t if peer not in self.requested_from[self.current_blob.blob_hash] and peer not in to_add: to_add.append(peer) if to_add or self.running_download_requests: - log.info("adding download probes for %i peers to %i already active", - min(len(to_add), 8 - len(self.running_download_requests)), - len(self.running_download_requests)) + log.debug("adding download probes for %i peers to %i already active", + min(len(to_add), 8 - len(self.running_download_requests)), + len(self.running_download_requests)) else: log.info("downloader idle...") for peer in to_add: @@ -176,9 +176,7 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t try: async with node.stream_peer_search_junction(blob_queue) as search_junction: async for peers in search_junction: - if not isinstance(peers, list): # TODO: what's up with this? - log.error("not a list: %s %s", peers, str(type(peers))) - else: + if peers: self._add_peer_protocols(peers) if not added_peers.is_set(): added_peers.set() diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 6a637ec1e..ed0210340 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -30,7 +30,6 @@ class ManagedStream: self.stream_hash = descriptor.stream_hash self.stream_claim_info = claim self._status = status - self._store_after_finished: asyncio.Task = None self.fully_reflected = asyncio.Event(loop=self.loop) @property