mirror of
https://github.com/LBRYFoundation/lbry-sdk.git
synced 2025-08-29 16:31:25 +00:00
remove reactor.iterate from dht shutdown
This commit is contained in:
parent
fe7700d726
commit
693fef1964
1 changed files with 13 additions and 10 deletions
|
@ -21,6 +21,7 @@ import msgtypes
|
||||||
import msgformat
|
import msgformat
|
||||||
from contact import Contact
|
from contact import Contact
|
||||||
|
|
||||||
|
|
||||||
reactor = twisted.internet.reactor
|
reactor = twisted.internet.reactor
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -124,10 +125,10 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
except IndexError:
|
except IndexError:
|
||||||
log.warning("Couldn't decode dht datagram from %s", address)
|
log.warning("Couldn't decode dht datagram from %s", address)
|
||||||
return
|
return
|
||||||
|
|
||||||
message = self._translator.fromPrimitive(msgPrimitive)
|
message = self._translator.fromPrimitive(msgPrimitive)
|
||||||
remoteContact = Contact(message.nodeID, address[0], address[1], self)
|
remoteContact = Contact(message.nodeID, address[0], address[1], self)
|
||||||
|
|
||||||
# Refresh the remote node's details in the local node's k-buckets
|
# Refresh the remote node's details in the local node's k-buckets
|
||||||
self._node.addContact(remoteContact)
|
self._node.addContact(remoteContact)
|
||||||
|
|
||||||
|
@ -143,7 +144,9 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
del self._sentMessages[message.id]
|
del self._sentMessages[message.id]
|
||||||
|
|
||||||
if hasattr(df, '_rpcRawResponse'):
|
if hasattr(df, '_rpcRawResponse'):
|
||||||
# The RPC requested that the raw response message and originating address be returned; do not interpret it
|
# The RPC requested that the raw response message
|
||||||
|
# and originating address be returned; do not
|
||||||
|
# interpret it
|
||||||
df.callback((message, address))
|
df.callback((message, address))
|
||||||
elif isinstance(message, msgtypes.ErrorMessage):
|
elif isinstance(message, msgtypes.ErrorMessage):
|
||||||
# The RPC request raised a remote exception; raise it locally
|
# The RPC request raised a remote exception; raise it locally
|
||||||
|
@ -175,7 +178,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
def _send(self, data, rpcID, address):
|
def _send(self, data, rpcID, address):
|
||||||
""" Transmit the specified data over UDP, breaking it up into several
|
""" Transmit the specified data over UDP, breaking it up into several
|
||||||
packets if necessary
|
packets if necessary
|
||||||
|
|
||||||
If the data is spread over multiple UDP datagrams, the packets have the
|
If the data is spread over multiple UDP datagrams, the packets have the
|
||||||
following structure::
|
following structure::
|
||||||
| | | | | |||||||||||| 0x00 |
|
| | | | | |||||||||||| 0x00 |
|
||||||
|
@ -183,7 +186,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
| type ID | of packets |of this packet | | indicator|
|
| type ID | of packets |of this packet | | indicator|
|
||||||
| (1 byte) | (2 bytes) | (2 bytes) |(20 bytes)| (1 byte) |
|
| (1 byte) | (2 bytes) | (2 bytes) |(20 bytes)| (1 byte) |
|
||||||
| | | | | |||||||||||| |
|
| | | | | |||||||||||| |
|
||||||
|
|
||||||
@note: The header used for breaking up large data segments will
|
@note: The header used for breaking up large data segments will
|
||||||
possibly be moved out of the KademliaProtocol class in the
|
possibly be moved out of the KademliaProtocol class in the
|
||||||
future, into something similar to a message translator/encoder
|
future, into something similar to a message translator/encoder
|
||||||
|
@ -301,19 +304,19 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
df.errback(failure.Failure(TimeoutError(remoteContactID)))
|
df.errback(failure.Failure(TimeoutError(remoteContactID)))
|
||||||
else:
|
else:
|
||||||
# This should never be reached
|
# This should never be reached
|
||||||
print "ERROR: deferred timed out, but is not present in sent messages list!"
|
log.error("deferred timed out, but is not present in sent messages list!")
|
||||||
|
|
||||||
def stopProtocol(self):
|
def stopProtocol(self):
|
||||||
""" Called when the transport is disconnected.
|
""" Called when the transport is disconnected.
|
||||||
|
|
||||||
Will only be called once, after all ports are disconnected.
|
Will only be called once, after all ports are disconnected.
|
||||||
"""
|
"""
|
||||||
|
log.info('Stopping dht')
|
||||||
for key in self._callLaterList.keys():
|
for key in self._callLaterList.keys():
|
||||||
try:
|
try:
|
||||||
if key > time.time():
|
if key > time.time():
|
||||||
|
log.info('Cancelling %s', self._callLaterList[key])
|
||||||
self._callLaterList[key].cancel()
|
self._callLaterList[key].cancel()
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
print e
|
log.exception('Failed to cancel %s', self._callLaterList[key])
|
||||||
del self._callLaterList[key]
|
del self._callLaterList[key]
|
||||||
#TODO: test: do we really need the reactor.iterate() call?
|
|
||||||
reactor.iterate()
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue