# Optional hardware-support dependencies.  When `hid` / `ckcc` cannot be
# imported the plugin records that in `requirements_ok` and falls back to
# hard-coded USB identifiers so the rest of the module can still be loaded.
try:
    import hid
    from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker
    from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError

    from ckcc.constants import (MAX_MSG_LEN, MAX_BLK_LEN, MSG_SIGNING_MAX_LENGTH, MAX_TXN_LEN,
                                AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH,
                                AF_P2WSH_P2SH)
    from ckcc.constants import (
        PSBT_GLOBAL_UNSIGNED_TX, PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_WITNESS_UTXO,
        PSBT_IN_SIGHASH_TYPE, PSBT_IN_REDEEM_SCRIPT, PSBT_IN_WITNESS_SCRIPT,
        PSBT_IN_BIP32_DERIVATION, PSBT_OUT_BIP32_DERIVATION, PSBT_OUT_REDEEM_SCRIPT)

    from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID, CKCC_SIMULATOR_PATH

    requirements_ok = True

    class ElectrumColdcardDevice(ColdcardDevice):
        # avoid use of pycoin for MiTM message signature test

        def mitm_verify(self, sig, expect_xpub):
            """Check the device's signature over the session key.

            verify a signature (65 bytes) over the session key, using the
            master bip32 node
            - customized to use specific EC library of Electrum.

            Returns True when `sig[1:65]` verifies against the public key
            carried by `expect_xpub`, False when verification raises.
            """
            from electrum.ecc import ECPubkey

            xtype, depth, parent_fingerprint, child_number, chain_code, K_or_k \
                = deserialize_xpub(expect_xpub)

            pubkey = ECPubkey(K_or_k)
            try:
                pubkey.verify_message_hash(sig[1:65], self.session_key)
                return True
            except Exception:
                # Was a bare `except:`; narrowed so KeyboardInterrupt and
                # SystemExit are no longer reported as a bad signature.
                return False

except ImportError:
    requirements_ok = False

    # NOTE(review): indentation is lost in this copy of the file; these two
    # fallbacks are placed in the except branch because the try branch imports
    # the same names from ckcc.client -- confirm against the formatted source.
    COINKITE_VID = 0xd13e
    CKCC_PID = 0xcc10

CKCC_SIMULATED_PID = CKCC_PID ^ 0x55aa


# ---------------------------------------------------------------------------
# The definitions below are methods of the plugin's device-client class.  Its
# `class` statement and constructor are not recoverable from this copy of the
# file, so the methods are shown without their enclosing class.
# ---------------------------------------------------------------------------

def verify_connection(self, expected_xfp, expected_xpub):
    """Confirm the connected device is the one the wallet was created with.

    Returns immediately when this exact (xfp, xpub) pair was already
    verified.  Raises RuntimeError when a different pair was verified
    earlier, or when the device reports a different fingerprint or master
    xpub.  Otherwise runs the device's MiTM check and remembers the pair.
    """
    ex = (expected_xfp, expected_xpub)

    if self._expected_device == ex:
        # all is as expected
        return

    if ((self._expected_device is not None)
            or (self.dev.master_fingerprint != expected_xfp)
            or (self.dev.master_xpub != expected_xpub)):
        # probably indicating programing error, not hacking
        raise RuntimeError("Expecting 0x%08x but that's not whats connected?!" %
                           expected_xfp)

    # check signature over session key
    # - mitm might have lied about xfp and xpub up to here
    # - important that we use value capture at wallet creation time, not some value
    #   we read over USB today
    self.dev.check_mitm(expected_xpub=expected_xpub)

    self._expected_device = ex

    print_error("[coldcard]", "Successfully verified against MiTM")


def is_pairable(self):
    """Return True when the device reports a master xpub."""
    # can't do anything w/ devices that aren't setup (but not normally reachable)
    return bool(self.dev.master_xpub)


def timeout(self, cutoff):
    """Timeout hook; intentionally does nothing."""
    # nothing to do?
    pass


def close(self):
    """Close the HID device (so it can be reused) and drop the reference."""
    self.dev.close()
    self.dev = None


def is_initialized(self):
    """Return True when the device reports a master xpub."""
    return bool(self.dev.master_xpub)


def label(self):
    """Build the label for this Coldcard, carrying xfp/xpub as attributes.

    'label' of this Coldcard. Warning: gets saved into wallet file, which
    might not be encrypted, so better for privacy if based on
    xpub/fingerprint rather than USB serial number.

    Returns a `str` subclass instance whose text is the label and whose
    `.xfp` / `.xpub` attributes hold the device's master fingerprint and
    master xpub.
    """
    if self.dev.is_simulator:
        lab = 'Coldcard Simulator 0x%08x' % self.dev.master_fingerprint
    elif not self.dev.master_fingerprint:
        # failback; not expected
        lab = 'Coldcard #' + self.dev.serial
    else:
        lab = 'Coldcard 0x%08x' % self.dev.master_fingerprint

    # Hack zone: during initial setup I need the xfp and master xpub but
    # very few objects are passed between the various steps of base_wizard.
    # Solution: return a string with some hidden metadata
    # - needs to work w/ deepcopy
    class LabelStr(str):
        def __new__(cls, s, xfp=None, xpub=None):
            self = super().__new__(cls, str(s))
            # When `s` is itself a LabelStr, its metadata wins over the
            # keyword defaults, so copies keep their xfp/xpub.
            self.xfp = getattr(s, 'xfp', xfp)
            self.xpub = getattr(s, 'xpub', xpub)
            return self

    return LabelStr(lab, self.dev.master_fingerprint, self.dev.master_xpub)


