mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-31 01:11:35 +00:00
commands: fix encrypt/decrypt
based on Electron-Cash/Electron-Cash@62aa08a0ff
This commit is contained in:
parent
387834164c
commit
fd5b1acdc8
7 changed files with 44 additions and 18 deletions
|
@ -35,20 +35,16 @@ from decimal import Decimal
|
|||
from typing import Optional, TYPE_CHECKING
|
||||
|
||||
from .import util, ecc
|
||||
from .util import bfh, bh2u, format_satoshis, json_decode, json_encode, is_hash256_str
|
||||
from .util import bfh, bh2u, format_satoshis, json_decode, json_encode, is_hash256_str, is_hex_str, to_bytes
|
||||
from . import bitcoin
|
||||
from .bitcoin import is_address, hash_160, COIN, TYPE_ADDRESS
|
||||
from . import bip32
|
||||
from .bip32 import BIP32Node
|
||||
from .i18n import _
|
||||
from .transaction import Transaction, multisig_script, TxOutput
|
||||
from .paymentrequest import PR_PAID, PR_UNPAID, PR_UNKNOWN, PR_EXPIRED
|
||||
from .synchronizer import Notifier
|
||||
from .storage import WalletStorage
|
||||
from . import keystore
|
||||
from .wallet import Wallet, Imported_Wallet, Abstract_Wallet, create_new_wallet, restore_wallet_from_text
|
||||
from .wallet import Abstract_Wallet, create_new_wallet, restore_wallet_from_text
|
||||
from .address_synchronizer import TX_HEIGHT_LOCAL
|
||||
from .mnemonic import Mnemonic
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .network import Network
|
||||
|
@ -582,16 +578,27 @@ class Commands:
|
|||
return tx.as_dict()
|
||||
|
||||
@command('')
|
||||
def encrypt(self, pubkey, message):
|
||||
def encrypt(self, pubkey, message) -> str:
|
||||
"""Encrypt a message with a public key. Use quotes if the message contains whitespaces."""
|
||||
if not is_hex_str(pubkey):
|
||||
raise Exception(f"pubkey must be a hex string instead of {repr(pubkey)}")
|
||||
try:
|
||||
message = to_bytes(message)
|
||||
except TypeError:
|
||||
raise Exception(f"message must be a string-like object instead of {repr(message)}")
|
||||
public_key = ecc.ECPubkey(bfh(pubkey))
|
||||
encrypted = public_key.encrypt_message(message)
|
||||
return encrypted
|
||||
return encrypted.decode('utf-8')
|
||||
|
||||
@command('wp')
|
||||
def decrypt(self, pubkey, encrypted, password=None):
|
||||
def decrypt(self, pubkey, encrypted, password=None) -> str:
|
||||
"""Decrypt a message encrypted with a public key."""
|
||||
return self.wallet.decrypt_message(pubkey, encrypted, password)
|
||||
if not is_hex_str(pubkey):
|
||||
raise Exception(f"pubkey must be a hex string instead of {repr(pubkey)}")
|
||||
if not isinstance(encrypted, (str, bytes, bytearray)):
|
||||
raise Exception(f"encrypted must be a string-like object instead of {repr(encrypted)}")
|
||||
decrypted = self.wallet.decrypt_message(pubkey, encrypted, password)
|
||||
return decrypted.decode('utf-8')
|
||||
|
||||
def _format_request(self, out):
|
||||
pr_str = {
|
||||
|
|
|
@ -274,7 +274,7 @@ class ECPubkey(object):
|
|||
verifying_key = _MyVerifyingKey.from_public_point(ecdsa_point, curve=SECP256k1)
|
||||
verifying_key.verify_digest(sig_string, msg_hash, sigdecode=ecdsa.util.sigdecode_string)
|
||||
|
||||
def encrypt_message(self, message: bytes, magic: bytes = b'BIE1'):
|
||||
def encrypt_message(self, message: bytes, magic: bytes = b'BIE1') -> bytes:
|
||||
"""
|
||||
ECIES encryption/decryption methods; AES-128-CBC with PKCS7 is used as the cipher; hmac-sha256 is used as the mac
|
||||
"""
|
||||
|
|
|
@ -105,12 +105,12 @@ class Software_KeyStore(KeyStore):
|
|||
def may_have_password(self):
|
||||
return not self.is_watching_only()
|
||||
|
||||
def sign_message(self, sequence, message, password):
|
||||
def sign_message(self, sequence, message, password) -> bytes:
|
||||
privkey, compressed = self.get_private_key(sequence, password)
|
||||
key = ecc.ECPrivkey(privkey)
|
||||
return key.sign_message(message, compressed)
|
||||
|
||||
def decrypt_message(self, sequence, message, password):
|
||||
def decrypt_message(self, sequence, message, password) -> bytes:
|
||||
privkey, compressed = self.get_private_key(sequence, password)
|
||||
ec = ecc.ECPrivkey(privkey)
|
||||
decrypted = ec.decrypt_message(message)
|
||||
|
|
|
@ -8,7 +8,7 @@ from electrum import constants
|
|||
# If set, unit tests that would normally test functions with multiple implementations,
|
||||
# will only be run once, using the fastest implementation.
|
||||
# e.g. libsecp256k1 vs python-ecdsa. pycryptodomex vs pyaes.
|
||||
FAST_TESTS = False
|
||||
FAST_TESTS = 1
|
||||
|
||||
|
||||
# some unit tests are modifying globals; sorry.
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
import unittest
|
||||
from unittest import mock
|
||||
from decimal import Decimal
|
||||
|
||||
from electrum.commands import Commands, eval_bool
|
||||
from electrum import storage
|
||||
from electrum.wallet import restore_wallet_from_text
|
||||
|
||||
from . import TestCaseForTestnet
|
||||
|
||||
|
@ -62,6 +65,16 @@ class TestCommands(unittest.TestCase):
|
|||
for xkey2, xtype2 in xprvs:
|
||||
self.assertEqual(xkey2, cmds.convert_xkey(xkey1, xtype2))
|
||||
|
||||
@mock.patch.object(storage.WalletStorage, '_write')
|
||||
def test_encrypt_decrypt(self, mock_write):
|
||||
wallet = restore_wallet_from_text('p2wpkh:L4rYY5QpfN6wJEF4SEKDpcGhTPnCe9zcGs6hiSnhpprZqVywFifN',
|
||||
path='if_this_exists_mocking_failed_648151893')['wallet']
|
||||
cmds = Commands(config=None, wallet=wallet, network=None)
|
||||
cleartext = "asdasd this is the message"
|
||||
pubkey = "021f110909ded653828a254515b58498a6bafc96799fb0851554463ed44ca7d9da"
|
||||
ciphertext = cmds.encrypt(pubkey, cleartext)
|
||||
self.assertEqual(cleartext, cmds.decrypt(pubkey, ciphertext))
|
||||
|
||||
|
||||
class TestCommandsTestnet(TestCaseForTestnet):
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
import binascii
|
||||
import os, sys, re, json
|
||||
from collections import defaultdict, OrderedDict
|
||||
from typing import NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable
|
||||
from typing import NamedTuple, Union, TYPE_CHECKING, Tuple, Optional, Callable, Any
|
||||
from datetime import datetime
|
||||
import decimal
|
||||
from decimal import Decimal
|
||||
|
@ -493,9 +493,14 @@ def is_valid_email(s):
|
|||
return re.match(regexp, s) is not None
|
||||
|
||||
|
||||
def is_hash256_str(text: str) -> bool:
|
||||
def is_hash256_str(text: Any) -> bool:
|
||||
if not isinstance(text, str): return False
|
||||
if len(text) != 64: return False
|
||||
return is_hex_str(text)
|
||||
|
||||
|
||||
def is_hex_str(text: Any) -> bool:
|
||||
if not isinstance(text, str): return False
|
||||
try:
|
||||
bytes.fromhex(text)
|
||||
except:
|
||||
|
|
|
@ -1252,7 +1252,7 @@ class Abstract_Wallet(AddressSynchronizer):
|
|||
index = self.get_address_index(address)
|
||||
return self.keystore.sign_message(index, message, password)
|
||||
|
||||
def decrypt_message(self, pubkey, message, password):
|
||||
def decrypt_message(self, pubkey, message, password) -> bytes:
|
||||
addr = self.pubkeys_to_address(pubkey)
|
||||
index = self.get_address_index(addr)
|
||||
return self.keystore.decrypt_message(index, message, password)
|
||||
|
@ -1889,7 +1889,8 @@ def create_new_wallet(*, path, passphrase=None, password=None, encrypt_file=True
|
|||
return {'seed': seed, 'wallet': wallet, 'msg': msg}
|
||||
|
||||
|
||||
def restore_wallet_from_text(text, *, path, network, passphrase=None, password=None, encrypt_file=True):
|
||||
def restore_wallet_from_text(text, *, path, network=None,
|
||||
passphrase=None, password=None, encrypt_file=True):
|
||||
"""Restore a wallet from text. Text can be a seed phrase, a master
|
||||
public key, a master private key, a list of bitcoin addresses
|
||||
or bitcoin private keys."""
|
||||
|
|
Loading…
Add table
Reference in a new issue