diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 78907aec4..cbdd6e694 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -12,7 +12,6 @@ from lbrynet.core.PaymentRateManager import BasePaymentRateManager, NegotiatedPa from lbrynet.core.BlobAvailability import BlobAvailabilityTracker from twisted.internet import threads, defer - log = logging.getLogger(__name__) @@ -36,6 +35,7 @@ class Session(object): upnp, which opens holes in compatible firewalls so that remote peers can connect to this peer. """ + def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, @@ -251,10 +251,7 @@ class Session(object): from twisted.internet import reactor - log.debug("Starting the dht") - - def match_port(h, p): - return h, p + log.info("Starting DHT") def join_resolved_addresses(result): addresses = [] @@ -272,7 +269,7 @@ class Session(object): ds = [] for host, port in self.known_dht_nodes: d = reactor.resolve(host) - d.addCallback(match_port, port) + d.addCallback(lambda h: (h, port)) # match host to port ds.append(d) self.dht_node = self.dht_node_class( @@ -323,6 +320,7 @@ class Session(object): def _unset_upnp(self): log.info("Unsetting upnp for %s", self) + def threaded_unset_upnp(): u = miniupnpc.UPnP() num_devices_found = u.discover() diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index fea2f43bc..eeb3f6be7 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -44,7 +44,7 @@ def datetime_obj(*args, **kwargs): def call_later(delay, func, *args, **kwargs): - # Import here to ensure that it gets called after installing a reator + # Import here to ensure that it gets called after installing a reactor # see: http://twistedmatrix.com/documents/current/core/howto/choosing-reactor.html from twisted.internet import reactor return reactor.callLater(delay, func, *args, **kwargs) @@ -87,7 +87,7 @@ def obfuscate(plain): return base64.b64encode(plain).encode('rot13') -def check_connection(server="www.lbry.io", port=80): +def check_connection(server="lbry.io", port=80): """Attempts to open a socket to server:port and returns True if successful.""" try: log.debug('Checking connection to %s:%s', server, port) diff --git a/lbrynet/dht/constants.py b/lbrynet/dht/constants.py index 079fc9787..1cee46311 100644 --- a/lbrynet/dht/constants.py +++ b/lbrynet/dht/constants.py @@ -46,7 +46,10 @@ peer_request_timeout = 10 checkRefreshInterval = refreshTimeout / 5 #: Max size of a single UDP datagram, in bytes. If a message is larger than this, it will -#: be spread accross several UDP packets. +#: be spread across several UDP packets. udpDatagramMaxSize = 8192 # 8 KB -key_bits = 384 +from lbrynet.core.cryptoutils import get_lbry_hash_obj + +h = get_lbry_hash_obj() +key_bits = h.digest_size * 8 # 384 bits diff --git a/lbrynet/dht/encoding.py b/lbrynet/dht/encoding.py index 55bb68fb7..4f7a068ac 100644 --- a/lbrynet/dht/encoding.py +++ b/lbrynet/dht/encoding.py @@ -7,6 +7,7 @@ # The docstrings in this module contain epytext markup; API documentation # may be created by processing this file with epydoc: http://epydoc.sf.net + class DecodeError(Exception): """ Should be raised by an C{Encoding} implementation if decode operation fails diff --git a/lbrynet/dht/hashwatcher.py b/lbrynet/dht/hashwatcher.py index f55185f82..ea1651cbc 100644 --- a/lbrynet/dht/hashwatcher.py +++ b/lbrynet/dht/hashwatcher.py @@ -2,8 +2,8 @@ from collections import Counter import datetime -class HashWatcher(): - def __init__(self, ttl=600): +class HashWatcher(object): + def __init__(self): self.ttl = 600 self.hashes = [] self.next_tick = None diff --git a/lbrynet/dht/msgtypes.py b/lbrynet/dht/msgtypes.py index b24f5ced4..cf0b4ef2c 100644 --- a/lbrynet/dht/msgtypes.py +++ b/lbrynet/dht/msgtypes.py @@ -9,7 +9,7 @@ import hashlib import random - +from lbrynet.core.utils import generate_id class Message(object): """ Base class for messages - all "unknown" messages use this class """ @@ -24,9 +24,7 @@ class RequestMessage(Message): def __init__(self, nodeID, method, methodArgs, rpcID=None): if rpcID == None: - hash = hashlib.sha384() - hash.update(str(random.getrandbits(255))) - rpcID = hash.digest() + rpcID = generate_id() Message.__init__(self, rpcID, nodeID) self.request = method self.args = methodArgs diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 0befa359b..65ce90bbe 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -6,11 +6,9 @@ # # The docstrings in this module contain epytext markup; API documentation # may be created by processing this file with epydoc: http://epydoc.sf.net -import argparse import binascii import hashlib import operator -import random import struct import time @@ -23,10 +21,13 @@ import protocol import twisted.internet.reactor import twisted.internet.threads import twisted.python.log + from contact import Contact from hashwatcher import HashWatcher import logging +from lbrynet.core.utils import generate_id + log = logging.getLogger(__name__) @@ -512,14 +513,14 @@ class Node(object): """ # Get the sender's ID (if any) if '_rpcNodeID' in kwargs: - rpcSenderID = kwargs['_rpcNodeID'] + rpc_sender_id = kwargs['_rpcNodeID'] else: - rpcSenderID = None - contacts = self._routingTable.findCloseNodes(key, constants.k, rpcSenderID) - contactTriples = [] + rpc_sender_id = None + contacts = self._routingTable.findCloseNodes(key, constants.k, rpc_sender_id) + contact_triples = [] for contact in contacts: - contactTriples.append((contact.id, contact.address, contact.port)) - return contactTriples + contact_triples.append((contact.id, contact.address, contact.port)) + return contact_triples @rpcmethod def findValue(self, key, **kwargs): @@ -536,8 +537,8 @@ class Node(object): if self._dataStore.hasPeersForBlob(key): rval = {key: self._dataStore.getPeersForBlob(key)} else: - contactTriples = self.findNode(key, **kwargs) - rval = {'contacts': contactTriples} + contact_triples = self.findNode(key, **kwargs) + rval = {'contacts': contact_triples} if '_rpcNodeContact' in kwargs: contact = kwargs['_rpcNodeContact'] compact_ip = contact.compact_ip() @@ -551,9 +552,7 @@ class Node(object): @return: A globally unique n-bit pseudo-random identifier @rtype: str """ - hash = hashlib.sha384() - hash.update(str(random.getrandbits(255))) - return hash.digest() + return generate_id() def _iterativeFind(self, key, startupShortlist=None, rpc='findNode'): """ The basic Kademlia iterative lookup operation (for nodes/values) @@ -583,11 +582,8 @@ class Node(object): return a list of the k closest nodes to the specified key @rtype: twisted.internet.defer.Deferred """ - if rpc != 'findNode': - findValue = True - else: - findValue = False - shortlist = [] + findValue = rpc != 'findNode' + if startupShortlist == None: shortlist = self._routingTable.findCloseNodes(key, constants.alpha) if key != self.id: @@ -903,30 +899,3 @@ class ExpensiveSort(object): def _removeValue(self): for item in self.to_sort: delattr(item, self.attr) - - -def main(): - parser = argparse.ArgumentParser(description="Launch a dht node") - parser.add_argument("udp_port", help="The UDP port on which the node will listen", - type=int) - parser.add_argument("known_node_ip", - help="The IP of a known node to be used to bootstrap into the network", - nargs='?') - parser.add_argument("known_node_port", - help="The port of a known node to be used to bootstrap into the network", - nargs='?', default=4000, type=int) - - args = parser.parse_args() - - if args.known_node_ip: - known_nodes = [(args.known_node_ip, args.known_node_port)] - else: - known_nodes = [] - - node = Node(udpPort=args.udp_port) - node.joinNetwork(known_nodes) - twisted.internet.reactor.run() - - -if __name__ == '__main__': - main() diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 66ceb5de4..d5348edd4 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -108,6 +108,8 @@ class KademliaProtocol(protocol.DatagramProtocol): msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) + log.debug("DHT SEND: %s(%s)", method, args) + df = defer.Deferred() if rawResponse: df._rpcRawResponse = True @@ -163,7 +165,7 @@ class KademliaProtocol(protocol.DatagramProtocol): self._handleRPC(remoteContact, message.id, message.request, message.args) elif isinstance(message, msgtypes.ResponseMessage): # Find the message that triggered this response - if self._sentMessages.has_key(message.id): + if message.id in self._sentMessages: # Cancel timeout timer for this RPC df, timeoutCall = self._sentMessages[message.id][1:3] timeoutCall.cancel() @@ -296,6 +298,7 @@ class KademliaProtocol(protocol.DatagramProtocol): func = getattr(self._node, method, None) if callable(func) and hasattr(func, 'rpcmethod'): # Call the exposed Node method and return the result to the deferred callback chain + log.debug("DHT RECV CALL %s with args %s", method, args) try: kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} result = func(*args, **kwargs) @@ -351,7 +354,7 @@ class KademliaProtocol(protocol.DatagramProtocol): Will only be called once, after all ports are disconnected. """ - log.info('Stopping dht') + log.info('Stopping DHT') for delayed_call in self._call_later_list.values(): try: delayed_call.cancel() @@ -363,3 +366,4 @@ class KademliaProtocol(protocol.DatagramProtocol): # exceptions.AttributeError: 'Port' object has no attribute 'socket' # to happen on shutdown # reactor.iterate() + log.info('DHT stopped') diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 3aa493119..1f6cca926 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -160,7 +160,6 @@ class TreeRoutingTable(RoutingTable): # be dropped, and the new contact added to the tail of # the k-bucket. This implementation follows section # 2.2 regarding this point. - headContact = self._buckets[bucketIndex]._contacts[0] def replaceContact(failure): """ Callback for the deferred PING RPC to see if the head @@ -181,8 +180,8 @@ class TreeRoutingTable(RoutingTable): self.addContact(contact) # Ping the least-recently seen contact in this k-bucket - headContact = self._buckets[bucketIndex]._contacts[0] - df = headContact.ping() + head_contact = self._buckets[bucketIndex]._contacts[0] + df = head_contact.ping() # If there's an error (i.e. timeout), remove the head # contact, and append the new one df.addErrback(replaceContact) diff --git a/scripts/dht_scripts.py b/scripts/dht_scripts.py index fd37763fc..657a5d7e0 100644 --- a/scripts/dht_scripts.py +++ b/scripts/dht_scripts.py @@ -1,11 +1,15 @@ -import binascii -import logging +from lbrynet.core import log_support + +import logging.handlers import sys +import traceback from lbrynet.dht.node import Node -from twisted.internet import reactor, task + +from twisted.internet import reactor, defer from lbrynet.core.utils import generate_id + log = logging.getLogger(__name__) @@ -13,90 +17,91 @@ def print_usage(): print "Usage:\n%s UDP_PORT KNOWN_NODE_IP KNOWN_NODE_PORT HASH" +@defer.inlineCallbacks def join_network(udp_port, known_nodes): lbryid = generate_id() - log.info('Creating Node') + log.info('Creating node') node = Node(udpPort=udp_port, lbryid=lbryid) log.info('Joining network') - d = node.joinNetwork(known_nodes) + yield node.joinNetwork(known_nodes) - def log_network_size(): + defer.returnValue(node) + + +@defer.inlineCallbacks +def get_hosts(node, h): + log.info("Looking up %s", h) + hosts = yield node.getPeersForBlob(h.decode("hex")) + log.info("Hosts returned from the DHT: %s", hosts) + + +@defer.inlineCallbacks +def announce_hash(node, h): + results = yield node.announceHaveBlob(h, 34567) + for success, result in results: + if success: + log.info("Succeeded: %s", str(result)) + else: + log.info("Failed: %s", str(result.getErrorMessage())) + + +# def get_args(): +# if len(sys.argv) < 5: +# print_usage() +# sys.exit(1) +# udp_port = int(sys.argv[1]) +# known_nodes = [(sys.argv[2], int(sys.argv[3]))] +# h = binascii.unhexlify(sys.argv[4]) +# return udp_port, known_nodes, h + + +@defer.inlineCallbacks +def connect(port=None): + try: + if port is None: + raise Exception("need a port") + known_nodes = [('54.236.227.82', 4444)] # lbrynet1 + node = yield join_network(port, known_nodes) + log.info("joined") + reactor.callLater(3, find, node) + except Exception: + log.error("CAUGHT EXCEPTION") + traceback.print_exc() + log.info("Stopping reactor") + yield reactor.stop() + + +@defer.inlineCallbacks +def find(node): + try: log.info("Approximate number of nodes in DHT: %s", str(node.getApproximateTotalDHTNodes())) log.info("Approximate number of blobs in DHT: %s", str(node.getApproximateTotalHashes())) - d.addCallback(lambda _: log_network_size()) + h = "578f5e82da7db97bfe0677826d452cc0c65406a8e986c9caa126af4ecdbf4913daad2f7f5d1fb0ffec17d0bf8f187f5a" + peersFake = yield node.getPeersForBlob(h.decode("hex")) + print peersFake + peers = yield node.getPeersForBlob(h.decode("hex")) + print peers - d.addCallback(lambda _: node) + # yield get_hosts(node, h) + except Exception: + log.error("CAUGHT EXCEPTION") + traceback.print_exc() - return d + log.info("Stopping reactor") + yield reactor.stop() -def get_hosts(node, h): - def print_hosts(hosts): - print "Hosts returned from the DHT: " - print hosts - log.info("Looking up %s", h) - d = node.getPeersForBlob(h) - d.addCallback(print_hosts) - return d - - -def announce_hash(node, h): - d = node.announceHaveBlob(h, 34567) - - def log_results(results): - for success, result in results: - if success: - log.info("Succeeded: %s", str(result)) - else: - log.info("Failed: %s", str(result.getErrorMessage())) - - d.addCallback(log_results) - return d - - -def get_args(): - if len(sys.argv) < 5: - print_usage() - sys.exit(1) - udp_port = int(sys.argv[1]) - known_nodes = [(sys.argv[2], int(sys.argv[3]))] - h = binascii.unhexlify(sys.argv[4]) - return udp_port, known_nodes, h - - -def run_dht_script(dht_func): - log_format = "(%(asctime)s)[%(filename)s:%(lineno)s] %(funcName)s(): %(message)s" - logging.basicConfig(level=logging.DEBUG, format=log_format) - - udp_port, known_nodes, h = get_args() - - d = task.deferLater(reactor, 0, join_network, udp_port, known_nodes) - - def run_dht_func(node): - return dht_func(node, h) - - d.addCallback(run_dht_func) - - def log_err(err): - log.error("An error occurred: %s", err.getTraceback()) - return err - - def shut_down(): - log.info("Shutting down") - reactor.stop() - - d.addErrback(log_err) - d.addBoth(lambda _: shut_down()) +def main(): + log_support.configure_console(level='DEBUG') + log_support.configure_twisted() + reactor.callLater(0, connect, port=10001) + log.info("Running reactor") reactor.run() -def get_hosts_for_hash_in_dht(): - run_dht_script(get_hosts) - - -def announce_hash_to_dht(): - run_dht_script(announce_hash) +if __name__ == '__main__': + sys.exit(main()) diff --git a/scripts/dhttest.py b/scripts/dhttest.py index 8d648f036..a188030dc 100644 --- a/scripts/dhttest.py +++ b/scripts/dhttest.py @@ -22,18 +22,18 @@ import binascii -import hashlib import random import twisted.internet.reactor from lbrynet.dht.node import Node +from lbrynet.core.cryptoutils import get_lbry_hash_obj # The Entangled DHT node; instantiated in the main() method node = None # The key to use for this example when storing/retrieving data -hash = hashlib.sha384() -hash.update("key") -KEY = hash.digest() +h = get_lbry_hash_obj() +h.update("key") +KEY = h.digest() # The value to store VALUE = random.randint(10000, 20000)