diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index d0a37f403..8db8b4d05 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -1,16 +1,15 @@ import logging import os from sqlite3 import IntegrityError -from twisted.internet import threads, defer, reactor, task +from twisted.internet import threads, defer, task from lbrynet import conf from lbrynet.blob.blob_file import BlobFile from lbrynet.blob.creator import BlobFileCreator -from lbrynet.dht.hashannouncer import DHTHashSupplier log = logging.getLogger(__name__) -class DiskBlobManager(DHTHashSupplier): +class DiskBlobManager(object): def __init__(self, hash_announcer, blob_dir, storage): """ @@ -18,8 +17,7 @@ class DiskBlobManager(DHTHashSupplier): blob_dir - directory where blobs are stored db_dir - directory where sqlite database of blob information is stored """ - - DHTHashSupplier.__init__(self, hash_announcer) + self.hash_announcer = hash_announcer self.storage = storage self.announce_head_blobs_only = conf.settings['announce_head_blobs_only'] self.blob_dir = blob_dir @@ -70,14 +68,14 @@ class DiskBlobManager(DHTHashSupplier): @defer.inlineCallbacks def blob_completed(self, blob, next_announce_time=None, should_announce=True): if next_announce_time is None: - next_announce_time = self.get_next_announce_time() + next_announce_time = self.hash_announcer.get_next_announce_time() yield self.storage.add_completed_blob( blob.blob_hash, blob.length, next_announce_time, should_announce ) # we announce all blobs immediately, if announce_head_blob_only is False # otherwise, announce only if marked as should_announce if not self.announce_head_blobs_only or should_announce: - reactor.callLater(0, self.immediate_announce, [blob.blob_hash]) + self.immediate_announce([blob.blob_hash]) def completed_blobs(self, blobhashes_to_check): return self._completed_blobs(blobhashes_to_check) @@ -93,7 +91,7 @@ class DiskBlobManager(DHTHashSupplier): blob = self.blobs[blob_hash] if blob.get_is_verified(): return self.storage.set_should_announce( - blob_hash, self.get_next_announce_time(), should_announce + blob_hash, self.hash_announcer.get_next_announce_time(), should_announce ) return defer.succeed(False) @@ -110,7 +108,7 @@ class DiskBlobManager(DHTHashSupplier): raise Exception("Blob has a length of 0") new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length) self.blobs[blob_creator.blob_hash] = new_blob - next_announce_time = self.get_next_announce_time() + next_announce_time = self.hash_announcer.get_next_announce_time() return self.blob_completed(new_blob, next_announce_time, should_announce) def immediate_announce_all_blobs(self): diff --git a/lbrynet/dht/hashannouncer.py b/lbrynet/dht/hashannouncer.py index 5338a9e7c..a1533947a 100644 --- a/lbrynet/dht/hashannouncer.py +++ b/lbrynet/dht/hashannouncer.py @@ -20,27 +20,32 @@ class DummyHashAnnouncer(object): def stop(self): pass - def add_supplier(self, supplier): - pass - def hash_queue_size(self): return 0 def immediate_announce(self, blob_hashes): pass + def get_next_announce_time(self): + return 0 + class DHTHashAnnouncer(DummyHashAnnouncer): ANNOUNCE_CHECK_INTERVAL = 60 CONCURRENT_ANNOUNCERS = 5 + # 1 hour is the min time hash will be reannounced + MIN_HASH_REANNOUNCE_TIME = 60 * 60 + # conservative assumption of the time it takes to announce + # a single hash + DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION = 1 + """This class announces to the DHT that this peer has certain blobs""" STORE_RETRIES = 3 - def __init__(self, dht_node, peer_port): + def __init__(self, dht_node): self.dht_node = dht_node - self.peer_port = peer_port - self.supplier = None + self.peer_port = dht_node.peerPort self.next_manage_call = None self.hash_queue = collections.deque() self._concurrent_announcers = 0 @@ -49,6 +54,8 @@ class DHTHashAnnouncer(DummyHashAnnouncer): self._lock = utils.DeferredLockContextManager(defer.DeferredLock()) self._last_checked = dht_node.clock.seconds(), self.CONCURRENT_ANNOUNCERS self._total = None + self.single_hash_announce_duration = self.DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION + self._hashes_to_announce = [] def run_manage_loop(self): log.info("Starting hash announcer") @@ -79,10 +86,7 @@ class DHTHashAnnouncer(DummyHashAnnouncer): def stop(self): log.info("Stopping DHT hash announcer.") if self._manage_call_lc.running: - self._manage_call_lc.stop() - - def add_supplier(self, supplier): - self.supplier = supplier + return self._manage_call_lc.stop() def immediate_announce(self, blob_hashes): if self.peer_port is not None: @@ -96,9 +100,8 @@ class DHTHashAnnouncer(DummyHashAnnouncer): @defer.inlineCallbacks def _announce_available_hashes(self): log.debug('Announcing available hashes') - if self.supplier: - hashes = yield self.supplier.hashes_to_announce() - yield self._announce_hashes(hashes) + hashes = yield self.hashes_to_announce() + yield self._announce_hashes(hashes) @defer.inlineCallbacks def _announce_hashes(self, hashes, immediate=False): @@ -180,24 +183,20 @@ class DHTHashAnnouncer(DummyHashAnnouncer): self.set_single_hash_announce_duration(seconds_per_blob) defer.returnValue(stored_to) + @defer.inlineCallbacks + def add_hashes_to_announce(self, blob_hashes): + yield self._lock._lock.acquire() + self._hashes_to_announce.extend(blob_hashes) + yield self._lock._lock.release() -class DHTHashSupplier(object): - # 1 hour is the min time hash will be reannounced - MIN_HASH_REANNOUNCE_TIME = 60 * 60 - # conservative assumption of the time it takes to announce - # a single hash - DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION = 1 - - """Classes derived from this class give hashes to a hash announcer""" - - def __init__(self, announcer): - if announcer is not None: - announcer.add_supplier(self) - self.hash_announcer = announcer - self.single_hash_announce_duration = self.DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION - + @defer.inlineCallbacks def hashes_to_announce(self): - pass + hashes_to_announce = [] + yield self._lock._lock.acquire() + while self._hashes_to_announce: + hashes_to_announce.append(self._hashes_to_announce.pop()) + yield self._lock._lock.release() + defer.returnValue(hashes_to_announce) def set_single_hash_announce_duration(self, seconds): """ @@ -221,7 +220,7 @@ class DHTHashSupplier(object): Returns: timestamp for next announce time """ - queue_size = self.hash_announcer.hash_queue_size() + num_hashes_to_announce + queue_size = self.hash_queue_size() + num_hashes_to_announce reannounce = max(self.MIN_HASH_REANNOUNCE_TIME, queue_size * self.single_hash_announce_duration) - return time.time() + reannounce + return self.dht_node.clock.seconds() + reannounce diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c5cc27678..3150a0226 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -138,9 +138,9 @@ class Node(object): # will be used later self._can_store = True - self.peer_manager = PeerManager() - self.peer_finder = DHTPeerFinder(self, self.peer_manager) - self.hash_announcer = DHTHashAnnouncer(self, self.port) + self.peer_manager = peer_manager or PeerManager() + self.peer_finder = peer_finder or DHTPeerFinder(self, self.peer_manager) + self.hash_announcer = hash_announcer or DHTHashAnnouncer(self) def __del__(self): log.warning("unclean shutdown of the dht node")