def has_usable_connection_with_device(self):
    """Do an end-to-end ping test; True on success, False on any error."""
    try:
        self.ping_check()
        return True
    except Exception:
        # Was a bare `except:`; interpreter-exit signals now propagate.
        return False


def get_xpub(self, bip32_path, xtype):
    """Ask the device for the xpub at `bip32_path`, re-encoded as `xtype`.

    Raises Exception with a user-facing message when the returned xpub has
    unrecognised version bytes.
    """
    assert xtype in ColdcardPlugin.SUPPORTED_XTYPES
    print_error('[coldcard]', 'Derive xtype = %r' % xtype)
    xpub = self.dev.send_recv(CCProtocolPacker.get_xpub(bip32_path), timeout=5000)
    # TODO handle timeout?
    # change type of xpub to the requested type
    try:
        __, depth, fingerprint, child_number, c, cK = deserialize_xpub(xpub)
    except InvalidMasterKeyVersionBytes:
        raise Exception(_('Invalid xpub magic. Make sure your {} device is set to the correct chain.')
                        .format(self.device)) from None
    if xtype != 'standard':
        xpub = serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
    return xpub


def ping_check(self):
    """Check the connection is working by echoing a fixed payload.

    Raises AssertionError when no session key is set, and RuntimeError when
    the exchange fails or the echoed bytes differ from what was sent.
    """
    assert self.dev.session_key, 'not encrypted?'

    req = b'1234 Electrum Plugin 4321'      # free up to 59 bytes
    try:
        echo = self.dev.send_recv(CCProtocolPacker.ping(req))
    except Exception as exc:
        # Was a bare `except:`; keep the original failure as the cause.
        raise RuntimeError("Communication trouble with Coldcard") from exc

    # Explicit comparison instead of `assert`, so the check survives -O.
    if echo != req:
        raise RuntimeError("Communication trouble with Coldcard")
# ---------------------------------------------------------------------------
# Remaining methods of the plugin's device-client class (its `class`
# statement is not recoverable from this copy of the file, so the methods are
# shown without their enclosing class).
# ---------------------------------------------------------------------------

def get_version(self):
    """Return the device's version reply split on newlines (list of strings)."""
    return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n')


def sign_message_start(self, path, msg):
    """Send the sign-message request; this starts the UX experience."""
    self.dev.send_recv(CCProtocolPacker.sign_message(msg, path), timeout=None)


def sign_message_poll(self):
    """Poll the device for a message signature.

    poll device... if user has approved, will get tuple: (addr, sig) else None
    """
    return self.dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None)


def sign_transaction_start(self, raw_psbt, finalize=True):
    """Upload a PSBT and ask the device to start signing it.

    Multiple steps to sign:
    - upload binary
    - start signing UX
    - wait for coldcard to complete process, or have it refused.
    - download resulting txn

    Only the first two steps happen here.  Raises AssertionError when the
    PSBT length is outside [20, MAX_TXN_LEN), and ValueError carrying the
    device's reply when that reply is not None.
    """
    assert 20 <= len(raw_psbt) < MAX_TXN_LEN, 'PSBT is too big'
    dlen, chk = self.dev.upload_file(raw_psbt)

    resp = self.dev.send_recv(CCProtocolPacker.sign_transaction(dlen, chk, finalize=finalize),
                              timeout=None)

    # Identity test: `!= None` would defer to the reply's own __ne__.
    if resp is not None:
        raise ValueError(resp)


def sign_transaction_poll(self):
    """Poll the device for a signed transaction.

    poll device... if user has approved, will get tuple: (length, checksum)
    else None
    """
    return self.dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None)


def download_file(self, length, checksum, file_number=1):
    """Get a file from the device; arguments are passed straight through."""
    return self.dev.download_file(length, checksum, file_number=file_number)


# ---------------------------------------------------------------------------
# Methods of `class Coldcard_KeyStore(Hardware_KeyStore)`.  The class header
# (with class attributes hw_type = 'coldcard' and device = 'Coldcard') is
# visible in the file, but the class body is cut off part-way through, so
# only its complete methods are shown, without the enclosing class.
# ---------------------------------------------------------------------------

