From efaa97216f8c854f377906e6150a36162f6e9a97 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 15 Feb 2018 17:30:14 -0500 Subject: [PATCH] move dht node setup back into node class --- lbrynet/core/Session.py | 47 +++------------------------ lbrynet/dht/node.py | 72 ++++++++++++++++++++--------------------- 2 files changed, 39 insertions(+), 80 deletions(-) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 288a7d7d8..603917e7a 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -247,59 +247,20 @@ class Session(object): d.addErrback(upnp_failed) return d - # the callback, if any, will be invoked once the joining procedure - # has terminated - def join_dht(self, cb=None): - from twisted.internet import reactor - - def join_resolved_addresses(result): - addresses = [] - for success, value in result: - if success is True: - addresses.append(value) - return addresses - - @defer.inlineCallbacks - def join_network(knownNodes): - log.debug("join DHT using known nodes: " + str(knownNodes)) - result = yield self.dht_node.joinNetwork(knownNodes) - defer.returnValue(result) - - ds = [] - for host, port in self.known_dht_nodes: - d = reactor.resolve(host) - d.addCallback(lambda h: (h, port)) # match host to port - ds.append(d) - - dl = defer.DeferredList(ds) - dl.addCallback(join_resolved_addresses) - dl.addCallback(join_network) - if cb: - dl.addCallback(cb) - - return dl - + @defer.inlineCallbacks def _setup_dht(self): log.info("Starting DHT") - def start_dht(join_network_result): - self.hash_announcer.run_manage_loop() - return True - self.dht_node = self.dht_node_class( udpPort=self.dht_node_port, node_id=self.node_id, externalIP=self.external_ip, peerPort=self.peer_port ) - self.peer_finder = peerfinder.DHTPeerFinder(self.dht_node, self.peer_manager) - if self.hash_announcer is None: - self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.peer_port) - self.dht_node.startNetwork() - - # pass start_dht() as callback to start the remaining components after joining the DHT - return self.join_dht(start_dht) + yield self.dht_node.joinNetwork(self.known_dht_nodes) + self.peer_finder = self.dht_node.peer_finder + self.hash_announcer = self.dht_node.hash_announcer def _setup_other_components(self): log.debug("Setting up the rest of the components") diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 13843ef51..489a16957 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -18,6 +18,9 @@ import routingtable import datastore import protocol +from peermanager import PeerManager +from hashannouncer import DHTHashAnnouncer +from peerfinder import DHTPeerFinder from contact import Contact from hashwatcher import HashWatcher from distance import Distance @@ -120,7 +123,12 @@ class Node(object): # will be used later self._can_store = True + self.peer_manager = PeerManager() + self.peer_finder = DHTPeerFinder(self, self.peer_manager) + self.hash_announcer = DHTHashAnnouncer(self, self.port) + def __del__(self): + log.warning("unclean shutdown of the dht node") if self._listeningPort is not None: self._listeningPort.stopListening() @@ -138,60 +146,50 @@ class Node(object): self._listeningPort.stopListening() self.hash_watcher.stop() - def startNetwork(self): - """ Causes the Node to start all the underlying components needed for the DHT - to work. This should be called before any other DHT operations. - """ - log.info("Starting DHT underlying components") - - # Prepare the underlying Kademlia protocol - if self.port is not None: - try: - self._listeningPort = reactor.listenUDP(self.port, self._protocol) - except error.CannotListenError as e: - import traceback - log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) - raise ValueError("%s lbrynet may already be running." % str(e)) - - # Start the token looping call - self.change_token_lc.start(constants.tokenSecretChangeInterval) - # #TODO: Refresh all k-buckets further away than this node's closest neighbour - # Start refreshing k-buckets periodically, if necessary - self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, - self._refreshNode) - self.hash_watcher.tick() - @defer.inlineCallbacks - def joinNetwork(self, knownNodeAddresses=None): + def joinNetwork(self, known_node_addresses=None): """ Causes the Node to attempt to join the DHT network by contacting the known DHT nodes. This can be called multiple times if the previous attempt has failed or if the Node has lost all the contacts. - @param knownNodeAddresses: A sequence of tuples containing IP address + @param known_node_addresses: A sequence of tuples containing IP address information for existing nodes on the Kademlia network, in the format: C{(, (udp port>)} - @type knownNodeAddresses: tuple + @type known_node_addresses: list """ + + try: + self._listeningPort = reactor.listenUDP(self.port, self._protocol) + log.info("DHT node listening on %i", self.port) + except error.CannotListenError as e: + import traceback + log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) + raise ValueError("%s lbrynet may already be running." % str(e)) + + known_node_addresses = known_node_addresses or [] + bootstrap_contacts = [] + for node_address, port in known_node_addresses: + host = yield reactor.resolve(node_address) + # Create temporary contact information for the list of addresses of known nodes + contact = Contact(self._generateID(), host, port, self._protocol) + bootstrap_contacts.append(contact) + log.info("Attempting to join the DHT network") - # IGNORE:E1101 - # Create temporary contact information for the list of addresses of known nodes - if knownNodeAddresses != None: - bootstrapContacts = [] - for address, port in knownNodeAddresses: - contact = Contact(self._generateID(), address, port, self._protocol) - bootstrapContacts.append(contact) - else: - bootstrapContacts = None - # Initiate the Kademlia joining sequence - perform a search for this node's own ID - self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) + self._joinDeferred = self._iterativeFind(self.node_id, bootstrap_contacts) # #TODO: Refresh all k-buckets further away than this node's closest neighbour # Start refreshing k-buckets periodically, if necessary self.hash_watcher.tick() yield self._joinDeferred + + self.change_token_lc.start(constants.tokenSecretChangeInterval) self.refresh_node_lc.start(constants.checkRefreshInterval) + self.peer_finder.run_manage_loop() + self.hash_announcer.run_manage_loop() + + #TODO: re-attempt joining the network if it fails @property def contacts(self):