mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
use TxOutputHwInfo namedtuple warn user if device is set to wrong chain undo parts of prev re testnet. fix p2wpkh. testnet support. and minor stuff
782 lines
25 KiB
Python
782 lines
25 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# Electrum - lightweight Bitcoin client
|
|
# Copyright (C) 2011 thomasv@gitorious
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person
|
|
# obtaining a copy of this software and associated documentation files
|
|
# (the "Software"), to deal in the Software without restriction,
|
|
# including without limitation the rights to use, copy, modify, merge,
|
|
# publish, distribute, sublicense, and/or sell copies of the Software,
|
|
# and to permit persons to whom the Software is furnished to do so,
|
|
# subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
|
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
|
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
|
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
|
|
import hashlib
|
|
from typing import List
|
|
|
|
from .util import bfh, bh2u, BitcoinException, print_error, assert_bytes, to_bytes, inv_dict
|
|
from . import version
|
|
from . import segwit_addr
|
|
from . import constants
|
|
from . import ecc
|
|
from .crypto import Hash, sha256, hash_160, hmac_oneshot
|
|
|
|
|
|
################################## transactions
|
|
|
|
COINBASE_MATURITY = 100
|
|
COIN = 100000000
|
|
TOTAL_COIN_SUPPLY_LIMIT_IN_BTC = 21000000
|
|
|
|
# supported types of transaction outputs
|
|
TYPE_ADDRESS = 0
|
|
TYPE_PUBKEY = 1
|
|
TYPE_SCRIPT = 2
|
|
|
|
|
|
def rev_hex(s):
|
|
return bh2u(bfh(s)[::-1])
|
|
|
|
|
|
def int_to_hex(i: int, length: int=1) -> str:
|
|
"""Converts int to little-endian hex string.
|
|
`length` is the number of bytes available
|
|
"""
|
|
if not isinstance(i, int):
|
|
raise TypeError('{} instead of int'.format(i))
|
|
range_size = pow(256, length)
|
|
if i < -range_size/2 or i >= range_size:
|
|
raise OverflowError('cannot convert int {} to hex ({} bytes)'.format(i, length))
|
|
if i < 0:
|
|
# two's complement
|
|
i = range_size + i
|
|
s = hex(i)[2:].rstrip('L')
|
|
s = "0"*(2*length - len(s)) + s
|
|
return rev_hex(s)
|
|
|
|
def script_num_to_hex(i: int) -> str:
|
|
"""See CScriptNum in Bitcoin Core.
|
|
Encodes an integer as hex, to be used in script.
|
|
|
|
ported from https://github.com/bitcoin/bitcoin/blob/8cbc5c4be4be22aca228074f087a374a7ec38be8/src/script/script.h#L326
|
|
"""
|
|
if i == 0:
|
|
return ''
|
|
|
|
result = bytearray()
|
|
neg = i < 0
|
|
absvalue = abs(i)
|
|
while absvalue > 0:
|
|
result.append(absvalue & 0xff)
|
|
absvalue >>= 8
|
|
|
|
if result[-1] & 0x80:
|
|
result.append(0x80 if neg else 0x00)
|
|
elif neg:
|
|
result[-1] |= 0x80
|
|
|
|
return bh2u(result)
|
|
|
|
|
|
def var_int(i: int) -> str:
|
|
# https://en.bitcoin.it/wiki/Protocol_specification#Variable_length_integer
|
|
if i<0xfd:
|
|
return int_to_hex(i)
|
|
elif i<=0xffff:
|
|
return "fd"+int_to_hex(i,2)
|
|
elif i<=0xffffffff:
|
|
return "fe"+int_to_hex(i,4)
|
|
else:
|
|
return "ff"+int_to_hex(i,8)
|
|
|
|
|
|
def witness_push(item: str) -> str:
|
|
"""Returns data in the form it should be present in the witness.
|
|
hex -> hex
|
|
"""
|
|
return var_int(len(item) // 2) + item
|
|
|
|
|
|
def op_push(i: int) -> str:
|
|
if i<0x4c: # OP_PUSHDATA1
|
|
return int_to_hex(i)
|
|
elif i<=0xff:
|
|
return '4c' + int_to_hex(i)
|
|
elif i<=0xffff:
|
|
return '4d' + int_to_hex(i,2)
|
|
else:
|
|
return '4e' + int_to_hex(i,4)
|
|
|
|
|
|
def push_script(data: str) -> str:
|
|
"""Returns pushed data to the script, automatically
|
|
choosing canonical opcodes depending on the length of the data.
|
|
hex -> hex
|
|
|
|
ported from https://github.com/btcsuite/btcd/blob/fdc2bc867bda6b351191b5872d2da8270df00d13/txscript/scriptbuilder.go#L128
|
|
"""
|
|
data = bfh(data)
|
|
from .transaction import opcodes
|
|
|
|
data_len = len(data)
|
|
|
|
# "small integer" opcodes
|
|
if data_len == 0 or data_len == 1 and data[0] == 0:
|
|
return bh2u(bytes([opcodes.OP_0]))
|
|
elif data_len == 1 and data[0] <= 16:
|
|
return bh2u(bytes([opcodes.OP_1 - 1 + data[0]]))
|
|
elif data_len == 1 and data[0] == 0x81:
|
|
return bh2u(bytes([opcodes.OP_1NEGATE]))
|
|
|
|
return op_push(data_len) + bh2u(data)
|
|
|
|
|
|
def add_number_to_script(i: int) -> bytes:
|
|
return bfh(push_script(script_num_to_hex(i)))
|
|
|
|
|
|
hash_encode = lambda x: bh2u(x[::-1])
|
|
hash_decode = lambda x: bfh(x)[::-1]
|
|
hmac_sha_512 = lambda x, y: hmac_oneshot(x, y, hashlib.sha512)
|
|
|
|
|
|
def is_new_seed(x, prefix=version.SEED_PREFIX):
|
|
from . import mnemonic
|
|
x = mnemonic.normalize_text(x)
|
|
s = bh2u(hmac_sha_512(b"Seed version", x.encode('utf8')))
|
|
return s.startswith(prefix)
|
|
|
|
|
|
def is_old_seed(seed):
|
|
from . import old_mnemonic, mnemonic
|
|
seed = mnemonic.normalize_text(seed)
|
|
words = seed.split()
|
|
try:
|
|
# checks here are deliberately left weak for legacy reasons, see #3149
|
|
old_mnemonic.mn_decode(words)
|
|
uses_electrum_words = True
|
|
except Exception:
|
|
uses_electrum_words = False
|
|
try:
|
|
seed = bfh(seed)
|
|
is_hex = (len(seed) == 16 or len(seed) == 32)
|
|
except Exception:
|
|
is_hex = False
|
|
return is_hex or (uses_electrum_words and (len(words) == 12 or len(words) == 24))
|
|
|
|
|
|
def seed_type(x):
|
|
if is_old_seed(x):
|
|
return 'old'
|
|
elif is_new_seed(x):
|
|
return 'standard'
|
|
elif is_new_seed(x, version.SEED_PREFIX_SW):
|
|
return 'segwit'
|
|
elif is_new_seed(x, version.SEED_PREFIX_2FA):
|
|
return '2fa'
|
|
return ''
|
|
|
|
is_seed = lambda x: bool(seed_type(x))
|
|
|
|
|
|
############ functions from pywallet #####################
|
|
|
|
def hash160_to_b58_address(h160: bytes, addrtype):
|
|
s = bytes([addrtype])
|
|
s += h160
|
|
return base_encode(s+Hash(s)[0:4], base=58)
|
|
|
|
|
|
def b58_address_to_hash160(addr):
|
|
addr = to_bytes(addr, 'ascii')
|
|
_bytes = base_decode(addr, 25, base=58)
|
|
return _bytes[0], _bytes[1:21]
|
|
|
|
|
|
def hash160_to_p2pkh(h160, *, net=None):
|
|
if net is None:
|
|
net = constants.net
|
|
return hash160_to_b58_address(h160, net.ADDRTYPE_P2PKH)
|
|
|
|
def hash160_to_p2sh(h160, *, net=None):
|
|
if net is None:
|
|
net = constants.net
|
|
return hash160_to_b58_address(h160, net.ADDRTYPE_P2SH)
|
|
|
|
def public_key_to_p2pkh(public_key: bytes) -> str:
|
|
return hash160_to_p2pkh(hash_160(public_key))
|
|
|
|
def hash_to_segwit_addr(h, witver, *, net=None):
|
|
if net is None:
|
|
net = constants.net
|
|
return segwit_addr.encode(net.SEGWIT_HRP, witver, h)
|
|
|
|
def public_key_to_p2wpkh(public_key):
|
|
return hash_to_segwit_addr(hash_160(public_key), witver=0)
|
|
|
|
def script_to_p2wsh(script):
|
|
return hash_to_segwit_addr(sha256(bfh(script)), witver=0)
|
|
|
|
def p2wpkh_nested_script(pubkey):
|
|
pkh = bh2u(hash_160(bfh(pubkey)))
|
|
return '00' + push_script(pkh)
|
|
|
|
def p2wsh_nested_script(witness_script):
|
|
wsh = bh2u(sha256(bfh(witness_script)))
|
|
return '00' + push_script(wsh)
|
|
|
|
def pubkey_to_address(txin_type, pubkey):
|
|
if txin_type == 'p2pkh':
|
|
return public_key_to_p2pkh(bfh(pubkey))
|
|
elif txin_type == 'p2wpkh':
|
|
return public_key_to_p2wpkh(bfh(pubkey))
|
|
elif txin_type == 'p2wpkh-p2sh':
|
|
scriptSig = p2wpkh_nested_script(pubkey)
|
|
return hash160_to_p2sh(hash_160(bfh(scriptSig)))
|
|
else:
|
|
raise NotImplementedError(txin_type)
|
|
|
|
def redeem_script_to_address(txin_type, redeem_script):
|
|
if txin_type == 'p2sh':
|
|
return hash160_to_p2sh(hash_160(bfh(redeem_script)))
|
|
elif txin_type == 'p2wsh':
|
|
return script_to_p2wsh(redeem_script)
|
|
elif txin_type == 'p2wsh-p2sh':
|
|
scriptSig = p2wsh_nested_script(redeem_script)
|
|
return hash160_to_p2sh(hash_160(bfh(scriptSig)))
|
|
else:
|
|
raise NotImplementedError(txin_type)
|
|
|
|
|
|
def script_to_address(script, *, net=None):
|
|
from .transaction import get_address_from_output_script
|
|
t, addr = get_address_from_output_script(bfh(script), net=net)
|
|
assert t == TYPE_ADDRESS
|
|
return addr
|
|
|
|
def address_to_script(addr, *, net=None):
|
|
if net is None:
|
|
net = constants.net
|
|
witver, witprog = segwit_addr.decode(net.SEGWIT_HRP, addr)
|
|
if witprog is not None:
|
|
if not (0 <= witver <= 16):
|
|
raise BitcoinException('impossible witness version: {}'.format(witver))
|
|
OP_n = witver + 0x50 if witver > 0 else 0
|
|
script = bh2u(bytes([OP_n]))
|
|
script += push_script(bh2u(bytes(witprog)))
|
|
return script
|
|
addrtype, hash_160 = b58_address_to_hash160(addr)
|
|
if addrtype == net.ADDRTYPE_P2PKH:
|
|
script = '76a9' # op_dup, op_hash_160
|
|
script += push_script(bh2u(hash_160))
|
|
script += '88ac' # op_equalverify, op_checksig
|
|
elif addrtype == net.ADDRTYPE_P2SH:
|
|
script = 'a9' # op_hash_160
|
|
script += push_script(bh2u(hash_160))
|
|
script += '87' # op_equal
|
|
else:
|
|
raise BitcoinException('unknown address type: {}'.format(addrtype))
|
|
return script
|
|
|
|
def address_to_scripthash(addr):
|
|
script = address_to_script(addr)
|
|
return script_to_scripthash(script)
|
|
|
|
def script_to_scripthash(script):
|
|
h = sha256(bytes.fromhex(script))[0:32]
|
|
return bh2u(bytes(reversed(h)))
|
|
|
|
def public_key_to_p2pk_script(pubkey):
|
|
script = push_script(pubkey)
|
|
script += 'ac' # op_checksig
|
|
return script
|
|
|
|
__b58chars = b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
|
|
assert len(__b58chars) == 58
|
|
|
|
__b43chars = b'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ$*+-./:'
|
|
assert len(__b43chars) == 43
|
|
|
|
|
|
def base_encode(v: bytes, base: int) -> str:
|
|
""" encode v, which is a string of bytes, to base58."""
|
|
assert_bytes(v)
|
|
if base not in (58, 43):
|
|
raise ValueError('not supported base: {}'.format(base))
|
|
chars = __b58chars
|
|
if base == 43:
|
|
chars = __b43chars
|
|
long_value = 0
|
|
for (i, c) in enumerate(v[::-1]):
|
|
long_value += (256**i) * c
|
|
result = bytearray()
|
|
while long_value >= base:
|
|
div, mod = divmod(long_value, base)
|
|
result.append(chars[mod])
|
|
long_value = div
|
|
result.append(chars[long_value])
|
|
# Bitcoin does a little leading-zero-compression:
|
|
# leading 0-bytes in the input become leading-1s
|
|
nPad = 0
|
|
for c in v:
|
|
if c == 0x00:
|
|
nPad += 1
|
|
else:
|
|
break
|
|
result.extend([chars[0]] * nPad)
|
|
result.reverse()
|
|
return result.decode('ascii')
|
|
|
|
|
|
def base_decode(v, length, base):
|
|
""" decode v into a string of len bytes."""
|
|
# assert_bytes(v)
|
|
v = to_bytes(v, 'ascii')
|
|
if base not in (58, 43):
|
|
raise ValueError('not supported base: {}'.format(base))
|
|
chars = __b58chars
|
|
if base == 43:
|
|
chars = __b43chars
|
|
long_value = 0
|
|
for (i, c) in enumerate(v[::-1]):
|
|
digit = chars.find(bytes([c]))
|
|
if digit == -1:
|
|
raise ValueError('Forbidden character {} for base {}'.format(c, base))
|
|
long_value += digit * (base**i)
|
|
result = bytearray()
|
|
while long_value >= 256:
|
|
div, mod = divmod(long_value, 256)
|
|
result.append(mod)
|
|
long_value = div
|
|
result.append(long_value)
|
|
nPad = 0
|
|
for c in v:
|
|
if c == chars[0]:
|
|
nPad += 1
|
|
else:
|
|
break
|
|
result.extend(b'\x00' * nPad)
|
|
if length is not None and len(result) != length:
|
|
return None
|
|
result.reverse()
|
|
return bytes(result)
|
|
|
|
|
|
class InvalidChecksum(Exception):
|
|
pass
|
|
|
|
|
|
def EncodeBase58Check(vchIn):
|
|
hash = Hash(vchIn)
|
|
return base_encode(vchIn + hash[0:4], base=58)
|
|
|
|
|
|
def DecodeBase58Check(psz):
|
|
vchRet = base_decode(psz, None, base=58)
|
|
key = vchRet[0:-4]
|
|
csum = vchRet[-4:]
|
|
hash = Hash(key)
|
|
cs32 = hash[0:4]
|
|
if cs32 != csum:
|
|
raise InvalidChecksum('expected {}, actual {}'.format(bh2u(cs32), bh2u(csum)))
|
|
else:
|
|
return key
|
|
|
|
|
|
# backwards compat
|
|
# extended WIF for segwit (used in 3.0.x; but still used internally)
|
|
# the keys in this dict should be a superset of what Imported Wallets can import
|
|
WIF_SCRIPT_TYPES = {
|
|
'p2pkh':0,
|
|
'p2wpkh':1,
|
|
'p2wpkh-p2sh':2,
|
|
'p2sh':5,
|
|
'p2wsh':6,
|
|
'p2wsh-p2sh':7
|
|
}
|
|
WIF_SCRIPT_TYPES_INV = inv_dict(WIF_SCRIPT_TYPES)
|
|
|
|
|
|
PURPOSE48_SCRIPT_TYPES = {
|
|
'p2wsh-p2sh': 1, # specifically multisig
|
|
'p2wsh': 2, # specifically multisig
|
|
}
|
|
PURPOSE48_SCRIPT_TYPES_INV = inv_dict(PURPOSE48_SCRIPT_TYPES)
|
|
|
|
|
|
def serialize_privkey(secret: bytes, compressed: bool, txin_type: str,
|
|
internal_use: bool=False) -> str:
|
|
# we only export secrets inside curve range
|
|
secret = ecc.ECPrivkey.normalize_secret_bytes(secret)
|
|
if internal_use:
|
|
prefix = bytes([(WIF_SCRIPT_TYPES[txin_type] + constants.net.WIF_PREFIX) & 255])
|
|
else:
|
|
prefix = bytes([constants.net.WIF_PREFIX])
|
|
suffix = b'\01' if compressed else b''
|
|
vchIn = prefix + secret + suffix
|
|
base58_wif = EncodeBase58Check(vchIn)
|
|
if internal_use:
|
|
return base58_wif
|
|
else:
|
|
return '{}:{}'.format(txin_type, base58_wif)
|
|
|
|
|
|
def deserialize_privkey(key: str) -> (str, bytes, bool):
|
|
if is_minikey(key):
|
|
return 'p2pkh', minikey_to_private_key(key), False
|
|
|
|
txin_type = None
|
|
if ':' in key:
|
|
txin_type, key = key.split(sep=':', maxsplit=1)
|
|
if txin_type not in WIF_SCRIPT_TYPES:
|
|
raise BitcoinException('unknown script type: {}'.format(txin_type))
|
|
try:
|
|
vch = DecodeBase58Check(key)
|
|
except BaseException:
|
|
neutered_privkey = str(key)[:3] + '..' + str(key)[-2:]
|
|
raise BitcoinException("cannot deserialize privkey {}"
|
|
.format(neutered_privkey))
|
|
|
|
if txin_type is None:
|
|
# keys exported in version 3.0.x encoded script type in first byte
|
|
prefix_value = vch[0] - constants.net.WIF_PREFIX
|
|
try:
|
|
txin_type = WIF_SCRIPT_TYPES_INV[prefix_value]
|
|
except KeyError:
|
|
raise BitcoinException('invalid prefix ({}) for WIF key (1)'.format(vch[0]))
|
|
else:
|
|
# all other keys must have a fixed first byte
|
|
if vch[0] != constants.net.WIF_PREFIX:
|
|
raise BitcoinException('invalid prefix ({}) for WIF key (2)'.format(vch[0]))
|
|
|
|
if len(vch) not in [33, 34]:
|
|
raise BitcoinException('invalid vch len for WIF key: {}'.format(len(vch)))
|
|
compressed = len(vch) == 34
|
|
secret_bytes = vch[1:33]
|
|
# we accept secrets outside curve range; cast into range here:
|
|
secret_bytes = ecc.ECPrivkey.normalize_secret_bytes(secret_bytes)
|
|
return txin_type, secret_bytes, compressed
|
|
|
|
|
|
def is_compressed(sec):
|
|
return deserialize_privkey(sec)[2]
|
|
|
|
|
|
def address_from_private_key(sec):
|
|
txin_type, privkey, compressed = deserialize_privkey(sec)
|
|
public_key = ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed)
|
|
return pubkey_to_address(txin_type, public_key)
|
|
|
|
def is_segwit_address(addr):
|
|
try:
|
|
witver, witprog = segwit_addr.decode(constants.net.SEGWIT_HRP, addr)
|
|
except Exception as e:
|
|
return False
|
|
return witprog is not None
|
|
|
|
def is_b58_address(addr):
|
|
try:
|
|
addrtype, h = b58_address_to_hash160(addr)
|
|
except Exception as e:
|
|
return False
|
|
if addrtype not in [constants.net.ADDRTYPE_P2PKH, constants.net.ADDRTYPE_P2SH]:
|
|
return False
|
|
return addr == hash160_to_b58_address(h, addrtype)
|
|
|
|
def is_address(addr):
|
|
return is_segwit_address(addr) or is_b58_address(addr)
|
|
|
|
|
|
def is_private_key(key):
|
|
try:
|
|
k = deserialize_privkey(key)
|
|
return k is not False
|
|
except:
|
|
return False
|
|
|
|
|
|
########### end pywallet functions #######################
|
|
|
|
def is_minikey(text):
|
|
# Minikeys are typically 22 or 30 characters, but this routine
|
|
# permits any length of 20 or more provided the minikey is valid.
|
|
# A valid minikey must begin with an 'S', be in base58, and when
|
|
# suffixed with '?' have its SHA256 hash begin with a zero byte.
|
|
# They are widely used in Casascius physical bitcoins.
|
|
return (len(text) >= 20 and text[0] == 'S'
|
|
and all(ord(c) in __b58chars for c in text)
|
|
and sha256(text + '?')[0] == 0x00)
|
|
|
|
def minikey_to_private_key(text):
|
|
return sha256(text)
|
|
|
|
|
|
###################################### BIP32 ##############################
|
|
|
|
BIP32_PRIME = 0x80000000
|
|
|
|
|
|
def protect_against_invalid_ecpoint(func):
|
|
def func_wrapper(*args):
|
|
n = args[-1]
|
|
while True:
|
|
is_prime = n & BIP32_PRIME
|
|
try:
|
|
return func(*args[:-1], n=n)
|
|
except ecc.InvalidECPointException:
|
|
print_error('bip32 protect_against_invalid_ecpoint: skipping index')
|
|
n += 1
|
|
is_prime2 = n & BIP32_PRIME
|
|
if is_prime != is_prime2: raise OverflowError()
|
|
return func_wrapper
|
|
|
|
|
|
# Child private key derivation function (from master private key)
|
|
# k = master private key (32 bytes)
|
|
# c = master chain code (extra entropy for key derivation) (32 bytes)
|
|
# n = the index of the key we want to derive. (only 32 bits will be used)
|
|
# If n is hardened (i.e. the 32nd bit is set), the resulting private key's
|
|
# corresponding public key can NOT be determined without the master private key.
|
|
# However, if n is not hardened, the resulting private key's corresponding
|
|
# public key can be determined without the master private key.
|
|
@protect_against_invalid_ecpoint
|
|
def CKD_priv(k, c, n):
|
|
if n < 0: raise ValueError('the bip32 index needs to be non-negative')
|
|
is_prime = n & BIP32_PRIME
|
|
return _CKD_priv(k, c, bfh(rev_hex(int_to_hex(n,4))), is_prime)
|
|
|
|
|
|
def _CKD_priv(k, c, s, is_prime):
|
|
try:
|
|
keypair = ecc.ECPrivkey(k)
|
|
except ecc.InvalidECPointException as e:
|
|
raise BitcoinException('Impossible xprv (not within curve order)') from e
|
|
cK = keypair.get_public_key_bytes(compressed=True)
|
|
data = bytes([0]) + k + s if is_prime else cK + s
|
|
I = hmac_oneshot(c, data, hashlib.sha512)
|
|
I_left = ecc.string_to_number(I[0:32])
|
|
k_n = (I_left + ecc.string_to_number(k)) % ecc.CURVE_ORDER
|
|
if I_left >= ecc.CURVE_ORDER or k_n == 0:
|
|
raise ecc.InvalidECPointException()
|
|
k_n = ecc.number_to_string(k_n, ecc.CURVE_ORDER)
|
|
c_n = I[32:]
|
|
return k_n, c_n
|
|
|
|
# Child public key derivation function (from public key only)
|
|
# K = master public key
|
|
# c = master chain code
|
|
# n = index of key we want to derive
|
|
# This function allows us to find the nth public key, as long as n is
|
|
# not hardened. If n is hardened, we need the master private key to find it.
|
|
@protect_against_invalid_ecpoint
|
|
def CKD_pub(cK, c, n):
|
|
if n < 0: raise ValueError('the bip32 index needs to be non-negative')
|
|
if n & BIP32_PRIME: raise Exception()
|
|
return _CKD_pub(cK, c, bfh(rev_hex(int_to_hex(n,4))))
|
|
|
|
# helper function, callable with arbitrary string.
|
|
# note: 's' does not need to fit into 32 bits here! (c.f. trustedcoin billing)
|
|
def _CKD_pub(cK, c, s):
|
|
I = hmac_oneshot(c, cK + s, hashlib.sha512)
|
|
pubkey = ecc.ECPrivkey(I[0:32]) + ecc.ECPubkey(cK)
|
|
if pubkey.is_at_infinity():
|
|
raise ecc.InvalidECPointException()
|
|
cK_n = pubkey.get_public_key_bytes(compressed=True)
|
|
c_n = I[32:]
|
|
return cK_n, c_n
|
|
|
|
|
|
def xprv_header(xtype, *, net=None):
|
|
if net is None:
|
|
net = constants.net
|
|
return bfh("%08x" % net.XPRV_HEADERS[xtype])
|
|
|
|
|
|
def xpub_header(xtype, *, net=None):
|
|
if net is None:
|
|
net = constants.net
|
|
return bfh("%08x" % net.XPUB_HEADERS[xtype])
|
|
|
|
|
|
def serialize_xprv(xtype, c, k, depth=0, fingerprint=b'\x00'*4,
|
|
child_number=b'\x00'*4, *, net=None):
|
|
if not ecc.is_secret_within_curve_range(k):
|
|
raise BitcoinException('Impossible xprv (not within curve order)')
|
|
xprv = xprv_header(xtype, net=net) \
|
|
+ bytes([depth]) + fingerprint + child_number + c + bytes([0]) + k
|
|
return EncodeBase58Check(xprv)
|
|
|
|
|
|
def serialize_xpub(xtype, c, cK, depth=0, fingerprint=b'\x00'*4,
|
|
child_number=b'\x00'*4, *, net=None):
|
|
xpub = xpub_header(xtype, net=net) \
|
|
+ bytes([depth]) + fingerprint + child_number + c + cK
|
|
return EncodeBase58Check(xpub)
|
|
|
|
|
|
class InvalidMasterKeyVersionBytes(BitcoinException): pass
|
|
|
|
|
|
def deserialize_xkey(xkey, prv, *, net=None):
|
|
if net is None:
|
|
net = constants.net
|
|
xkey = DecodeBase58Check(xkey)
|
|
if len(xkey) != 78:
|
|
raise BitcoinException('Invalid length for extended key: {}'
|
|
.format(len(xkey)))
|
|
depth = xkey[4]
|
|
fingerprint = xkey[5:9]
|
|
child_number = xkey[9:13]
|
|
c = xkey[13:13+32]
|
|
header = int('0x' + bh2u(xkey[0:4]), 16)
|
|
headers = net.XPRV_HEADERS if prv else net.XPUB_HEADERS
|
|
if header not in headers.values():
|
|
raise InvalidMasterKeyVersionBytes('Invalid extended key format: {}'
|
|
.format(hex(header)))
|
|
xtype = list(headers.keys())[list(headers.values()).index(header)]
|
|
n = 33 if prv else 32
|
|
K_or_k = xkey[13+n:]
|
|
if prv and not ecc.is_secret_within_curve_range(K_or_k):
|
|
raise BitcoinException('Impossible xprv (not within curve order)')
|
|
return xtype, depth, fingerprint, child_number, c, K_or_k
|
|
|
|
|
|
def deserialize_xpub(xkey, *, net=None):
|
|
return deserialize_xkey(xkey, False, net=net)
|
|
|
|
def deserialize_xprv(xkey, *, net=None):
|
|
return deserialize_xkey(xkey, True, net=net)
|
|
|
|
def xpub_type(x):
|
|
return deserialize_xpub(x)[0]
|
|
|
|
|
|
def is_xpub(text):
|
|
try:
|
|
deserialize_xpub(text)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
|
|
def is_xprv(text):
|
|
try:
|
|
deserialize_xprv(text)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
|
|
def xpub_from_xprv(xprv):
|
|
xtype, depth, fingerprint, child_number, c, k = deserialize_xprv(xprv)
|
|
cK = ecc.ECPrivkey(k).get_public_key_bytes(compressed=True)
|
|
return serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
|
|
|
|
|
|
def bip32_root(seed, xtype):
|
|
I = hmac_oneshot(b"Bitcoin seed", seed, hashlib.sha512)
|
|
master_k = I[0:32]
|
|
master_c = I[32:]
|
|
# create xprv first, as that will check if master_k is within curve order
|
|
xprv = serialize_xprv(xtype, master_c, master_k)
|
|
cK = ecc.ECPrivkey(master_k).get_public_key_bytes(compressed=True)
|
|
xpub = serialize_xpub(xtype, master_c, cK)
|
|
return xprv, xpub
|
|
|
|
|
|
def xpub_from_pubkey(xtype, cK):
|
|
if cK[0] not in (0x02, 0x03):
|
|
raise ValueError('Unexpected first byte: {}'.format(cK[0]))
|
|
return serialize_xpub(xtype, b'\x00'*32, cK)
|
|
|
|
|
|
def bip32_derivation(s):
|
|
if not s.startswith('m/'):
|
|
raise ValueError('invalid bip32 derivation path: {}'.format(s))
|
|
s = s[2:]
|
|
for n in s.split('/'):
|
|
if n == '': continue
|
|
i = int(n[:-1]) + BIP32_PRIME if n[-1] == "'" else int(n)
|
|
yield i
|
|
|
|
def convert_bip32_path_to_list_of_uint32(n: str) -> List[int]:
|
|
"""Convert bip32 path to list of uint32 integers with prime flags
|
|
m/0/-1/1' -> [0, 0x80000001, 0x80000001]
|
|
|
|
based on code in trezorlib
|
|
"""
|
|
path = []
|
|
for x in n.split('/')[1:]:
|
|
if x == '': continue
|
|
prime = 0
|
|
if x.endswith("'"):
|
|
x = x.replace('\'', '')
|
|
prime = BIP32_PRIME
|
|
if x.startswith('-'):
|
|
prime = BIP32_PRIME
|
|
path.append(abs(int(x)) | prime)
|
|
return path
|
|
|
|
def is_bip32_derivation(x):
|
|
try:
|
|
[ i for i in bip32_derivation(x)]
|
|
return True
|
|
except :
|
|
return False
|
|
|
|
def bip32_private_derivation(xprv, branch, sequence):
|
|
if not sequence.startswith(branch):
|
|
raise ValueError('incompatible branch ({}) and sequence ({})'
|
|
.format(branch, sequence))
|
|
if branch == sequence:
|
|
return xprv, xpub_from_xprv(xprv)
|
|
xtype, depth, fingerprint, child_number, c, k = deserialize_xprv(xprv)
|
|
sequence = sequence[len(branch):]
|
|
for n in sequence.split('/'):
|
|
if n == '': continue
|
|
i = int(n[:-1]) + BIP32_PRIME if n[-1] == "'" else int(n)
|
|
parent_k = k
|
|
k, c = CKD_priv(k, c, i)
|
|
depth += 1
|
|
parent_cK = ecc.ECPrivkey(parent_k).get_public_key_bytes(compressed=True)
|
|
fingerprint = hash_160(parent_cK)[0:4]
|
|
child_number = bfh("%08X"%i)
|
|
cK = ecc.ECPrivkey(k).get_public_key_bytes(compressed=True)
|
|
xpub = serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
|
|
xprv = serialize_xprv(xtype, c, k, depth, fingerprint, child_number)
|
|
return xprv, xpub
|
|
|
|
|
|
def bip32_public_derivation(xpub, branch, sequence):
|
|
xtype, depth, fingerprint, child_number, c, cK = deserialize_xpub(xpub)
|
|
if not sequence.startswith(branch):
|
|
raise ValueError('incompatible branch ({}) and sequence ({})'
|
|
.format(branch, sequence))
|
|
sequence = sequence[len(branch):]
|
|
for n in sequence.split('/'):
|
|
if n == '': continue
|
|
i = int(n)
|
|
parent_cK = cK
|
|
cK, c = CKD_pub(cK, c, i)
|
|
depth += 1
|
|
fingerprint = hash_160(parent_cK)[0:4]
|
|
child_number = bfh("%08X"%i)
|
|
return serialize_xpub(xtype, c, cK, depth, fingerprint, child_number)
|
|
|
|
|
|
def bip32_private_key(sequence, k, chain):
|
|
for i in sequence:
|
|
k, chain = CKD_priv(k, chain, i)
|
|
return k
|