Mirror of https://github.com/LBRYFoundation/lbry-sdk.git
Merge branch 'pylint'
Commit: ce181f2896
12 changed files with 14 additions and 520 deletions
.pylintrc (14 lines changed)
@@ -34,6 +34,8 @@ unsafe-load-any-extension=no
 # be loaded. Extensions are loading into the active Python interpreter and may
 # run arbitrary code
 extension-pkg-whitelist=
+    miniupnpc,
+    unqlite
 
 # Allow optimization of some AST trees. This will activate a peephole AST
 # optimizer, which will apply various small optimizations. For instance, it can
@@ -74,10 +76,8 @@ disable=
     broad-except,
     cell-var-from-loop,
     consider-iterating-dictionary,
-    cyclic-import,
     dangerous-default-value,
     duplicate-code,
-    exec-used,
     fixme,
     global-statement,
     inherit-non-class,
@@ -86,12 +86,10 @@ disable=
     locally-disabled,
     logging-not-lazy,
     missing-docstring,
-    multiple-imports,
     no-else-return,
     no-init,
     no-member,
     no-self-use,
-    not-context-manager,
     protected-access,
     redefined-builtin,
     redefined-outer-name,
@@ -117,7 +115,13 @@ disable=
     unused-variable,
     wildcard-import,
     wrong-import-order,
-    wrong-import-position
+    wrong-import-position,
+    deprecated-lambda,
+    simplifiable-if-statement,
+    unidiomatic-typecheck,
+    global-at-module-level,
+    inconsistent-return-statements,
+    keyword-arg-before-vararg
 
 
 [REPORTS]
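For context, here is a hypothetical snippet (illustrative only, not code from this repository) showing the kind of pattern two of the newly silenced checks flag:

def lookup(table, key):
    # inconsistent-return-statements: one branch returns a value, the
    # other falls off the end and implicitly returns None
    if key in table:
        return table[key]


def connect(host, timeout=30, *servers):
    # keyword-arg-before-vararg: 'timeout' is declared before *servers,
    # so callers can only set it positionally, which is easy to get wrong
    pass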
@@ -52,7 +52,7 @@ class DiskBlobManager(DHTHashSupplier):
         blob that is already on the hard disk
         """
         if length is not None and not isinstance(length, int):
-            raise Exception("invalid length type: %s (%s)", length, str(type(length)))
+            raise Exception("invalid length type: %s (%s)" % (length, str(type(length))))
         if blob_hash in self.blobs:
             return defer.succeed(self.blobs[blob_hash])
         return self._make_new_blob(blob_hash, length)
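This change, and the two identical fixes in the hunks below, correct a real bug: logging calls interpolate "%s" placeholders from their extra arguments, but Exception does not; it simply stores the argument tuple. A minimal demonstration of the difference:

try:
    raise Exception("invalid length type: %s", "foo")
except Exception as e:
    print(e)  # ('invalid length type: %s', 'foo') -- placeholder never filled

try:
    raise Exception("invalid length type: %s" % "foo")
except Exception as e:
    print(e)  # invalid length type: foo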
@@ -314,7 +314,7 @@ class Daemon(AuthJSONRPCServer):
             log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for"
                       " more details.", self.peer_port)
             log.error("%s", traceback.format_exc())
-            raise ValueError("%s lbrynet may already be running on your computer.", str(e))
+            raise ValueError("%s lbrynet may already be running on your computer." % str(e))
         return defer.succeed(True)
 
     def _start_reflector(self):
@@ -73,7 +73,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
             self.completed = True
             defer.returnValue(True)
         else:
-            raise Exception("Unknown status for stream %s: %s", self.stream_hash, status)
+            raise Exception("Unknown status for stream %s: %s" % (self.stream_hash, status))
 
     @defer.inlineCallbacks
     def stop(self, err=None, change_status=True):
@@ -67,4 +67,3 @@ the client disconnects.
 from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
 from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory
 from lbrynet.reflector.client.blob import BlobReflectorClientFactory as BlobClientFactory
-from lbrynet.reflector import reupload
@@ -1,5 +1,6 @@
 from __future__ import print_function
-import ctypes, sys
+import sys
+import ctypes
 from ctypes import windll, wintypes
 from uuid import UUID
 
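Splitting the combined import satisfies pylint's multiple-imports check, which this commit removes from the disable list in .pylintrc above. The rule in miniature:

import os, sys   # flagged by multiple-imports: two modules, one statement

import os        # compliant: one module per import statement
import sys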
@@ -1,23 +0,0 @@
-import logging
-
-from twisted.internet import defer
-
-
-log = logging.getLogger(__name__)
-
-
-@defer.inlineCallbacks
-def getNames(wallet, names=None):
-    if names:
-        defer.returnValue(names)
-    nametrie = yield wallet.get_nametrie()
-    defer.returnValue(list(getNameClaims(nametrie)))
-
-
-def getNameClaims(trie):
-    for x in trie:
-        if 'txid' in x:
-            try:
-                yield str(x['name'])
-            except UnicodeError:
-                log.warning('Skippin name %s as it is not ascii', x['name'])
@@ -1,59 +0,0 @@
-import logging
-import random
-
-from twisted.internet import defer
-from twisted.internet import reactor
-
-from lbrynet.core import Error
-from lbrynet.core import StreamDescriptor
-from lbrynet.metadata import Metadata
-
-
-log = logging.getLogger(__name__)
-
-
-class Name(object):
-    def __init__(self, name):
-        self.name = name
-        self.sd_hash = None
-        self.sd_blob = None
-
-    @defer.inlineCallbacks
-    def setSdHash(self, wallet):
-        try:
-            stream = yield wallet.get_stream_info_for_name(self.name)
-            metadata = Metadata.Metadata(stream)
-            self.sd_hash = _getSdHash(metadata)
-        except (Error.InvalidStreamInfoError, AssertionError):
-            pass
-        except Exception:
-            log.exception('What happened')
-
-    @defer.inlineCallbacks
-    def download_sd_blob(self, session):
-        print('Trying to get sd_blob for {} using {}'.format(self.name, self.sd_hash))
-        try:
-            blob = yield download_sd_blob_with_timeout(
-                session, self.sd_hash, session.payment_rate_manager)
-
-            self.sd_blob = blob
-            yield self._after_download(blob)
-            print('Downloaded sd_blob for {} using {}'.format(self.name, self.sd_hash))
-        except defer.TimeoutError:
-            print('Downloading sd_blob for {} timed-out'.format(self.name))
-            # swallow errors from the timeout
-            pass
-        except Exception:
-            log.exception('Failed to download {}'.format(self.name))
-
-    def _after_download(self, blob):
-        pass
-
-
-def _getSdHash(metadata):
-    return metadata['sources']['lbry_sd_hash']
-
-
-def download_sd_blob_with_timeout(session, sd_hash, payment_rate_manager):
-    d = StreamDescriptor.download_sd_blob(session, sd_hash, payment_rate_manager)
-    d.addTimeout(random.randint(10, 30), reactor)
-    return d
@@ -1,50 +0,0 @@
-import itertools
-import logging
-
-from twisted.internet import defer
-
-
-log = logging.getLogger(__name__)
-
-
-class DeferredPool(defer.Deferred):
-    def __init__(self, deferred_iter, pool_size):
-        self.deferred_iter = deferred_iter
-        self.pool_size = pool_size
-        # results are stored unordered
-        self.result_list = []
-        self.started_count = 0
-        self.total_count = None
-        defer.Deferred.__init__(self)
-
-        for deferred in itertools.islice(deferred_iter, pool_size):
-            self._start_one(deferred)
-
-    def _start_one(self, deferred):
-        deferred.addCallbacks(self._callback, self._callback,
-                              callbackArgs=(self.started_count, defer.SUCCESS),
-                              errbackArgs=(self.started_count, defer.FAILURE))
-        self.started_count += 1
-
-    def _callback(self, result, index, success):
-        self.result_list.append((index, success, result))
-        if self._done():
-            self._finish()
-        else:
-            self._process_next()
-        return result
-
-    def _done(self):
-        return self.total_count == len(self.result_list)
-
-    def _finish(self):
-        result_list = [(s, r) for i, s, r in sorted(self.result_list)]
-        self.callback(result_list)
-
-    def _process_next(self):
-        try:
-            deferred = next(self.deferred_iter)
-        except StopIteration:
-            self.total_count = self.started_count
-        else:
-            self._start_one(deferred)
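For reference, the DeferredPool deleted above bounded concurrency: it started pool_size deferreds, pulled the next one from the iterator each time one fired, and finally called back with a sorted list of (status, result) pairs. A hypothetical usage sketch (assuming the class as defined above):

from twisted.internet import reactor, task

def slow_double(n):
    # a Deferred that fires with 2 * n after a short delay
    return task.deferLater(reactor, 0.1, lambda: 2 * n)

def main():
    work = (slow_double(n) for n in range(20))  # lazy iterator of Deferreds
    p = DeferredPool(work, 5)                   # at most 5 in flight at once
    p.addCallback(print)                        # [(status, result), ...]
    p.addBoth(lambda _: reactor.stop())

reactor.callWhenRunning(main)
reactor.run()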
@@ -1,174 +0,0 @@
-from __future__ import print_function
-from lbrynet.core import log_support
-
-import argparse
-import collections
-import logging
-import os
-import random
-import shutil
-import sys
-import tempfile
-
-from twisted.internet import defer
-from twisted.internet import reactor
-
-from lbrynet import analytics
-from lbrynet import conf
-from lbrynet.core import Wallet
-from lbrynet.core import BlobAvailability
-from lbrynet.core import Session
-from lbrynet.core import utils
-
-import common
-import name
-import pool
-import track
-
-
-log = logging.getLogger()
-
-
-def main(args=None):
-    conf.initialize_settings()
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--limit', type=int)
-    parser.add_argument('--download', action='store_true',
-                        help='Set flag to also download each sd_blob and report on success')
-    args = parser.parse_args(args)
-
-    log_support.configure_console()
-    log_support.configure_twisted()
-
-    # make a fresh dir or else we will include blobs that we've
-    # already downloaded but might not otherwise be available.
-    db_dir = tempfile.mkdtemp()
-    try:
-        blob_dir = os.path.join(db_dir, 'blobfiles')
-        os.makedirs(blob_dir)
-        storage = Wallet.InMemoryStorage()
-        wallet = Wallet.LBRYumWallet(storage)
-        session = Session.Session(
-            0,
-            db_dir=db_dir,
-            node_id=utils.generate_id(),
-            blob_dir=blob_dir,
-            dht_node_port=4444,
-            known_dht_nodes=conf.settings['known_dht_nodes'],
-            peer_port=3333,
-            use_upnp=False,
-            wallet=wallet
-        )
-        api = analytics.Api.new_instance(conf.settings['share_usage_data'])
-        run(args, session, api)
-        reactor.run()
-    finally:
-        shutil.rmtree(db_dir)
-
-
-@defer.inlineCallbacks
-def run(args, session, api):
-    try:
-        yield session.setup()
-        names = yield common.getNames(session.wallet)
-        if args.limit and len(names) > args.limit:
-            names = random.sample(list(names), args.limit)
-        names = [Name(n) for n in names]
-        blob_tracker = BlobAvailability.BlobAvailabilityTracker(
-            session.blob_manager, session.peer_finder, session.dht_node)
-
-        tracker = yield Tracker(session, names, blob_tracker)
-        yield tracker.processNameClaims(args.download)
-        event = makeEvent(tracker.stats)
-        if args.download and not args.limit:
-            api.track(event)
-        else:
-            # don't send event to analytics if it doesn't contain the full info
-            print(event)
-    except Exception:
-        log.exception('Something bad happened')
-    finally:
-        reactor.stop()
-
-
-class Tracker(track.Tracker):
-    def __init__(self, session, names, blob_tracker):
-        track.Tracker.__init__(self, session, names)
-        self.blob_tracker = blob_tracker
-
-    @defer.inlineCallbacks
-    def processNameClaims(self, download=False):
-        try:
-            yield self._getSdHashes()
-            yield self._filterNames('sd_hash')
-            yield self._checkAvailability()
-            yield self._filterNames('is_available')
-            yield self.print_attempts_counter()
-            if download:
-                yield self._downloadAllBlobs()
-                yield self._filterNames('sd_blob')
-        except Exception:
-            log.exception('Something bad happened')
-
-    def print_attempts_counter(self):
-        print(self.attempts_counter)
-
-    def attempts_counter(self):
-        return collections.Counter([n.availability_attempts for n in self.names])
-
-    def _checkAvailability(self):
-        return pool.DeferredPool(
-            (n.check_availability(self.blob_tracker) for n in self.names),
-            10
-        )
-
-
-class Name(name.Name):
-    # From experience, very few sd_blobs get found after the third attempt
-    MAX_ATTEMPTS = 6
-
-    def __init__(self, my_name):
-        name.Name.__init__(self, my_name)
-        self.is_available = None
-        self.availability_attempts = 0
-
-    @defer.inlineCallbacks
-    def _check_availability(self, blob_tracker):
-        b = yield blob_tracker.get_blob_availability(self.sd_hash)
-        peer_count = b[self.sd_hash]
-        self._setAvailable(peer_count)
-
-    @defer.inlineCallbacks
-    def check_availability(self, blob_tracker):
-        while not self.is_available and self.availability_attempts < self.MAX_ATTEMPTS:
-            self.availability_attempts += 1
-            log.info('Attempt %s to find %s', self.availability_attempts, self.name)
-            yield self._check_availability(blob_tracker)
-
-    def _setAvailable(self, peer_count):
-        self.is_available = peer_count > 0
-
-
-def makeEvent(stats):
-    return {
-        'userId': 'lbry',
-        'event': 'Content Availability',
-        'properties': {
-            'total_published': stats['sd_hash'],
-            'sd_blob_available_on_dht': stats['is_available'],
-            'sd_blob_available_for_download': stats['sd_blob'],
-        },
-        'context': {
-            'app': {
-                'name': 'Availability Tracker',
-                'version': 1,
-            },
-            'library': {
-                'name': 'lbrynet-analytics',
-                'version': '1.0.0'
-            },
-        },
-        'timestamp': utils.isonow()
-    }
-
-
-if __name__ == '__main__':
-    sys.exit(main())
@@ -1,163 +0,0 @@
-from __future__ import print_function
-from lbrynet.core import log_support
-
-import argparse
-import logging
-import os
-import random
-import sys
-
-import appdirs
-from twisted.internet import defer
-from twisted.internet import reactor
-
-from lbrynet import conf
-from lbrynet.core import Wallet
-from lbrynet.core import BlobManager
-from lbrynet.core import Session
-from lbrynet.core import utils
-
-from lbrynet import reflector
-
-import name
-import track
-
-
-log = logging.getLogger('main')
-
-
-def main(args=None):
-    conf.initialize_settings()
-
-    parser = argparse.ArgumentParser()
-    parser.add_argument('destination', type=conf.server_port, nargs='+')
-    parser.add_argument('--names', nargs='*')
-    parser.add_argument('--limit', type=int)
-    args = parser.parse_args(args)
-
-    log_support.configure_console(level='INFO')
-
-    db_dir = appdirs.user_data_dir('lighthouse-uploader')
-    safe_makedirs(db_dir)
-    # no need to persist metadata info
-    storage = Wallet.InMemoryStorage()
-    wallet = Wallet.LBRYumWallet(storage)
-    blob_dir = os.path.join(db_dir, 'blobfiles')
-    safe_makedirs(blob_dir)
-    # Don't set a hash_announcer, we have no need to tell anyone we
-    # have these blobs
-    blob_manager = BlobManager.DiskBlobManager(None, blob_dir, db_dir)
-    # TODO: make it so that I can disable the BlobAvailabilityTracker
-    # or, in general, make the session more reusable for users
-    # that only want part of the functionality
-    session = Session.Session(
-        blob_data_payment_rate=0,
-        db_dir=db_dir,
-        node_id=utils.generate_id(),
-        blob_dir=blob_dir,
-        dht_node_port=4444,
-        known_dht_nodes=conf.settings['known_dht_nodes'],
-        peer_port=3333,
-        use_upnp=False,
-        wallet=wallet,
-        blob_manager=blob_manager,
-    )
-    assert session.wallet
-    run(session, args.destination, args.names, args.limit)
-    reactor.run()
-
-
-def safe_makedirs(directory):
-    try:
-        os.makedirs(directory)
-    except OSError:
-        pass
-
-
-@defer.inlineCallbacks
-def run(session, destinations, names, limit):
-    try:
-        yield session.setup()
-        while not session.wallet.network.is_connected():
-            log.info('Retrying wallet startup')
-            try:
-                yield session.wallet.start()
-            except ValueError:
-                pass
-        names = yield getNames(session.wallet, names)
-        if limit and limit < len(names):
-            names = random.sample(names, limit)
-        log.info('Processing %s names', len(names))
-        names = [Name(n, session.blob_manager) for n in names]
-        t = Tracker(session, destinations, names)
-        yield t.processNameClaims()
-    except Exception:
-        log.exception('Something bad happened')
-    finally:
-        log.warning('We are stopping the reactor gracefully')
-        reactor.stop()
-
-
-def logAndStop(err):
-    log_support.failure(err, log, 'This sucks: %s')
-    reactor.stop()
-
-
-def logAndRaise(err):
-    log_support.failure(err, log, 'This still sucks: %s')
-    return err
-
-
-class Tracker(track.Tracker):
-    def __init__(self, session, destinations, names):
-        self.destinations = destinations
-        track.Tracker.__init__(self, session, names)
-
-    @property
-    def blob_manager(self):
-        return self.session.blob_manager
-
-    @defer.inlineCallbacks
-    def processNameClaims(self):
-        yield super(Tracker, self).processNameClaims()
-        log.info('Sending the blobs')
-        yield self._sendSdBlobs()
-
-    @defer.inlineCallbacks
-    def _sendSdBlobs(self):
-        blobs = [n.sd_blob for n in self.names if n.sd_blob]
-        log.info('Sending %s blobs', len(blobs))
-        blob_hashes = [b.blob_hash for b in blobs]
-        for destination in self.destinations:
-            factory = reflector.BlobClientFactory(self.blob_manager, blob_hashes)
-            yield self._connect(destination, factory)
-
-    @defer.inlineCallbacks
-    def _connect(self, destination, factory):
-        url, port = destination
-        ip = yield reactor.resolve(url)
-        try:
-            print('Connecting to {}'.format(ip))
-            yield reactor.connectTCP(ip, port, factory)
-            #factory.finished_deferred.addTimeout(60, reactor)
-            value = yield factory.finished_deferred
-            if value:
-                print('Success!')
-            else:
-                print('Not success?', value)
-        except Exception:
-            log.exception('Somehow failed to send blobs')
-
-
-class Name(name.Name):
-    def __init__(self, my_name, blob_manager):
-        name.Name.__init__(self, my_name)
-        self.blob_manager = blob_manager
-
-    def _after_download(self, blob):
-        # keep the blob for future runs
-        self.blob_manager.blob_completed(blob)
-
-
-if __name__ == '__main__':
-    sys.exit(main())
@@ -1,41 +0,0 @@
-import logging
-
-from twisted.internet import defer
-
-import pool
-
-
-log = logging.getLogger(__name__)
-
-
-class Tracker(object):
-    def __init__(self, session, names):
-        self.session = session
-        self.names = names
-        self.stats = {}
-
-    @property
-    def wallet(self):
-        return self.session.wallet
-
-    @defer.inlineCallbacks
-    def processNameClaims(self):
-        try:
-            log.info('Starting to get name claims')
-            yield self._getSdHashes()
-            self._filterNames('sd_hash')
-            log.info('Downloading all of the blobs')
-            yield self._downloadAllBlobs()
-        except Exception:
-            log.exception('Something bad happened')
-
-    def _getSdHashes(self):
-        return pool.DeferredPool((n.setSdHash(self.wallet) for n in self.names), 10)
-
-    def _filterNames(self, attr):
-        self.names = [n for n in self.names if getattr(n, attr)]
-        self.stats[attr] = len(self.names)
-        print("We have {} names with attribute {}".format(len(self.names), attr))
-
-    def _downloadAllBlobs(self):
-        return pool.DeferredPool((n.download_sd_blob(self.session) for n in self.names), 10)