diff --git a/lbrynet/core/call_later_manager.py b/lbrynet/core/call_later_manager.py new file mode 100644 index 000000000..d3f5d3c2d --- /dev/null +++ b/lbrynet/core/call_later_manager.py @@ -0,0 +1,63 @@ +class CallLaterManager(object): + _callLater = None + _pendingCallLaters = [] + + @classmethod + def _cancel(cls, call_later): + """ + :param call_later: DelayedCall + :return: (callable) canceller function + """ + + def cancel(reason=None): + """ + :param reason: reason for cancellation, this is returned after cancelling the DelayedCall + :return: reason + """ + + if call_later.active(): + call_later.cancel() + cls._pendingCallLaters.remove(call_later) + return reason + return cancel + + @classmethod + def stop(cls): + """ + Cancel any callLaters that are still running + """ + + from twisted.internet import defer + while cls._pendingCallLaters: + canceller = cls._cancel(cls._pendingCallLaters[0]) + try: + canceller() + except (defer.CancelledError, defer.AlreadyCalledError): + pass + + @classmethod + def call_later(cls, when, what, *args, **kwargs): + """ + Schedule a call later and get a canceller callback function + + :param when: (float) delay in seconds + :param what: (callable) + :param args: (*tuple) args to be passed to the callable + :param kwargs: (**dict) kwargs to be passed to the callable + + :return: (tuple) twisted.internet.base.DelayedCall object, canceller function + """ + + call_later = cls._callLater(when, what, *args, **kwargs) + canceller = cls._cancel(call_later) + cls._pendingCallLaters.append(call_later) + return call_later, canceller + + @classmethod + def setup(cls, callLater): + """ + Setup the callLater function to use, supports the real reactor as well as task.Clock + + :param callLater: (IReactorTime.callLater) + """ + cls._callLater = callLater diff --git a/lbrynet/dht/hashannouncer.py b/lbrynet/dht/hashannouncer.py index a1533947a..069c4294f 100644 --- a/lbrynet/dht/hashannouncer.py +++ b/lbrynet/dht/hashannouncer.py @@ -1,7 +1,6 @@ import binascii import collections import logging -import time import datetime from twisted.internet import defer, task diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c7e18af77..b576cf9a2 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -15,6 +15,7 @@ import logging from twisted.internet import defer, error, task from lbrynet.core.utils import generate_id +from lbrynet.core.call_later_manager import CallLaterManager from lbrynet.core.PeerManager import PeerManager import constants @@ -89,10 +90,11 @@ class Node(object): resolve = resolve or reactor.resolve callLater = callLater or reactor.callLater clock = clock or reactor + self.clock = clock + CallLaterManager.setup(callLater) self.reactor_resolve = resolve self.reactor_listenUDP = listenUDP - self.reactor_callLater = callLater - self.clock = clock + self.reactor_callLater = CallLaterManager.call_later self.node_id = node_id or self._generateID() self.port = udpPort self._listeningPort = None # object implementing Twisted @@ -856,7 +858,7 @@ class _IterativeFindHelper(object): if self._should_lookup_active_calls(): # Schedule the next iteration if there are any active # calls (Kademlia uses loose parallelism) - call = self.node.reactor_callLater(constants.iterativeLookupDelay, self.searchIteration) + call, _ = self.node.reactor_callLater(constants.iterativeLookupDelay, self.searchIteration) self.pending_iteration_calls.append(call) # Check for a quick contact response that made an update to the shortList elif prevShortlistLength < len(self.shortlist): diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 4dc2ba176..ce45f56c1 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -3,7 +3,8 @@ import time import socket import errno -from twisted.internet import protocol, defer, error, task +from twisted.internet import protocol, defer, task +from lbrynet.core.call_later_manager import CallLaterManager import constants import encoding @@ -169,17 +170,11 @@ class KademliaProtocol(protocol.DatagramProtocol): df._rpcRawResponse = True # Set the RPC timeout timer - timeoutCall = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, msg.id) + timeoutCall, cancelTimeout = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, msg.id) # Transmit the data self._send(encodedMsg, msg.id, (contact.address, contact.port)) self._sentMessages[msg.id] = (contact.id, df, timeoutCall, method, args) - - def cancel(err): - if timeoutCall.cancelled or timeoutCall.called: - return err - timeoutCall.cancel() - - df.addErrback(cancel) + df.addErrback(cancelTimeout) return df def startProtocol(self): @@ -340,12 +335,9 @@ class KademliaProtocol(protocol.DatagramProtocol): """Schedule the sending of the next UDP packet """ delay = self._delay() key = object() - delayed_call = self._node.reactor_callLater(delay, self._write_and_remove, key, txData, address) - self._call_later_list[key] = delayed_call + delayed_call, _ = self._node.reactor_callLater(delay, self._write_and_remove, key, txData, address) def _write_and_remove(self, key, txData, address): - if key in self._call_later_list: - del self._call_later_list[key] if self.transport: try: self.transport.write(txData, address) @@ -440,7 +432,7 @@ class KademliaProtocol(protocol.DatagramProtocol): # See if any progress has been made; if not, kill the message if self._hasProgressBeenMade(messageID): # Reset the RPC timeout timer - timeoutCall = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, messageID) + timeoutCall, _ = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, messageID) self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args) else: # No progress has been made @@ -469,15 +461,6 @@ class KademliaProtocol(protocol.DatagramProtocol): if self._bandwidth_stats_update_lc.running: self._bandwidth_stats_update_lc.stop() - for delayed_call in self._call_later_list.values(): - try: - delayed_call.cancel() - except (error.AlreadyCalled, error.AlreadyCancelled): - log.debug('Attempted to cancel a DelayedCall that was not active') - except Exception: - log.exception('Failed to cancel a DelayedCall') - # not sure why this is needed, but taking this out sometimes causes - # exceptions.AttributeError: 'Port' object has no attribute 'socket' - # to happen on shutdown - # reactor.iterate() + CallLaterManager.stop() + log.info('DHT stopped') diff --git a/lbrynet/tests/unit/dht/test_protocol.py b/lbrynet/tests/unit/dht/test_protocol.py index d10a5a5e7..0b48cf115 100644 --- a/lbrynet/tests/unit/dht/test_protocol.py +++ b/lbrynet/tests/unit/dht/test_protocol.py @@ -1,7 +1,7 @@ import time import unittest from twisted.internet.task import Clock -from twisted.internet import defer +from twisted.internet import defer, threads import lbrynet.dht.protocol import lbrynet.dht.contact import lbrynet.dht.constants @@ -9,6 +9,7 @@ import lbrynet.dht.msgtypes from lbrynet.dht.error import TimeoutError from lbrynet.dht.node import Node, rpcmethod from lbrynet.tests.mocks import listenUDP, resolve +from lbrynet.core.call_later_manager import CallLaterManager import logging @@ -22,10 +23,12 @@ class KademliaProtocolTest(unittest.TestCase): def setUp(self): self._reactor = Clock() + CallLaterManager.setup(self._reactor.callLater) self.node = Node(node_id='1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP, resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) def tearDown(self): + CallLaterManager.stop() del self._reactor @defer.inlineCallbacks @@ -40,7 +43,6 @@ class KademliaProtocolTest(unittest.TestCase): def testRPCTimeout(self): """ Tests if a RPC message sent to a dead remote node times out correctly """ - dead_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP, resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater) dead_node.start_listening()