mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-31 09:21:39 +00:00
Update ledger.py
This commit is contained in:
parent
b7ed21ba8f
commit
6ae8f20db5
1 changed files with 144 additions and 53 deletions
|
@ -2,7 +2,7 @@ from struct import pack, unpack
|
|||
import hashlib
|
||||
import sys
|
||||
import traceback
|
||||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from electrum import ecc
|
||||
from electrum import bip32
|
||||
|
@ -16,9 +16,10 @@ from electrum.wallet import Standard_Wallet
|
|||
from electrum.util import bfh, bh2u, versiontuple, UserFacingException
|
||||
from electrum.base_wizard import ScriptTypeNotSupported
|
||||
from electrum.logging import get_logger
|
||||
from electrum.plugin import runs_in_hwd_thread, Device
|
||||
|
||||
from ..hw_wallet import HW_PluginBase, HardwareClientBase
|
||||
from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output
|
||||
from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output, LibraryFoundButUnusable
|
||||
|
||||
|
||||
_logger = get_logger(__name__)
|
||||
|
@ -44,6 +45,7 @@ MSG_NEEDS_FW_UPDATE_SEGWIT = _('Firmware version (or "Bitcoin" app) too old for
|
|||
MULTI_OUTPUT_SUPPORT = '1.1.4'
|
||||
SEGWIT_SUPPORT = '1.1.10'
|
||||
SEGWIT_SUPPORT_SPECIAL = '1.0.4'
|
||||
SEGWIT_TRUSTEDINPUTS = '1.4.0'
|
||||
|
||||
|
||||
def test_pin_unlocked(func):
|
||||
|
@ -62,29 +64,40 @@ def test_pin_unlocked(func):
|
|||
|
||||
|
||||
class Ledger_Client(HardwareClientBase):
|
||||
def __init__(self, hidDevice, *, is_hw1: bool = False):
|
||||
def __init__(self, hidDevice, *, product_key: Tuple[int, int],
|
||||
plugin: HW_PluginBase):
|
||||
HardwareClientBase.__init__(self, plugin=plugin)
|
||||
self.dongleObject = btchip(hidDevice)
|
||||
self.preflightDone = False
|
||||
self._is_hw1 = is_hw1
|
||||
self._product_key = product_key
|
||||
self._soft_device_id = None
|
||||
|
||||
def is_pairable(self):
|
||||
return True
|
||||
|
||||
@runs_in_hwd_thread
|
||||
def close(self):
|
||||
self.dongleObject.dongle.close()
|
||||
|
||||
def timeout(self, cutoff):
|
||||
pass
|
||||
|
||||
def is_initialized(self):
|
||||
return True
|
||||
|
||||
def label(self):
|
||||
return ""
|
||||
@runs_in_hwd_thread
|
||||
def get_soft_device_id(self):
|
||||
if self._soft_device_id is None:
|
||||
# modern ledger can provide xpub without user interaction
|
||||
# (hw1 would prompt for PIN)
|
||||
if not self.is_hw1():
|
||||
self._soft_device_id = self.request_root_fingerprint_from_device()
|
||||
return self._soft_device_id
|
||||
|
||||
def is_hw1(self) -> bool:
|
||||
return self._is_hw1
|
||||
return self._product_key[0] == 0x2581
|
||||
|
||||
def device_model_name(self):
|
||||
return LedgerPlugin.device_name_from_product_key(self._product_key)
|
||||
|
||||
@runs_in_hwd_thread
|
||||
def has_usable_connection_with_device(self):
|
||||
try:
|
||||
self.dongleObject.getFirmwareVersion()
|
||||
|
@ -92,6 +105,7 @@ class Ledger_Client(HardwareClientBase):
|
|||
return False
|
||||
return True
|
||||
|
||||
@runs_in_hwd_thread
|
||||
@test_pin_unlocked
|
||||
def get_xpub(self, bip32_path, xtype):
|
||||
self.checkDevice()
|
||||
|
@ -156,6 +170,10 @@ class Ledger_Client(HardwareClientBase):
|
|||
def supports_native_segwit(self):
|
||||
return self.nativeSegwitSupported
|
||||
|
||||
def supports_segwit_trustedInputs(self):
|
||||
return self.segwitTrustedInputs
|
||||
|
||||
@runs_in_hwd_thread
|
||||
def perform_hw1_preflight(self):
|
||||
try:
|
||||
firmwareInfo = self.dongleObject.getFirmwareVersion()
|
||||
|
@ -163,20 +181,22 @@ class Ledger_Client(HardwareClientBase):
|
|||
self.multiOutputSupported = versiontuple(firmware) >= versiontuple(MULTI_OUTPUT_SUPPORT)
|
||||
self.nativeSegwitSupported = versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT)
|
||||
self.segwitSupported = self.nativeSegwitSupported or (firmwareInfo['specialVersion'] == 0x20 and versiontuple(firmware) >= versiontuple(SEGWIT_SUPPORT_SPECIAL))
|
||||
self.segwitTrustedInputs = versiontuple(firmware) >= versiontuple(SEGWIT_TRUSTEDINPUTS)
|
||||
|
||||
if not checkFirmware(firmwareInfo):
|
||||
self.dongleObject.dongle.close()
|
||||
self.close()
|
||||
raise UserFacingException(MSG_NEEDS_FW_UPDATE_GENERIC)
|
||||
try:
|
||||
self.dongleObject.getOperationMode()
|
||||
except BTChipException as e:
|
||||
if (e.sw == 0x6985):
|
||||
self.dongleObject.dongle.close()
|
||||
self.close()
|
||||
self.handler.get_setup( )
|
||||
# Acquire the new client on the next run
|
||||
else:
|
||||
raise e
|
||||
if self.has_detached_pin_support(self.dongleObject) and not self.is_pin_validated(self.dongleObject) and (self.handler is not None):
|
||||
if self.has_detached_pin_support(self.dongleObject) and not self.is_pin_validated(self.dongleObject):
|
||||
assert self.handler, "no handler for client"
|
||||
remaining_attempts = self.dongleObject.getVerifyPinRemainingAttempts()
|
||||
if remaining_attempts != 1:
|
||||
msg = "Enter your Ledger PIN - remaining attempts : " + str(remaining_attempts)
|
||||
|
@ -198,6 +218,7 @@ class Ledger_Client(HardwareClientBase):
|
|||
"Please make sure that 'Browser support' is disabled on your device.")
|
||||
raise e
|
||||
|
||||
@runs_in_hwd_thread
|
||||
def checkDevice(self):
|
||||
if not self.preflightDone:
|
||||
try:
|
||||
|
@ -264,24 +285,27 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
def decrypt_message(self, pubkey, message, password):
|
||||
raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device))
|
||||
|
||||
@runs_in_hwd_thread
|
||||
@test_pin_unlocked
|
||||
@set_and_unset_signing
|
||||
def sign_message(self, sequence, message, password):
|
||||
message = message.encode('utf8')
|
||||
message_hash = hashlib.sha256(message).hexdigest().upper()
|
||||
# prompt for the PIN before displaying the dialog if necessary
|
||||
client = self.get_client()
|
||||
client_ledger = self.get_client()
|
||||
client_electrum = self.get_client_electrum()
|
||||
address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence
|
||||
self.handler.show_message("Signing message ...\r\nMessage hash: "+message_hash)
|
||||
try:
|
||||
info = self.get_client().signMessagePrepare(address_path, message)
|
||||
info = client_ledger.signMessagePrepare(address_path, message)
|
||||
pin = ""
|
||||
if info['confirmationNeeded']:
|
||||
pin = self.handler.get_auth( info ) # does the authenticate dialog and returns pin
|
||||
# do the authenticate dialog and get pin:
|
||||
pin = self.handler.get_auth(info, client=client_electrum)
|
||||
if not pin:
|
||||
raise UserWarning(_('Cancelled by user'))
|
||||
pin = str(pin).encode()
|
||||
signature = self.get_client().signMessageSign(pin)
|
||||
signature = client_ledger.signMessageSign(pin)
|
||||
except BTChipException as e:
|
||||
if e.sw == 0x6a80:
|
||||
self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. Only alphanumerical messages shorter than 140 characters are supported. Please remove any extra characters (tab, carriage return) and retry.")
|
||||
|
@ -310,6 +334,7 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
# And convert it
|
||||
return bytes([27 + 4 + (signature[0] & 0x01)]) + r + s
|
||||
|
||||
@runs_in_hwd_thread
|
||||
@test_pin_unlocked
|
||||
@set_and_unset_signing
|
||||
def sign_transaction(self, tx, password):
|
||||
|
@ -324,7 +349,7 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
p2shTransaction = False
|
||||
segwitTransaction = False
|
||||
pin = ""
|
||||
self.get_client() # prompt for the PIN before displaying the dialog if necessary
|
||||
client_ledger = self.get_client() # prompt for the PIN before displaying the dialog if necessary
|
||||
client_electrum = self.get_client_electrum()
|
||||
assert client_electrum
|
||||
|
||||
|
@ -353,7 +378,7 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
|
||||
redeemScript = Transaction.get_preimage_script(txin)
|
||||
txin_prev_tx = txin.utxo
|
||||
if txin_prev_tx is None and not Transaction.is_segwit_input(txin):
|
||||
if txin_prev_tx is None and not txin.is_segwit():
|
||||
raise UserFacingException(_('Missing previous tx for legacy input.'))
|
||||
txin_prev_tx_raw = txin_prev_tx.serialize() if txin_prev_tx else None
|
||||
inputs.append([txin_prev_tx_raw,
|
||||
|
@ -415,18 +440,23 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
# Get trusted inputs from the original transactions
|
||||
for utxo in inputs:
|
||||
sequence = int_to_hex(utxo[5], 4)
|
||||
if segwitTransaction:
|
||||
if segwitTransaction and not client_electrum.supports_segwit_trustedInputs():
|
||||
tmp = bfh(utxo[3])[::-1]
|
||||
tmp += bfh(int_to_hex(utxo[1], 4))
|
||||
tmp += bfh(int_to_hex(utxo[6], 8)) # txin['value']
|
||||
chipInputs.append({'value' : tmp, 'witness' : True, 'sequence' : sequence})
|
||||
redeemScripts.append(bfh(utxo[2]))
|
||||
elif not p2shTransaction:
|
||||
elif (not p2shTransaction) or client_electrum.supports_multi_output():
|
||||
txtmp = bitcoinTransaction(bfh(utxo[0]))
|
||||
trustedInput = self.get_client().getTrustedInput(txtmp, utxo[1])
|
||||
trustedInput = client_ledger.getTrustedInput(txtmp, utxo[1])
|
||||
trustedInput['sequence'] = sequence
|
||||
if segwitTransaction:
|
||||
trustedInput['witness'] = True
|
||||
chipInputs.append(trustedInput)
|
||||
redeemScripts.append(txtmp.outputs[utxo[1]].script)
|
||||
if p2shTransaction or segwitTransaction:
|
||||
redeemScripts.append(bfh(utxo[2]))
|
||||
else:
|
||||
redeemScripts.append(txtmp.outputs[utxo[1]].script)
|
||||
else:
|
||||
tmp = bfh(utxo[3])[::-1]
|
||||
tmp += bfh(int_to_hex(utxo[1], 4))
|
||||
|
@ -437,26 +467,27 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
firstTransaction = True
|
||||
inputIndex = 0
|
||||
rawTx = tx.serialize_to_network()
|
||||
self.get_client().enableAlternate2fa(False)
|
||||
client_ledger.enableAlternate2fa(False)
|
||||
if segwitTransaction:
|
||||
self.get_client().startUntrustedTransaction(True, inputIndex,
|
||||
client_ledger.startUntrustedTransaction(True, inputIndex,
|
||||
chipInputs, redeemScripts[inputIndex], version=tx.version)
|
||||
# we don't set meaningful outputAddress, amount and fees
|
||||
# as we only care about the alternateEncoding==True branch
|
||||
outputData = self.get_client().finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
|
||||
outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
|
||||
outputData['outputData'] = txOutput
|
||||
if outputData['confirmationNeeded']:
|
||||
outputData['address'] = output
|
||||
self.handler.finished()
|
||||
pin = self.handler.get_auth( outputData ) # does the authenticate dialog and returns pin
|
||||
# do the authenticate dialog and get pin:
|
||||
pin = self.handler.get_auth(outputData, client=client_electrum)
|
||||
if not pin:
|
||||
raise UserWarning()
|
||||
self.handler.show_message(_("Confirmed. Signing Transaction..."))
|
||||
while inputIndex < len(inputs):
|
||||
singleInput = [ chipInputs[inputIndex] ]
|
||||
self.get_client().startUntrustedTransaction(False, 0,
|
||||
client_ledger.startUntrustedTransaction(False, 0,
|
||||
singleInput, redeemScripts[inputIndex], version=tx.version)
|
||||
inputSignature = self.get_client().untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
|
||||
inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
|
||||
inputSignature[0] = 0x30 # force for 1.4.9+
|
||||
my_pubkey = inputs[inputIndex][4]
|
||||
tx.add_signature_to_txin(txin_idx=inputIndex,
|
||||
|
@ -465,22 +496,23 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
inputIndex = inputIndex + 1
|
||||
else:
|
||||
while inputIndex < len(inputs):
|
||||
self.get_client().startUntrustedTransaction(firstTransaction, inputIndex,
|
||||
client_ledger.startUntrustedTransaction(firstTransaction, inputIndex,
|
||||
chipInputs, redeemScripts[inputIndex], version=tx.version)
|
||||
# we don't set meaningful outputAddress, amount and fees
|
||||
# as we only care about the alternateEncoding==True branch
|
||||
outputData = self.get_client().finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
|
||||
outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx))
|
||||
outputData['outputData'] = txOutput
|
||||
if outputData['confirmationNeeded']:
|
||||
outputData['address'] = output
|
||||
self.handler.finished()
|
||||
pin = self.handler.get_auth( outputData ) # does the authenticate dialog and returns pin
|
||||
# do the authenticate dialog and get pin:
|
||||
pin = self.handler.get_auth(outputData, client=client_electrum)
|
||||
if not pin:
|
||||
raise UserWarning()
|
||||
self.handler.show_message(_("Confirmed. Signing Transaction..."))
|
||||
else:
|
||||
# Sign input with the provided PIN
|
||||
inputSignature = self.get_client().untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
|
||||
inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime)
|
||||
inputSignature[0] = 0x30 # force for 1.4.9+
|
||||
my_pubkey = inputs[inputIndex][4]
|
||||
tx.add_signature_to_txin(txin_idx=inputIndex,
|
||||
|
@ -505,6 +537,7 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
finally:
|
||||
self.handler.finished()
|
||||
|
||||
@runs_in_hwd_thread
|
||||
@test_pin_unlocked
|
||||
@set_and_unset_signing
|
||||
def show_address(self, sequence, txin_type):
|
||||
|
@ -535,8 +568,8 @@ class Ledger_KeyStore(Hardware_KeyStore):
|
|||
self.handler.finished()
|
||||
|
||||
class LedgerPlugin(HW_PluginBase):
|
||||
libraries_available = BTCHIP
|
||||
keystore_class = Ledger_KeyStore
|
||||
minimum_library = (0, 1, 30)
|
||||
client = None
|
||||
DEVICE_IDS = [
|
||||
(0x2581, 0x1807), # HW.1 legacy btchip
|
||||
|
@ -553,14 +586,75 @@ class LedgerPlugin(HW_PluginBase):
|
|||
(0x2c97, 0x0009), # RFU
|
||||
(0x2c97, 0x000a) # RFU
|
||||
]
|
||||
VENDOR_IDS = (0x2c97, )
|
||||
LEDGER_MODEL_IDS = {
|
||||
0x10: "Ledger Nano S",
|
||||
0x40: "Ledger Nano X",
|
||||
}
|
||||
SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
|
||||
|
||||
def __init__(self, parent, config, name):
|
||||
self.segwit = config.get("segwit")
|
||||
HW_PluginBase.__init__(self, parent, config, name)
|
||||
if self.libraries_available:
|
||||
self.device_manager().register_devices(self.DEVICE_IDS)
|
||||
self.libraries_available = self.check_libraries_available()
|
||||
if not self.libraries_available:
|
||||
return
|
||||
# to support legacy devices and legacy firmwares
|
||||
self.device_manager().register_devices(self.DEVICE_IDS, plugin=self)
|
||||
# to support modern firmware
|
||||
self.device_manager().register_vendor_ids(self.VENDOR_IDS, plugin=self)
|
||||
|
||||
def get_library_version(self):
|
||||
try:
|
||||
import btchip
|
||||
version = btchip.__version__
|
||||
except ImportError:
|
||||
raise
|
||||
except:
|
||||
version = "unknown"
|
||||
if BTCHIP:
|
||||
return version
|
||||
else:
|
||||
raise LibraryFoundButUnusable(library_version=version)
|
||||
|
||||
@classmethod
|
||||
def _recognize_device(cls, product_key) -> Tuple[bool, Optional[str]]:
|
||||
"""Returns (can_recognize, model_name) tuple."""
|
||||
# legacy product_keys
|
||||
if product_key in cls.DEVICE_IDS:
|
||||
if product_key[0] == 0x2581:
|
||||
return True, "Ledger HW.1"
|
||||
if product_key == (0x2c97, 0x0000):
|
||||
return True, "Ledger Blue"
|
||||
if product_key == (0x2c97, 0x0001):
|
||||
return True, "Ledger Nano S"
|
||||
if product_key == (0x2c97, 0x0004):
|
||||
return True, "Ledger Nano X"
|
||||
return True, None
|
||||
# modern product_keys
|
||||
if product_key[0] == 0x2c97:
|
||||
product_id = product_key[1]
|
||||
model_id = product_id >> 8
|
||||
if model_id in cls.LEDGER_MODEL_IDS:
|
||||
model_name = cls.LEDGER_MODEL_IDS[model_id]
|
||||
return True, model_name
|
||||
# give up
|
||||
return False, None
|
||||
|
||||
def can_recognize_device(self, device: Device) -> bool:
|
||||
return self._recognize_device(device.product_key)[0]
|
||||
|
||||
@classmethod
|
||||
def device_name_from_product_key(cls, product_key) -> Optional[str]:
|
||||
return cls._recognize_device(product_key)[1]
|
||||
|
||||
def create_device_from_hid_enumeration(self, d, *, product_key):
|
||||
device = super().create_device_from_hid_enumeration(d, product_key=product_key)
|
||||
if not self.can_recognize_device(device):
|
||||
return None
|
||||
return device
|
||||
|
||||
@runs_in_hwd_thread
|
||||
def get_btchip_device(self, device):
|
||||
ledger = False
|
||||
if device.product_key[0] == 0x2581 and device.product_key[1] == 0x3b7c:
|
||||
|
@ -577,42 +671,38 @@ class LedgerPlugin(HW_PluginBase):
|
|||
dev.set_nonblocking(True)
|
||||
return HIDDongleHIDAPI(dev, ledger, BTCHIP_DEBUG)
|
||||
|
||||
@runs_in_hwd_thread
|
||||
def create_client(self, device, handler):
|
||||
if handler:
|
||||
self.handler = handler
|
||||
|
||||
client = self.get_btchip_device(device)
|
||||
if client is not None:
|
||||
is_hw1 = device.product_key[0] == 0x2581
|
||||
client = Ledger_Client(client, is_hw1=is_hw1)
|
||||
client = Ledger_Client(client, product_key=device.product_key, plugin=self)
|
||||
return client
|
||||
|
||||
def setup_device(self, device_info, wizard, purpose):
|
||||
devmgr = self.device_manager()
|
||||
device_id = device_info.device.id_
|
||||
client = devmgr.client_by_id(device_id)
|
||||
if client is None:
|
||||
raise UserFacingException(_('Failed to create a client for this device.') + '\n' +
|
||||
_('Make sure it is in the correct state.'))
|
||||
client.handler = self.create_handler(wizard)
|
||||
client.get_xpub("m/44'/0'", 'standard') # TODO replace by direct derivation once Nano S > 1.1
|
||||
client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
|
||||
wizard.run_task_without_blocking_gui(
|
||||
task=lambda: client.get_xpub("m/0'", 'standard')) # TODO replace by direct derivation once Nano S > 1.1
|
||||
return client
|
||||
|
||||
def get_xpub(self, device_id, derivation, xtype, wizard):
|
||||
if xtype not in self.SUPPORTED_XTYPES:
|
||||
raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device))
|
||||
devmgr = self.device_manager()
|
||||
client = devmgr.client_by_id(device_id)
|
||||
client.handler = self.create_handler(wizard)
|
||||
client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard)
|
||||
client.checkDevice()
|
||||
xpub = client.get_xpub(derivation, xtype)
|
||||
return xpub
|
||||
|
||||
def get_client(self, keystore, force_pair=True):
|
||||
@runs_in_hwd_thread
|
||||
def get_client(self, keystore, force_pair=True, *,
|
||||
devices=None, allow_user_interaction=True):
|
||||
# All client interaction should not be in the main GUI thread
|
||||
devmgr = self.device_manager()
|
||||
handler = keystore.handler
|
||||
with devmgr.hid_lock:
|
||||
client = devmgr.client_for_keystore(self, handler, keystore, force_pair)
|
||||
client = super().get_client(keystore, force_pair,
|
||||
devices=devices,
|
||||
allow_user_interaction=allow_user_interaction)
|
||||
# returns the client for a given keystore. can use xpub
|
||||
#if client:
|
||||
# client.used()
|
||||
|
@ -620,6 +710,7 @@ class LedgerPlugin(HW_PluginBase):
|
|||
client.checkDevice()
|
||||
return client
|
||||
|
||||
@runs_in_hwd_thread
|
||||
def show_address(self, wallet, address, keystore=None):
|
||||
if keystore is None:
|
||||
keystore = wallet.get_keystore()
|
||||
|
|
Loading…
Add table
Reference in a new issue