From 6ae8f20db5e3ff1ef1cac552f3acd57cf377396b Mon Sep 17 00:00:00 2001 From: kodxana Date: Wed, 18 Nov 2020 20:48:35 +0100 Subject: [PATCH] Update ledger.py --- electrum/plugins/ledger/ledger.py | 197 ++++++++++++++++++++++-------- 1 file changed, 144 insertions(+), 53 deletions(-) diff --git a/electrum/plugins/ledger/ledger.py b/electrum/plugins/ledger/ledger.py index da0e29640..a19b28375 100644 --- a/electrum/plugins/ledger/ledger.py +++ b/electrum/plugins/ledger/ledger.py @@ -2,7 +2,7 @@ from struct import pack, unpack import hashlib import sys import traceback -from typing import Optional +from typing import Optional, Tuple from electrum import ecc from electrum import bip32 @@ -16,9 +16,10 @@ from electrum.wallet import Standard_Wallet from electrum.util import bfh, bh2u, versiontuple, UserFacingException from electrum.base_wizard import ScriptTypeNotSupported from electrum.logging import get_logger +from electrum.plugin import runs_in_hwd_thread, Device from ..hw_wallet import HW_PluginBase, HardwareClientBase -from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output +from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output, LibraryFoundButUnusable _logger = get_logger(__name__) @@ -44,6 +45,7 @@ MSG_NEEDS_FW_UPDATE_SEGWIT = _('Firmware version (or "Bitcoin" app) too old for MULTI_OUTPUT_SUPPORT = '1.1.4' SEGWIT_SUPPORT = '1.1.10' SEGWIT_SUPPORT_SPECIAL = '1.0.4' +SEGWIT_TRUSTEDINPUTS = '1.4.0' def test_pin_unlocked(func): @@ -62,29 +64,40 @@ def test_pin_unlocked(func): class Ledger_Client(HardwareClientBase): - def __init__(self, hidDevice, *, is_hw1: bool = False): + def __init__(self, hidDevice, *, product_key: Tuple[int, int], + plugin: HW_PluginBase): + HardwareClientBase.__init__(self, plugin=plugin) self.dongleObject = btchip(hidDevice) self.preflightDone = False - self._is_hw1 = is_hw1 + self._product_key = product_key + self._soft_device_id = None def is_pairable(self): return True + @runs_in_hwd_thread def close(self): self.dongleObject.dongle.close() - def timeout(self, cutoff): - pass - def is_initialized(self): return True - def label(self): - return "" + @runs_in_hwd_thread + def get_soft_device_id(self): + if self._soft_device_id is None: + # modern ledger can provide xpub without user interaction + # (hw1 would prompt for PIN) + if not self.is_hw1(): + self._soft_device_id = self.request_root_fingerprint_from_device() + return self._soft_device_id def is_hw1(self) -> bool: - return self._is_hw1 + return self._product_key[0] == 0x2581 + def device_model_name(self): + return LedgerPlugin.device_name_from_product_key(self._product_key) + + @runs_in_hwd_thread def has_usable_connection_with_device(self): try: self.dongleObject.getFirmwareVersion() @@ -92,6 +105,7 @@ class Ledger_Client(HardwareClientBase): return False return True + @runs_in_hwd_thread @test_pin_unlocked def get_xpub(self, bip32_path, xtype): self.checkDevice() @@ -156,6 +170,10 @@ class Ledger_Client(HardwareClientBase): def supports_native_segwit(self): return self.nativeSegwitSupported + def supports_segwit_trustedInputs(self): + return self.segwitTrustedInputs + + @runs_in_hwd_thread def perform_hw1_preflight(self): try: firmwareInfo = self.dongleObject.getFirmwareVersion() @@ -163,20 +181,22 @@ class Ledger_Client(HardwareClientBase): self.multiOutputSupported = versiontuple(firmware) >= versiontuple(MULTI_OUTPUT_SUPPORT) self.nativeSegwitSupported = versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT) self.segwitSupported = self.nativeSegwitSupported or (firmwareInfo['specialVersion'] == 0x20 and versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT_SPECIAL)) + self.segwitTrustedInputs = versiontuple(firmware) >= versiontuple(SEGWIT_TRUSTEDINPUTS) if not checkFirmware(firmwareInfo): - self.dongleObject.dongle.close() + self.close() raise UserFacingException(MSG_NEEDS_FW_UPDATE_GENERIC) try: self.dongleObject.getOperationMode() except BTChipException as e: if (e.sw == 0x6985): - self.dongleObject.dongle.close() + self.close() self.handler.get_setup( ) # Acquire the new client on the next run else: raise e - if self.has_detached_pin_support(self.dongleObject) and not self.is_pin_validated(self.dongleObject) and (self.handler is not None): + if self.has_detached_pin_support(self.dongleObject) and not self.is_pin_validated(self.dongleObject): + assert self.handler, "no handler for client" remaining_attempts = self.dongleObject.getVerifyPinRemainingAttempts() if remaining_attempts != 1: msg = "Enter your Ledger PIN - remaining attempts : " + str(remaining_attempts) @@ -198,6 +218,7 @@ class Ledger_Client(HardwareClientBase): "Please make sure that 'Browser support' is disabled on your device.") raise e + @runs_in_hwd_thread def checkDevice(self): if not self.preflightDone: try: @@ -264,24 +285,27 @@ class Ledger_KeyStore(Hardware_KeyStore): def decrypt_message(self, pubkey, message, password): raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device)) + @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing def sign_message(self, sequence, message, password): message = message.encode('utf8') message_hash = hashlib.sha256(message).hexdigest().upper() # prompt for the PIN before displaying the dialog if necessary - client = self.get_client() + client_ledger = self.get_client() + client_electrum = self.get_client_electrum() address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence self.handler.show_message("Signing message ...\r\nMessage hash: "+message_hash) try: - info = self.get_client().signMessagePrepare(address_path, message) + info = client_ledger.signMessagePrepare(address_path, message) pin = "" if info['confirmationNeeded']: - pin = self.handler.get_auth( info ) # does the authenticate dialog and returns pin + # do the authenticate dialog and get pin: + pin = self.handler.get_auth(info, client=client_electrum) if not pin: raise UserWarning(_('Cancelled by user')) pin = str(pin).encode() - signature = self.get_client().signMessageSign(pin) + signature = client_ledger.signMessageSign(pin) except BTChipException as e: if e.sw == 0x6a80: self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. Only alphanumerical messages shorter than 140 characters are supported. Please remove any extra characters (tab, carriage return) and retry.") @@ -310,6 +334,7 @@ class Ledger_KeyStore(Hardware_KeyStore): # And convert it return bytes([27 + 4 + (signature[0] & 0x01)]) + r + s + @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing def sign_transaction(self, tx, password): @@ -324,7 +349,7 @@ class Ledger_KeyStore(Hardware_KeyStore): p2shTransaction = False segwitTransaction = False pin = "" - self.get_client() # prompt for the PIN before displaying the dialog if necessary + client_ledger = self.get_client() # prompt for the PIN before displaying the dialog if necessary client_electrum = self.get_client_electrum() assert client_electrum @@ -353,7 +378,7 @@ class Ledger_KeyStore(Hardware_KeyStore): redeemScript = Transaction.get_preimage_script(txin) txin_prev_tx = txin.utxo - if txin_prev_tx is None and not Transaction.is_segwit_input(txin): + if txin_prev_tx is None and not txin.is_segwit(): raise UserFacingException(_('Missing previous tx for legacy input.')) txin_prev_tx_raw = txin_prev_tx.serialize() if txin_prev_tx else None inputs.append([txin_prev_tx_raw, @@ -415,18 +440,23 @@ class Ledger_KeyStore(Hardware_KeyStore): # Get trusted inputs from the original transactions for utxo in inputs: sequence = int_to_hex(utxo[5], 4) - if segwitTransaction: + if segwitTransaction and not client_electrum.supports_segwit_trustedInputs(): tmp = bfh(utxo[3])[::-1] tmp += bfh(int_to_hex(utxo[1], 4)) tmp += bfh(int_to_hex(utxo[6], 8)) # txin['value'] chipInputs.append({'value' : tmp, 'witness' : True, 'sequence' : sequence}) redeemScripts.append(bfh(utxo[2])) - elif not p2shTransaction: + elif (not p2shTransaction) or client_electrum.supports_multi_output(): txtmp = bitcoinTransaction(bfh(utxo[0])) - trustedInput = self.get_client().getTrustedInput(txtmp, utxo[1]) + trustedInput = client_ledger.getTrustedInput(txtmp, utxo[1]) trustedInput['sequence'] = sequence + if segwitTransaction: + trustedInput['witness'] = True chipInputs.append(trustedInput) - redeemScripts.append(txtmp.outputs[utxo[1]].script) + if p2shTransaction or segwitTransaction: + redeemScripts.append(bfh(utxo[2])) + else: + redeemScripts.append(txtmp.outputs[utxo[1]].script) else: tmp = bfh(utxo[3])[::-1] tmp += bfh(int_to_hex(utxo[1], 4)) @@ -437,26 +467,27 @@ class Ledger_KeyStore(Hardware_KeyStore): firstTransaction = True inputIndex = 0 rawTx = tx.serialize_to_network() - self.get_client().enableAlternate2fa(False) + client_ledger.enableAlternate2fa(False) if segwitTransaction: - self.get_client().startUntrustedTransaction(True, inputIndex, + client_ledger.startUntrustedTransaction(True, inputIndex, chipInputs, redeemScripts[inputIndex], version=tx.version) # we don't set meaningful outputAddress, amount and fees # as we only care about the alternateEncoding==True branch - outputData = self.get_client().finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) + outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) outputData['outputData'] = txOutput if outputData['confirmationNeeded']: outputData['address'] = output self.handler.finished() - pin = self.handler.get_auth( outputData ) # does the authenticate dialog and returns pin + # do the authenticate dialog and get pin: + pin = self.handler.get_auth(outputData, client=client_electrum) if not pin: raise UserWarning() self.handler.show_message(_("Confirmed. Signing Transaction...")) while inputIndex < len(inputs): singleInput = [ chipInputs[inputIndex] ] - self.get_client().startUntrustedTransaction(False, 0, + client_ledger.startUntrustedTransaction(False, 0, singleInput, redeemScripts[inputIndex], version=tx.version) - inputSignature = self.get_client().untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) + inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) inputSignature[0] = 0x30 # force for 1.4.9+ my_pubkey = inputs[inputIndex][4] tx.add_signature_to_txin(txin_idx=inputIndex, @@ -465,22 +496,23 @@ class Ledger_KeyStore(Hardware_KeyStore): inputIndex = inputIndex + 1 else: while inputIndex < len(inputs): - self.get_client().startUntrustedTransaction(firstTransaction, inputIndex, + client_ledger.startUntrustedTransaction(firstTransaction, inputIndex, chipInputs, redeemScripts[inputIndex], version=tx.version) # we don't set meaningful outputAddress, amount and fees # as we only care about the alternateEncoding==True branch - outputData = self.get_client().finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) + outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) outputData['outputData'] = txOutput if outputData['confirmationNeeded']: outputData['address'] = output self.handler.finished() - pin = self.handler.get_auth( outputData ) # does the authenticate dialog and returns pin + # do the authenticate dialog and get pin: + pin = self.handler.get_auth(outputData, client=client_electrum) if not pin: raise UserWarning() self.handler.show_message(_("Confirmed. Signing Transaction...")) else: # Sign input with the provided PIN - inputSignature = self.get_client().untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) + inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) inputSignature[0] = 0x30 # force for 1.4.9+ my_pubkey = inputs[inputIndex][4] tx.add_signature_to_txin(txin_idx=inputIndex, @@ -505,6 +537,7 @@ class Ledger_KeyStore(Hardware_KeyStore): finally: self.handler.finished() + @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing def show_address(self, sequence, txin_type): @@ -535,8 +568,8 @@ class Ledger_KeyStore(Hardware_KeyStore): self.handler.finished() class LedgerPlugin(HW_PluginBase): - libraries_available = BTCHIP keystore_class = Ledger_KeyStore + minimum_library = (0, 1, 30) client = None DEVICE_IDS = [ (0x2581, 0x1807), # HW.1 legacy btchip @@ -553,14 +586,75 @@ class LedgerPlugin(HW_PluginBase): (0x2c97, 0x0009), # RFU (0x2c97, 0x000a) # RFU ] + VENDOR_IDS = (0x2c97, ) + LEDGER_MODEL_IDS = { + 0x10: "Ledger Nano S", + 0x40: "Ledger Nano X", + } SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') def __init__(self, parent, config, name): self.segwit = config.get("segwit") HW_PluginBase.__init__(self, parent, config, name) - if self.libraries_available: - self.device_manager().register_devices(self.DEVICE_IDS) + self.libraries_available = self.check_libraries_available() + if not self.libraries_available: + return + # to support legacy devices and legacy firmwares + self.device_manager().register_devices(self.DEVICE_IDS, plugin=self) + # to support modern firmware + self.device_manager().register_vendor_ids(self.VENDOR_IDS, plugin=self) + def get_library_version(self): + try: + import btchip + version = btchip.__version__ + except ImportError: + raise + except: + version = "unknown" + if BTCHIP: + return version + else: + raise LibraryFoundButUnusable(library_version=version) + + @classmethod + def _recognize_device(cls, product_key) -> Tuple[bool, Optional[str]]: + """Returns (can_recognize, model_name) tuple.""" + # legacy product_keys + if product_key in cls.DEVICE_IDS: + if product_key[0] == 0x2581: + return True, "Ledger HW.1" + if product_key == (0x2c97, 0x0000): + return True, "Ledger Blue" + if product_key == (0x2c97, 0x0001): + return True, "Ledger Nano S" + if product_key == (0x2c97, 0x0004): + return True, "Ledger Nano X" + return True, None + # modern product_keys + if product_key[0] == 0x2c97: + product_id = product_key[1] + model_id = product_id >> 8 + if model_id in cls.LEDGER_MODEL_IDS: + model_name = cls.LEDGER_MODEL_IDS[model_id] + return True, model_name + # give up + return False, None + + def can_recognize_device(self, device: Device) -> bool: + return self._recognize_device(device.product_key)[0] + + @classmethod + def device_name_from_product_key(cls, product_key) -> Optional[str]: + return cls._recognize_device(product_key)[1] + + def create_device_from_hid_enumeration(self, d, *, product_key): + device = super().create_device_from_hid_enumeration(d, product_key=product_key) + if not self.can_recognize_device(device): + return None + return device + + @runs_in_hwd_thread def get_btchip_device(self, device): ledger = False if device.product_key[0] == 0x2581 and device.product_key[1] == 0x3b7c: @@ -577,42 +671,38 @@ class LedgerPlugin(HW_PluginBase): dev.set_nonblocking(True) return HIDDongleHIDAPI(dev, ledger, BTCHIP_DEBUG) + @runs_in_hwd_thread def create_client(self, device, handler): if handler: self.handler = handler client = self.get_btchip_device(device) if client is not None: - is_hw1 = device.product_key[0] == 0x2581 - client = Ledger_Client(client, is_hw1=is_hw1) + client = Ledger_Client(client, product_key=device.product_key, plugin=self) return client def setup_device(self, device_info, wizard, purpose): - devmgr = self.device_manager() device_id = device_info.device.id_ - client = devmgr.client_by_id(device_id) - if client is None: - raise UserFacingException(_('Failed to create a client for this device.') + '\n' + - _('Make sure it is in the correct state.')) - client.handler = self.create_handler(wizard) - client.get_xpub("m/44'/0'", 'standard') # TODO replace by direct derivation once Nano S > 1.1 + client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) + wizard.run_task_without_blocking_gui( + task=lambda: client.get_xpub("m/0'", 'standard')) # TODO replace by direct derivation once Nano S > 1.1 + return client def get_xpub(self, device_id, derivation, xtype, wizard): if xtype not in self.SUPPORTED_XTYPES: raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device)) - devmgr = self.device_manager() - client = devmgr.client_by_id(device_id) - client.handler = self.create_handler(wizard) + client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) client.checkDevice() xpub = client.get_xpub(derivation, xtype) return xpub - def get_client(self, keystore, force_pair=True): + @runs_in_hwd_thread + def get_client(self, keystore, force_pair=True, *, + devices=None, allow_user_interaction=True): # All client interaction should not be in the main GUI thread - devmgr = self.device_manager() - handler = keystore.handler - with devmgr.hid_lock: - client = devmgr.client_for_keystore(self, handler, keystore, force_pair) + client = super().get_client(keystore, force_pair, + devices=devices, + allow_user_interaction=allow_user_interaction) # returns the client for a given keystore. can use xpub #if client: # client.used() @@ -620,6 +710,7 @@ class LedgerPlugin(HW_PluginBase): client.checkDevice() return client + @runs_in_hwd_thread def show_address(self, wallet, address, keystore=None): if keystore is None: keystore = wallet.get_keystore()