def __init__(self, d):
    """Initialise the keystore from its stored dict `d`.

    Reads the master fingerprint and master xpub either from metadata
    attached to `d['label']` (initial setup) or from the `ckcc_xfp` /
    `ckcc_xpub` entries of `d` (wallet load; KeyError if they are missing).
    """
    Hardware_KeyStore.__init__(self, d)
    # Errors and other user interaction is done through the wallet's
    # handler.  The handler is per-window and preserved across
    # device reconnects
    self.force_watching_only = False
    self.ux_busy = False

    # Seems like only the derivation path and resulting **derived** xpub is stored in
    # the wallet file... however, we need to know at least the fingerprint of the master
    # xpub to verify against MiTM, and also so we can put the right value into the subkey
    # paths of PSBT files that might be generated offline.
    # - save the fingerprint of the master xpub, as "xfp"
    # - it's a LE32 int, but hex more natural way to see it
    # - device reports these value during encryption setup process
    lab = d['label']
    if hasattr(lab, 'xfp'):
        # initial setup
        self.ckcc_xfp = lab.xfp
        self.ckcc_xpub = lab.xpub
    else:
        # wallet load: fatal if missing, we need them!
        self.ckcc_xfp = d['ckcc_xfp']
        self.ckcc_xpub = d['ckcc_xpub']


def dump(self):
    """Return the base keystore dump plus the `ckcc_xfp` / `ckcc_xpub` entries."""
    # our additions to the stored data about keystore -- only during creation?
    d = Hardware_KeyStore.dump(self)

    d['ckcc_xfp'] = self.ckcc_xfp
    d['ckcc_xpub'] = self.ckcc_xpub

    return d


def get_derivation(self):
    """Return the keystore's derivation path."""
    return self.derivation


def get_client(self):
    """Fetch the plugin's client for this keystore and verify it.

    called when user tries to do something like view address, sign
    something.
    - not called during probing/setup

    Returns whatever the plugin returned; when that is truthy, its
    `verify_connection` is called first with the stored xfp and xpub.
    """
    rv = self.plugin.get_client(self)
    if rv:
        rv.verify_connection(self.ckcc_xfp, self.ckcc_xpub)

    return rv


def give_error(self, message, clear_client=False):
    """Report `message` and raise it as an Exception.

    The error dialog is shown only when the device UX is not busy;
    otherwise the busy flag is cleared instead.  With `clear_client` the
    cached client reference is dropped before raising.
    """
    print_error(message)
    if not self.ux_busy:
        self.handler.show_error(message)
    else:
        self.ux_busy = False
    if clear_client:
        self.client = None
    raise Exception(message)


def wrap_busy(func):
    """Decorator: function takes over the UX on the device.

    Sets `self.ux_busy` for the duration of the call and always clears it
    afterwards, whether the call returns or raises.
    """
    import functools

    @functools.wraps(func)      # keep the wrapped method's name and docstring
    def wrapper(self, *args, **kwargs):
        try:
            self.ux_busy = True
            return func(self, *args, **kwargs)
        finally:
            self.ux_busy = False
    return wrapper


def decrypt_message(self, pubkey, message, password):
    """Always raises RuntimeError: decryption is not supported."""
    raise RuntimeError(_('Encryption and decryption are currently not supported for {}').format(self.device))


@wrap_busy
def sign_message(self, sequence, message, password):
    """Sign a message on device.

    Since we have big screen, of course we have to show the message
    unambiguously there first!

    `sequence` is the pair of integers appended to the keystore's derivation
    path.  Returns the raw signature bytes on success, or empty bytes when
    the message is rejected locally or the device refuses, is busy, or
    reports a protocol error.  Any other failure is reported through
    `give_error`, which raises.
    """
    try:
        msg = message.encode('ascii', errors='strict')
    except UnicodeError:
        msg = None

    # Explicit length test instead of `assert`, so it still runs under -O.
    if msg is None or not 1 <= len(msg) <= MSG_SIGNING_MAX_LENGTH:
        # there are other restrictions on message content,
        # but let the device enforce and report those
        self.handler.show_error('Only short (%d max) ASCII messages can be signed.'
                                % MSG_SIGNING_MAX_LENGTH)
        return b''

    path = self.get_derivation() + ("/%d/%d" % sequence)
    try:
        # The client is looked up once, inside the try, so lookup and
        # verification failures go through the same reporting as the rest.
        cl = self.get_client()
        try:
            self.handler.show_message("Signing message (using %s)..." % path)

            cl.sign_message_start(path, msg)

            while True:
                # How to kill some time, without locking UI?
                time.sleep(0.250)

                resp = cl.sign_message_poll()
                if resp is not None:
                    break

        finally:
            self.handler.finished()

        assert len(resp) == 2
        addr, raw_sig = resp

        # already encoded in Bitcoin fashion, binary.
        assert 40 < len(raw_sig) <= 65

        return raw_sig

    except (CCUserRefused, CCBusyError) as exc:
        self.handler.show_error(str(exc))
    except CCProtoError as exc:
        traceback.print_exc(file=sys.stderr)
        # Title previously read 'Error showing address' (copied from the
        # address-display path); this is the message-signing path.
        self.handler.show_error('{}\n\n{}'.format(
            _('Error signing message') + ':', str(exc)))
    except Exception as e:
        self.give_error(e, True)

    # give empty bytes for error cases; it seems to clear the old signature box
    return b''