mirror of
https://github.com/LBRYFoundation/lbry-sdk.git
synced 2025-08-23 17:27:25 +00:00
don't try to resend failed blobs to reflector
-fixes infinite loop where client keeps trying to send failing blobs, which may be failing because they are invalid -return list of reflected blob hashes from BlobReflectorClient
This commit is contained in:
parent
679c2f403f
commit
3085e28490
3 changed files with 14 additions and 12 deletions
|
@ -13,15 +13,15 @@ at anytime.
|
||||||
*
|
*
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
*
|
* incorrectly raised download cancelled error for already verified blob files
|
||||||
*
|
* infinite loop where reflector client keeps trying to send failing blobs, which may be failing because they are invalid and thus will never be successfully received
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
* incorrectly raised download cancelled error for already verified blob files
|
*
|
||||||
*
|
*
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
|
@ -26,7 +26,7 @@ class BlobReflectorClient(Protocol):
|
||||||
self.file_sender = None
|
self.file_sender = None
|
||||||
self.producer = None
|
self.producer = None
|
||||||
self.streaming = False
|
self.streaming = False
|
||||||
self.sent_blobs = False
|
self.reflected_blobs = []
|
||||||
d = self.send_handshake()
|
d = self.send_handshake()
|
||||||
d.addErrback(
|
d.addErrback(
|
||||||
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||||
|
@ -46,10 +46,9 @@ class BlobReflectorClient(Protocol):
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
if reason.check(error.ConnectionDone):
|
if reason.check(error.ConnectionDone):
|
||||||
self.factory.sent_blobs = self.sent_blobs
|
if self.reflected_blobs:
|
||||||
if self.factory.sent_blobs:
|
|
||||||
log.info('Finished sending data via reflector')
|
log.info('Finished sending data via reflector')
|
||||||
self.factory.finished_deferred.callback(self.factory.sent_blobs)
|
self.factory.finished_deferred.callback(self.reflected_blobs)
|
||||||
else:
|
else:
|
||||||
log.info('Reflector finished: %s', reason)
|
log.info('Reflector finished: %s', reason)
|
||||||
self.factory.finished_deferred.callback(reason)
|
self.factory.finished_deferred.callback(reason)
|
||||||
|
@ -101,7 +100,6 @@ class BlobReflectorClient(Protocol):
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
def start_transfer(self):
|
def start_transfer(self):
|
||||||
self.sent_blobs = True
|
|
||||||
assert self.read_handle is not None, \
|
assert self.read_handle is not None, \
|
||||||
"self.read_handle was None when trying to start the transfer"
|
"self.read_handle was None when trying to start the transfer"
|
||||||
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
||||||
|
@ -130,6 +128,8 @@ class BlobReflectorClient(Protocol):
|
||||||
if 'received_blob' not in response_dict:
|
if 'received_blob' not in response_dict:
|
||||||
raise ValueError("I don't know if the blob made it to the intended destination!")
|
raise ValueError("I don't know if the blob made it to the intended destination!")
|
||||||
else:
|
else:
|
||||||
|
if response_dict['received_blob']:
|
||||||
|
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
|
|
||||||
def open_blob_for_reading(self, blob):
|
def open_blob_for_reading(self, blob):
|
||||||
|
@ -188,7 +188,6 @@ class BlobReflectorClientFactory(ClientFactory):
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.blobs = blobs
|
self.blobs = blobs
|
||||||
self.p = None
|
self.p = None
|
||||||
self.sent_blobs = False
|
|
||||||
self.finished_deferred = defer.Deferred()
|
self.finished_deferred = defer.Deferred()
|
||||||
|
|
||||||
def buildProtocol(self, addr):
|
def buildProtocol(self, addr):
|
||||||
|
|
|
@ -204,14 +204,18 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
raise ValueError("I don't know if the sd blob made it to the intended destination!")
|
raise ValueError("I don't know if the sd blob made it to the intended destination!")
|
||||||
else:
|
else:
|
||||||
self.received_descriptor_response = True
|
self.received_descriptor_response = True
|
||||||
|
disconnect = False
|
||||||
if response_dict['received_sd_blob']:
|
if response_dict['received_sd_blob']:
|
||||||
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
||||||
log.info("Sent reflector descriptor %s", self.next_blob_to_send)
|
log.info("Sent reflector descriptor %s", self.next_blob_to_send)
|
||||||
else:
|
else:
|
||||||
log.warning("Reflector failed to receive descriptor %s",
|
log.warning("Reflector failed to receive descriptor %s",
|
||||||
self.next_blob_to_send)
|
self.next_blob_to_send)
|
||||||
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
disconnect = True
|
||||||
return self.set_not_uploading()
|
d = self.set_not_uploading()
|
||||||
|
if disconnect:
|
||||||
|
d.addCallback(lambda _: self.transport.loseConnection())
|
||||||
|
return d
|
||||||
|
|
||||||
def handle_normal_response(self, response_dict):
|
def handle_normal_response(self, response_dict):
|
||||||
if self.file_sender is None: # Expecting Server Info Response
|
if self.file_sender is None: # Expecting Server Info Response
|
||||||
|
@ -232,7 +236,6 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
log.debug("Sent reflector blob %s", self.next_blob_to_send)
|
log.debug("Sent reflector blob %s", self.next_blob_to_send)
|
||||||
else:
|
else:
|
||||||
log.warning("Reflector failed to receive blob %s", self.next_blob_to_send)
|
log.warning("Reflector failed to receive blob %s", self.next_blob_to_send)
|
||||||
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
|
|
||||||
def open_blob_for_reading(self, blob):
|
def open_blob_for_reading(self, blob):
|
||||||
|
|
Loading…
Add table
Reference in a new issue