diff --git a/gui/kivy/main_window.py b/gui/kivy/main_window.py index 2dc623022..919203c06 100644 --- a/gui/kivy/main_window.py +++ b/gui/kivy/main_window.py @@ -809,7 +809,7 @@ class ElectrumWindow(App): Clock.schedule_once(lambda dt: on_success(tx)) def _broadcast_thread(self, tx, on_complete): - ok, txid = self.network.broadcast(tx) + ok, txid = self.network.broadcast_transaction(tx) Clock.schedule_once(lambda dt: on_complete(ok, txid)) def broadcast(self, tx, pr=None): diff --git a/gui/qt/main_window.py b/gui/qt/main_window.py index 1397f0908..6d5786d8e 100644 --- a/gui/qt/main_window.py +++ b/gui/qt/main_window.py @@ -1600,7 +1600,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): if pr and pr.has_expired(): self.payment_request = None return False, _("Payment request has expired") - status, msg = self.network.broadcast(tx) + status, msg = self.network.broadcast_transaction(tx) if pr and status is True: self.invoices.set_paid(pr, tx.txid()) self.invoices.save() @@ -2359,7 +2359,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError): if ok and txid: txid = str(txid).strip() try: - r = self.network.synchronous_get(('blockchain.transaction.get',[txid])) + r = self.network.get_transaction(txid) except BaseException as e: self.show_message(str(e)) return diff --git a/gui/stdio.py b/gui/stdio.py index 4b0daa156..a4756eae6 100644 --- a/gui/stdio.py +++ b/gui/stdio.py @@ -198,7 +198,7 @@ class ElectrumGui: self.wallet.labels[tx.txid()] = self.str_description print(_("Please wait...")) - status, msg = self.network.broadcast(tx) + status, msg = self.network.broadcast_transaction(tx) if status: print(_('Payment sent.')) diff --git a/gui/text.py b/gui/text.py index 9ecf91aff..b616f8734 100644 --- a/gui/text.py +++ b/gui/text.py @@ -349,7 +349,7 @@ class ElectrumGui: self.wallet.labels[tx.txid()] = self.str_description self.show_message(_("Please wait..."), getchar=False) - status, msg = self.network.broadcast(tx) + status, msg = self.network.broadcast_transaction(tx) if status: self.show_message(_('Payment sent.')) diff --git a/lib/commands.py b/lib/commands.py index af2dd5742..d43292a15 100644 --- a/lib/commands.py +++ b/lib/commands.py @@ -181,7 +181,7 @@ class Commands: walletless server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - return self.network.synchronous_get(('blockchain.scripthash.get_history', [sh])) + return self.network.get_history_for_scripthash(sh) @command('w') def listunspent(self): @@ -199,7 +199,7 @@ class Commands: is a walletless server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - return self.network.synchronous_get(('blockchain.scripthash.listunspent', [sh])) + return self.network.listunspent_for_scripthash(sh) @command('') def serialize(self, jsontx): @@ -252,10 +252,10 @@ class Commands: return tx.deserialize() @command('n') - def broadcast(self, tx, timeout=30): + def broadcast(self, tx): """Broadcast a transaction to the network. """ tx = Transaction(tx) - return self.network.broadcast(tx, timeout) + return self.network.broadcast_transaction(tx) @command('') def createmultisig(self, num, pubkeys): @@ -322,7 +322,7 @@ class Commands: server query, results are not checked by SPV. """ sh = bitcoin.address_to_scripthash(address) - out = self.network.synchronous_get(('blockchain.scripthash.get_balance', [sh])) + out = self.network.get_balance_for_scripthash(sh) out["confirmed"] = str(Decimal(out["confirmed"])/COIN) out["unconfirmed"] = str(Decimal(out["unconfirmed"])/COIN) return out @@ -331,7 +331,7 @@ class Commands: def getmerkle(self, txid, height): """Get Merkle branch of a transaction included in a block. Electrum uses this to verify transactions (Simple Payment Verification).""" - return self.network.synchronous_get(('blockchain.transaction.get_merkle', [txid, int(height)])) + return self.network.get_merkle_for_transaction(txid, int(height)) @command('n') def getservers(self): @@ -517,7 +517,7 @@ class Commands: if self.wallet and txid in self.wallet.transactions: tx = self.wallet.transactions[txid] else: - raw = self.network.synchronous_get(('blockchain.transaction.get', [txid])) + raw = self.network.get_transaction(txid) if raw: tx = Transaction(raw) else: diff --git a/lib/network.py b/lib/network.py index 8b64bd7a4..d6349711f 100644 --- a/lib/network.py +++ b/lib/network.py @@ -620,26 +620,6 @@ class Network(util.DaemonThread): # Response is now in canonical form self.process_response(interface, response, callbacks) - def map_scripthash_to_address(self, callback): - def cb2(x): - x2 = x.copy() - p = x2.pop('params') - addr = self.h2addr[p[0]] - x2['params'] = [addr] - callback(x2) - return cb2 - - def subscribe_to_addresses(self, addresses, callback): - hash2address = {bitcoin.address_to_scripthash(address): address for address in addresses} - self.h2addr.update(hash2address) - msgs = [('blockchain.scripthash.subscribe', [x]) for x in hash2address.keys()] - self.send(msgs, self.map_scripthash_to_address(callback)) - - def request_address_history(self, address, callback): - h = bitcoin.address_to_scripthash(address) - self.h2addr.update({h: address}) - self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback)) - def send(self, messages, callback): '''Messages is a list of (method, params) tuples''' messages = list(messages) @@ -668,6 +648,7 @@ class Network(util.DaemonThread): self.subscriptions[k] = l # check cached response for subscriptions r = self.sub_cache.get(k) + if r is not None: self.print_error("cache hit", k) callback(r) @@ -802,12 +783,6 @@ class Network(util.DaemonThread): blockchain.catch_up = None self.notify('updated') - def request_header(self, interface, height): - #interface.print_error("requesting header %d" % height) - self.queue_request('blockchain.block.get_header', [height], interface) - interface.request = height - interface.req_time = time.time() - def on_get_header(self, interface, response): '''Handle receiving a single block header''' header = response.get('result') @@ -1062,27 +1037,134 @@ class Network(util.DaemonThread): def get_local_height(self): return self.blockchain().height() - def synchronous_get(self, request, timeout=30): + @staticmethod + def __wait_for(it): + """Wait for the result of calling lambda `it`.""" q = queue.Queue() - self.send([request], q.put) + it(q.put) try: - r = q.get(True, timeout) + result = q.get(block=True, timeout=30) except queue.Empty: raise util.TimeoutException(_('Server did not answer')) - if r.get('error'): - raise Exception(r.get('error')) - return r.get('result') - def broadcast(self, tx, timeout=30): - tx_hash = tx.txid() + if result.get('error'): + raise Exception(result.get('error')) + + return result.get('result') + + @staticmethod + def __with_default_synchronous_callback(invocation, callback): + """ Use this method if you want to make the network request + synchronous. """ + if not callback: + return Network.__wait_for(invocation) + + invocation(callback) + + def request_header(self, interface, height): + self.queue_request('blockchain.block.get_header', [height], interface) + interface.request = height + interface.req_time = time.time() + + def map_scripthash_to_address(self, callback): + def cb2(x): + x2 = x.copy() + p = x2.pop('params') + addr = self.h2addr[p[0]] + x2['params'] = [addr] + callback(x2) + return cb2 + + def subscribe_to_addresses(self, addresses, callback): + hash2address = { + bitcoin.address_to_scripthash(address): address + for address in addresses} + self.h2addr.update(hash2address) + msgs = [ + ('blockchain.scripthash.subscribe', [x]) + for x in hash2address.keys()] + self.send(msgs, self.map_scripthash_to_address(callback)) + + def request_address_history(self, address, callback): + h = bitcoin.address_to_scripthash(address) + self.h2addr.update({h: address}) + self.send([('blockchain.scripthash.get_history', [h])], self.map_scripthash_to_address(callback)) + + # NOTE this method handles exceptions and a special edge case, counter to + # what the other ElectrumX methods do. This is unexpected. + def broadcast_transaction(self, transaction, callback=None): + command = 'blockchain.transaction.broadcast' + invocation = lambda c: self.send([(command, [str(transaction)])], c) + + if callback: + invocation(callback) + return + try: - out = self.synchronous_get(('blockchain.transaction.broadcast', [str(tx)]), timeout) + out = Network.__wait_for(invocation) except BaseException as e: return False, "error: " + str(e) - if out != tx_hash: + + if out != transaction.txid(): return False, "error: " + out + return True, out + def get_history_for_scripthash(self, hash, callback=None): + command = 'blockchain.scripthash.get_history' + invocation = lambda c: self.send([(command, [hash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def subscribe_to_headers(self, callback=None): + command = 'blockchain.headers.subscribe' + invocation = lambda c: self.send([(command, [True])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def subscribe_to_address(self, address, callback=None): + command = 'blockchain.address.subscribe' + invocation = lambda c: self.send([(command, [address])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def get_merkle_for_transaction(self, tx_hash, tx_height, callback=None): + command = 'blockchain.transaction.get_merkle' + invocation = lambda c: self.send([(command, [tx_hash, tx_height])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def subscribe_to_scripthash(self, scripthash, callback=None): + command = 'blockchain.scripthash.subscribe' + invocation = lambda c: self.send([(command, [scripthash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def get_transaction(self, transaction_hash, callback=None): + command = 'blockchain.transaction.get' + invocation = lambda c: self.send([(command, [transaction_hash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def get_transactions(self, transaction_hashes, callback=None): + command = 'blockchain.transaction.get' + messages = [(command, [tx_hash]) for tx_hash in transaction_hashes] + invocation = lambda c: self.send(messages, c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def listunspent_for_scripthash(self, scripthash, callback=None): + command = 'blockchain.scripthash.listunspent' + invocation = lambda c: self.send([(command, [scripthash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + + def get_balance_for_scripthash(self, scripthash, callback=None): + command = 'blockchain.scripthash.get_balance' + invocation = lambda c: self.send([(command, [scripthash])], c) + + return Network.__with_default_synchronous_callback(invocation, callback) + def export_checkpoints(self, path): # run manually from the console to generate checkpoints cp = self.blockchain().get_checkpoints() diff --git a/lib/synchronizer.py b/lib/synchronizer.py index 341aa0444..c865d272f 100644 --- a/lib/synchronizer.py +++ b/lib/synchronizer.py @@ -160,15 +160,16 @@ class Synchronizer(ThreadJob): def request_missing_txs(self, hist): # "hist" is a list of [tx_hash, tx_height] lists - requests = [] + transaction_hashes = [] for tx_hash, tx_height in hist: if tx_hash in self.requested_tx: continue if tx_hash in self.wallet.transactions: continue - requests.append(('blockchain.transaction.get', [tx_hash])) + transaction_hashes.append(tx_hash) self.requested_tx[tx_hash] = tx_height - self.network.send(requests, self.on_tx_response) + + self.network.get_transactions(transaction_hashes, self.on_tx_response) def initialize(self): '''Check the initial state of the wallet. Subscribe to all its diff --git a/lib/tests/test_transaction.py b/lib/tests/test_transaction.py index 6662c31f9..3f38b5629 100644 --- a/lib/tests/test_transaction.py +++ b/lib/tests/test_transaction.py @@ -773,5 +773,5 @@ class NetworkMock(object): def __init__(self, unspent): self.unspent = unspent - def synchronous_get(self, arg): + def synchronous_send(self, arg): return self.unspent diff --git a/lib/verifier.py b/lib/verifier.py index 06c6b0118..376e8aad6 100644 --- a/lib/verifier.py +++ b/lib/verifier.py @@ -54,9 +54,11 @@ class SPV(ThreadJob): else: if (tx_hash not in self.requested_merkle and tx_hash not in self.merkle_roots): - request = ('blockchain.transaction.get_merkle', - [tx_hash, tx_height]) - self.network.send([request], self.verify_merkle) + + self.network.get_merkle_for_transaction( + tx_hash, + tx_height, + self.verify_merkle) self.print_error('requested merkle', tx_hash) self.requested_merkle.add(tx_hash) diff --git a/lib/wallet.py b/lib/wallet.py index 37718768e..db2d21a0b 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -93,12 +93,13 @@ def dust_threshold(network): def append_utxos_to_inputs(inputs, network, pubkey, txin_type, imax): if txin_type != 'p2pk': address = bitcoin.pubkey_to_address(txin_type, pubkey) - sh = bitcoin.address_to_scripthash(address) + scripthash = bitcoin.address_to_scripthash(address) else: script = bitcoin.public_key_to_p2pk_script(pubkey) - sh = bitcoin.script_to_scripthash(script) + scripthash = bitcoin.script_to_scripthash(script) address = '(pubkey)' - u = network.synchronous_get(('blockchain.scripthash.listunspent', [sh])) + + u = network.listunspent_for_scripthash(scripthash) for item in u: if len(inputs) >= imax: break @@ -1471,9 +1472,8 @@ class Abstract_Wallet(PrintError): # all the input txs, in which case we ask the network. tx = self.transactions.get(tx_hash, None) if not tx and self.network: - request = ('blockchain.transaction.get', [tx_hash]) try: - tx = Transaction(self.network.synchronous_get(request)) + tx = Transaction(self.network.get_transaction(tx_hash)) except TimeoutException as e: self.print_error('getting input txn from network timed out for {}'.format(tx_hash)) if not ignore_timeout: diff --git a/lib/websockets.py b/lib/websockets.py index 5462dd161..ea68dfa5f 100644 --- a/lib/websockets.py +++ b/lib/websockets.py @@ -95,17 +95,18 @@ class WsClientThread(util.DaemonThread): continue util.print_error('response', r) method = r.get('method') - params = r.get('params') + scripthash = r.get('params')[0] result = r.get('result') if result is None: continue if method == 'blockchain.scripthash.subscribe': - self.network.send([('blockchain.scripthash.get_balance', params)], self.response_queue.put) + self.network.get_balance_for_scripthash( + scripthash, self.response_queue.put) elif method == 'blockchain.scripthash.get_balance': - h = params[0] - addr = self.network.h2addr.get(h, None) + addr = self.network.h2addr.get(scripthash, None) if addr is None: - util.print_error("can't find address for scripthash: %s" % h) + util.print_error( + "can't find address for scripthash: %s" % scripthash) l = self.subscriptions.get(addr, []) for ws, amount in l: if not ws.closed: diff --git a/scripts/block_headers b/scripts/block_headers index 14ae918a9..cda9e14b9 100755 --- a/scripts/block_headers +++ b/scripts/block_headers @@ -22,7 +22,7 @@ if not network.is_connected(): # 2. send the subscription callback = lambda response: print_msg(json_encode(response.get('result'))) network.send([('server.version',["block_headers script", "1.2"])], callback) -network.send([('blockchain.headers.subscribe',[True])], callback) +network.subscribe_to_headers(callback) # 3. wait for results while network.is_connected(): diff --git a/scripts/get_history b/scripts/get_history index b78fcc58a..3e25166da 100755 --- a/scripts/get_history +++ b/scripts/get_history @@ -14,5 +14,5 @@ except Exception: n = Network() n.start() _hash = bitcoin.address_to_scripthash(addr) -h = n.synchronous_get(('blockchain.scripthash.get_history',[_hash])) +h = n.get_history_for_scripthash(_hash) print_msg(json_encode(h)) diff --git a/scripts/watch_address b/scripts/watch_address index a8446fc81..9c60afba4 100755 --- a/scripts/watch_address +++ b/scripts/watch_address @@ -29,7 +29,7 @@ if not network.is_connected(): # 2. send the subscription callback = lambda response: print_msg(json_encode(response.get('result'))) -network.send([('blockchain.scripthash.subscribe',[sh])], callback) +network.subscribe_to_address(addr, callback) # 3. wait for results while network.is_connected():