import re from binascii import unhexlify from struct import pack from unicodedata import normalize from electrum.account import BIP32_Account from electrum.bitcoin import (bc_address_to_hash_160, xpub_from_pubkey, EncodeBase58Check) from electrum.i18n import _ from electrum.plugins import BasePlugin, hook from electrum.transaction import (deserialize, is_extended_pubkey, Transaction, x_to_xpub) from electrum.wallet import BIP32_HD_Wallet, BIP44_Wallet class TrezorCompatibleWallet(BIP44_Wallet): # Extend BIP44 Wallet as required by hardware implementation. # Derived classes must set: # - device # - wallet_type restore_wallet_class = BIP44_Wallet def __init__(self, storage): BIP44_Wallet.__init__(self, storage) self.proper_device = False def give_error(self, message): self.print_error(message) raise Exception(message) def get_action(self): pass def can_export(self): return False def is_watching_only(self): assert not self.has_seed() return not self.proper_device def can_change_password(self): return False def get_client(self): return self.plugin.get_client(self) def check_proper_device(self): return self.get_client().check_proper_device(self) def derive_xkeys(self, root, derivation, password): if self.master_public_keys.get(root): return BIP44_wallet.derive_xkeys(self, root, derivation, password) # Happens when creating a wallet derivation = derivation.replace(self.root_name, self.prefix() + "/") xpub = self.get_public_key(derivation) return xpub, None def get_public_key(self, bip32_path): client = self.get_client() address_n = client.expand_path(bip32_path) node = client.get_public_node(address_n).node xpub = ("0488B21E".decode('hex') + chr(node.depth) + self.i4b(node.fingerprint) + self.i4b(node.child_num) + node.chain_code + node.public_key) return EncodeBase58Check(xpub) def i4b(self, x): return pack('>I', x) def decrypt_message(self, pubkey, message, password): raise RuntimeError(_('Decrypt method is not implemented')) def sign_message(self, address, message, password): client = self.get_client() self.check_proper_device() try: address_path = self.address_id(address) address_n = client.expand_path(address_path) except Exception as e: self.give_error(e) try: msg_sig = client.sign_message('Bitcoin', address_n, message) except Exception as e: self.give_error(e) finally: self.plugin.get_handler(self).stop() return msg_sig.signature def sign_transaction(self, tx, password): if tx.is_complete() or self.is_watching_only(): return self.check_proper_device() # previous transactions used as inputs prev_tx = {} # path of the xpubs that are involved xpub_path = {} for txin in tx.inputs: tx_hash = txin['prevout_hash'] ptx = self.transactions.get(tx_hash) if ptx is None: ptx = self.network.synchronous_get(('blockchain.transaction.get', [tx_hash])) ptx = Transaction(ptx) prev_tx[tx_hash] = ptx for x_pubkey in txin['x_pubkeys']: if not is_extended_pubkey(x_pubkey): continue xpub = x_to_xpub(x_pubkey) for k, v in self.master_public_keys.items(): if v == xpub: acc_id = re.match("x/(\d+)'", k).group(1) xpub_path[xpub] = self.account_derivation(acc_id) self.plugin.sign_transaction(self, tx, prev_tx, xpub_path) class TrezorCompatiblePlugin(BasePlugin): # Derived classes provide: # # class-static variables: client_class, firmware_URL, handler_class, # libraries_available, libraries_URL, minimum_firmware, # wallet_class, ckd_public, types, HidTransport def __init__(self, parent, config, name): BasePlugin.__init__(self, parent, config, name) self.device = self.wallet_class.device self.client = None self.wallet_class.plugin = self def constructor(self, s): return self.wallet_class(s) def give_error(self, message): self.print_error(message) raise Exception(message) def is_enabled(self): return self.libraries_available def create_client(self): if not self.libraries_available: self.give_error(_('please install the %s libraries from %s') % (self.device, self.libraries_URL)) devices = self.HidTransport.enumerate() if not devices: self.give_error(_('Could not connect to your %s. Verify the ' 'cable is connected and that no other app is ' 'using it.\nContinuing in watching-only mode.' % self.device)) transport = self.HidTransport(devices[0]) client = self.client_class(transport, self) if not client.atleast_version(*self.minimum_firmware): self.give_error(_('Outdated %s firmware. Please update the ' 'firmware from %s') % (self.device, self.firmware_URL)) return client def get_handler(self, wallet): return self.get_client(wallet).handler def get_client(self, wallet=None): if not self.client or self.client.bad: self.client = self.create_client() return self.client def atleast_version(self, major, minor=0, patch=0): return self.get_client().atleast_version(major, minor, patch) @staticmethod def normalize_passphrase(self, passphrase): return normalize('NFKD', unicode(passphrase or '')) def on_restore_wallet(self, wallet, wizard): assert isinstance(wallet, self.wallet_class) msg = _("Enter the seed for your %s wallet:" % self.device) seed = wizard.request_seed(msg, is_valid = self.is_valid_seed) # Restored wallets are not hardware wallets wallet_class = self.wallet_class.restore_wallet_class wallet.storage.put('wallet_type', wallet_class.wallet_type) wallet = wallet_class(wallet.storage) passphrase = wizard.request_passphrase(self.device, restore=True) password = wizard.request_password() wallet.add_seed(seed, password) wallet.add_cosigner_seed(seed, 'x/', password, passphrase) wallet.create_main_account(password) return wallet @hook def close_wallet(self, wallet): if self.client: self.print_error("clear session") self.client.clear_session() self.client.transport.close() self.client = None def sign_transaction(self, wallet, tx, prev_tx, xpub_path): self.prev_tx = prev_tx self.xpub_path = xpub_path client = self.get_client() inputs = self.tx_inputs(tx, True) outputs = self.tx_outputs(wallet, tx) try: signed_tx = client.sign_tx('Bitcoin', inputs, outputs)[1] except Exception as e: self.give_error(e) finally: self.get_handler(wallet).stop() raw = signed_tx.encode('hex') tx.update_signatures(raw) def show_address(self, wallet, address): wallet.check_proper_device() try: address_path = wallet.address_id(address) address_n = self.client_class.expand_path(address_path) except Exception as e: self.give_error(e) try: client.get_address('Bitcoin', address_n, True) except Exception as e: self.give_error(e) finally: self.get_handler(wallet).stop() def tx_inputs(self, tx, for_sig=False): inputs = [] for txin in tx.inputs: txinputtype = self.types.TxInputType() if txin.get('is_coinbase'): prev_hash = "\0"*32 prev_index = 0xffffffff # signed int -1 else: if for_sig: x_pubkeys = txin['x_pubkeys'] if len(x_pubkeys) == 1: x_pubkey = x_pubkeys[0] xpub, s = BIP32_Account.parse_xpubkey(x_pubkey) xpub_n = self.client_class.expand_path(self.xpub_path[xpub]) txinputtype.address_n.extend(xpub_n + s) else: def f(x_pubkey): if is_extended_pubkey(x_pubkey): xpub, s = BIP32_Account.parse_xpubkey(x_pubkey) else: xpub = xpub_from_pubkey(x_pubkey.decode('hex')) s = [] node = ckd_public.deserialize(xpub) return self.types.HDNodePathType(node=node, address_n=s) pubkeys = map(f, x_pubkeys) multisig = self.types.MultisigRedeemScriptType( pubkeys=pubkeys, signatures=map(lambda x: x.decode('hex') if x else '', txin.get('signatures')), m=txin.get('num_sig'), ) txinputtype = self.types.TxInputType( script_type=self.types.SPENDMULTISIG, multisig=multisig ) # find which key is mine for x_pubkey in x_pubkeys: if is_extended_pubkey(x_pubkey): xpub, s = BIP32_Account.parse_xpubkey(x_pubkey) if xpub in self.xpub_path: xpub_n = self.client_class.expand_path(self.xpub_path[xpub]) txinputtype.address_n.extend(xpub_n + s) break prev_hash = unhexlify(txin['prevout_hash']) prev_index = txin['prevout_n'] txinputtype.prev_hash = prev_hash txinputtype.prev_index = prev_index if 'scriptSig' in txin: script_sig = txin['scriptSig'].decode('hex') txinputtype.script_sig = script_sig if 'sequence' in txin: sequence = txin['sequence'] txinputtype.sequence = sequence inputs.append(txinputtype) return inputs def tx_outputs(self, wallet, tx): outputs = [] for type, address, amount in tx.outputs: assert type == 'address' txoutputtype = self.types.TxOutputType() if wallet.is_change(address): address_path = wallet.address_id(address) address_n = self.client_class.expand_path(address_path) txoutputtype.address_n.extend(address_n) else: txoutputtype.address = address txoutputtype.amount = amount addrtype, hash_160 = bc_address_to_hash_160(address) if addrtype == 0: txoutputtype.script_type = self.types.PAYTOADDRESS elif addrtype == 5: txoutputtype.script_type = self.types.PAYTOSCRIPTHASH else: raise BaseException('addrtype') outputs.append(txoutputtype) return outputs def electrum_tx_to_txtype(self, tx): t = self.types.TransactionType() d = deserialize(tx.raw) t.version = d['version'] t.lock_time = d['lockTime'] inputs = self.tx_inputs(tx) t.inputs.extend(inputs) for vout in d['outputs']: o = t.bin_outputs.add() o.amount = vout['value'] o.script_pubkey = vout['scriptPubKey'].decode('hex') return t # This function is called from the trezor libraries (via tx_api) def get_tx(self, tx_hash): tx = self.prev_tx[tx_hash] tx.deserialize() return self.electrum_tx_to_txtype(tx) @staticmethod def is_valid_seed(seed): return True