From a1e827fb3b4069c34f2f6767a518fc29f2d5b94f Mon Sep 17 00:00:00 2001 From: Jack Date: Tue, 10 Nov 2015 11:33:35 -0500 Subject: [PATCH] LBRY autodownloader Aquire LBRY files as they are published --- lbrynet/lbrynet_console/ControlHandlers.py | 220 ++++++++++++++++++++- lbrynet/lbrynet_console/LBRYConsole.py | 9 +- 2 files changed, 227 insertions(+), 2 deletions(-) diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index c1754b9e2..0e9335f6c 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -1,4 +1,8 @@ +import json import logging +from time import sleep + +from bitcoinrpc.authproxy import AuthServiceProxy from zope.interface import implements #from lbrynet.core.StreamDescriptor import PlainStreamDescriptorWriter, BlobStreamDescriptorWriter from lbrynet.core.PaymentRateManager import PaymentRateManager @@ -2027,6 +2031,9 @@ class ImmediateAnnounceAllBlobs(CommandHandler): class ImmediateAnnounceAllBlobsFactory(CommandHandlerFactory): control_handler_class = ImmediateAnnounceAllBlobs + command = "announce-blobs" + full_help = "Immediately re-broadcast all hashes associated with the server to " \ + "the distributed hash table." class ModifyServerSettings(RecursiveCommandHandler): @@ -2349,4 +2356,215 @@ class StatusFactory(CommandHandlerFactory): "Show the list of files that are currently downloading " \ "or have been downloaded, and give the option to " \ "toggle whether the file is actively downloading or " \ - "to remove the file." \ No newline at end of file + "to remove the file." + + +class AutoAddStream(object): + def __init__(self, sd_identifier, base_payment_rate_manager, lbry_file_manager): + self.lbry_file_manager = lbry_file_manager + self.sd_identifier = sd_identifier + self.loading_metadata_deferred = None + self.finished_deferred = defer.Deferred() + self.metadata = None + self.downloader = None + self.loading_failed = False + self.payment_rate_manager = PaymentRateManager(base_payment_rate_manager) + + def start(self): + self.loading_metadata_deferred.addCallback(self._handle_metadata) + self.loading_metadata_deferred.addErrback(self._handle_load_canceled) + self.loading_metadata_deferred.addErrback(self._handle_load_failed) + self._start_download() + + def _load_metadata(self, sd_file): + return defer.fail(NotImplementedError()) + + def _handle_load_canceled(self, err): + err.trap(defer.CancelledError) + self.finished_deferred.callback(None) + + def _handle_load_failed(self, err): + self.loading_failed = True + log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback()) + self.finished_deferred.callback(None) + + def _handle_metadata(self, metadata): + self.loading_metadata_deferred = None + self.metadata = metadata + + def _start_download(self): + d = self._make_downloader() + d.addCallback(lambda stream_downloader: stream_downloader.start()) + d.addErrback(self._handle_download_error) + return d + + def _handle_download_error(self, err): + if err.check(InsufficientFundsError): + print "Download stopped due to insufficient funds." + else: + print "An unexpected error has caused the download to stop: %s" % err.getTraceback() + + #TODO fix this, shouldn't be a LBRYFileManager object, it doesn't have a make_downloader method + def _make_downloader(self): + return self.lbry_file_manager.make_downloader(self.metadata, [0.5, True], self.payment_rate_manager) + + +class AutoAddStreamFromHash(AutoAddStream): + def __init__(self, sd_identifier, session, lbry_file_manager): + AutoAddStream.__init__(self, sd_identifier, session.base_payment_rate_manager, lbry_file_manager) + self.session = session + + def start(self, sd_hash): + self.loading_metadata_deferred = download_sd_blob(self.session, sd_hash, + self.payment_rate_manager) + self.loading_metadata_deferred.addCallback(self.sd_identifier.get_metadata_for_sd_blob) + AutoAddStream.start(self) + + def _handle_load_failed(self, err): + self.loading_failed = True + if err.check(InvalidBlobHashError): + self.finished_deferred.callback(None) + return + if err.check(InsufficientFundsError): + self.finished_deferred.callback(None) + return + return AutoAddStream._handle_load_failed(self, err) + + +class AutoAddStreamFromLBRYcrdName(AutoAddStreamFromHash): + def __init__(self, sd_identifier, session, wallet, lbry_file_manager): + AutoAddStreamFromHash.__init__(self, sd_identifier, session, lbry_file_manager) + self.wallet = wallet + self.resolved_name = None + self.description = None + self.key_fee = None + self.key_fee_address = None + self.name = None + + def start(self, name): + self.name = name + self.loading_metadata_deferred = self._resolve_name(name) + self.loading_metadata_deferred.addCallback(lambda stream_hash: download_sd_blob(self.session, + stream_hash, + self.payment_rate_manager)) + self.loading_metadata_deferred.addCallback(self.sd_identifier.get_metadata_for_sd_blob) + log.error("Starting AutoAddStream") + AutoAddStream.start(self) + + def _resolve_name(self, name): + def get_name_from_info(stream_info): + if 'stream_hash' not in stream_info: + raise InvalidStreamInfoError(name) + self.resolved_name = stream_info.get('name', None) + self.description = stream_info.get('description', None) + try: + if 'key_fee' in stream_info: + self.key_fee = float(stream_info['key_fee']) + except ValueError: + self.key_fee = None + self.key_fee_address = stream_info.get('key_fee_address', None) + return stream_info['stream_hash'] + d = self.wallet.get_stream_info_for_name(name) + d.addCallback(get_name_from_info) + return d + + def _handle_load_failed(self, err): + self.loading_failed = True + if err.check(UnknownNameError): + if is_valid_blobhash(self.name): + self.loading_failed = False + self.loading_metadata_deferred = None + AutoAddStreamFromHash.start(self, self.name) + return + else: + self.finished_deferred.callback(None) + return + elif err.check(InvalidBlobHashError): + self.finished_deferred.callback(None) + return + return AutoAddStreamFromHash._handle_load_failed(self, err) + + def _start_download(self): + d = self._pay_key_fee() + d.addCallback(lambda _: AutoAddStream._start_download(self)) + return d + + def _pay_key_fee(self): + if self.key_fee is not None and self.key_fee_address is not None: + reserved_points = self.wallet.reserve_points(self.key_fee_address, self.key_fee) + if reserved_points is None: + return defer.fail(InsufficientFundsError()) + return self.wallet.send_points_to_address(reserved_points, self.key_fee) + return defer.succeed(True) + + +#TODO see problem on line 2408 +class AutoFetcher(CommandHandler): + def __init__(self, console, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier): + CommandHandler.__init__(self, console) + self.sd_identifier = sd_identifier + self.wallet = wallet + self.session = session + self.lbry_file_manager = lbry_file_manager + self.lbry_metadata_manager = lbry_file_metadata_manager + self.seen = [] + + settings = self.session.wallet.get_rpc_conf() + rpc_user = settings["username"] + rpc_pass = settings["password"] + rpc_port = settings["rpc_port"] + rpc_url = "127.0.0.1" + rpc_conn_string = "http://%s:%s@%s:%s" % (rpc_user, rpc_pass, rpc_url, str(rpc_port)) + self.rpc_conn = AuthServiceProxy(rpc_conn_string) + + def start(self): + self._listen_for_names() + + def _listen_for_names(self): + l = 0 + firstrun = True + while True: + c = self.rpc_conn.getblockchaininfo() + if l != c: + block = self.rpc_conn.getblock(c['bestblockhash']) + txids = block['tx'] + transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids] + claimflag = False + claims = None + nmsg = [] + tmsg = [] + for t in transactions: + claims = self.rpc_conn.getclaimsfortx(t['txid']) + if firstrun: + claims = [{'name': 'wonderfullife'}] + if claims: + for claim in claims: + if claim not in self.seen: + print claim + print '* Trying to get claim', claim['name'] + get_name = AutoAddStreamFromLBRYcrdName(self.sd_identifier, self.session, + self.wallet, self.lbry_file_manager) + get_name.start(claim['name']) + nmsg.append(claim['name']) + self.seen.append(claim) + claimflag = True + else: + tmsg.append("Transaction: " + str(t['txid'])) + if claimflag: + print '! New name claims' + for c in nmsg: + print ' ' + str(c) + else: + print 'No name claims in this block' + for t in tmsg: + print ' ' + str(t) + + l = c + firstrun = False + sleep(1) + + +class AutoFetcherFactory(CommandHandlerFactory): + control_handler_class = AutoFetcher + command = "start-autofetching" + short_help = "Download all lbry files as they are published" \ No newline at end of file diff --git a/lbrynet/lbrynet_console/LBRYConsole.py b/lbrynet/lbrynet_console/LBRYConsole.py index 60dac2679..4f0b35074 100644 --- a/lbrynet/lbrynet_console/LBRYConsole.py +++ b/lbrynet/lbrynet_console/LBRYConsole.py @@ -21,6 +21,7 @@ from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType from lbrynet.lbryfile.LBRYFileMetadataManager import DBLBRYFileMetadataManager, TempLBRYFileMetadataManager #from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager from lbrynet.lbrynet_console.ControlHandlers import ApplicationStatusFactory, GetWalletBalancesFactory, ShutDownFactory +from lbrynet.lbrynet_console.ControlHandlers import AutoFetcherFactory, ImmediateAnnounceAllBlobsFactory from lbrynet.lbrynet_console.ControlHandlers import LBRYFileStatusFactory, DeleteLBRYFileChooserFactory from lbrynet.lbrynet_console.ControlHandlers import ToggleLBRYFileRunningChooserFactory from lbrynet.lbrynet_console.ControlHandlers import ModifyApplicationDefaultsFactory @@ -311,7 +312,10 @@ class LBRYConsole(): ModifyLBRYFileOptionsChooserFactory(self.lbry_file_manager), AddStreamFromHashFactory(self.sd_identifier, self.session), StatusFactory(self, self.session.rate_limiter, self.lbry_file_manager, - self.session.blob_manager, self.session.wallet if self.wallet_type == 'lbrycrd' else None) + self.session.blob_manager, self.session.wallet if self.wallet_type == 'lbrycrd' else None), + AutoFetcherFactory(self.session, self.lbry_file_manager, self.lbry_file_metadata_manager, + self.session.wallet, self.sd_identifier), + ImmediateAnnounceAllBlobsFactory(self.session.blob_manager) ] self.add_control_handlers(handlers) if self.wallet_type == 'lbrycrd': @@ -537,3 +541,6 @@ def launch_lbry_console(): reactor.addSystemEventTrigger('before', 'shutdown', console.shut_down) reactor.run() + +if __name__ == "__main__": + launch_lbry_console() \ No newline at end of file