mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
crypto: more type annotations
This commit is contained in:
parent
a6a003a345
commit
aceb022f9d
1 changed files with 27 additions and 25 deletions
|
@ -44,13 +44,13 @@ class InvalidPadding(Exception):
|
|||
pass
|
||||
|
||||
|
||||
def append_PKCS7_padding(data):
|
||||
def append_PKCS7_padding(data: bytes) -> bytes:
|
||||
assert_bytes(data)
|
||||
padlen = 16 - (len(data) % 16)
|
||||
return data + bytes([padlen]) * padlen
|
||||
|
||||
|
||||
def strip_PKCS7_padding(data):
|
||||
def strip_PKCS7_padding(data: bytes) -> bytes:
|
||||
assert_bytes(data)
|
||||
if len(data) % 16 != 0 or len(data) == 0:
|
||||
raise InvalidPadding("invalid length")
|
||||
|
@ -63,7 +63,7 @@ def strip_PKCS7_padding(data):
|
|||
return data[0:-padlen]
|
||||
|
||||
|
||||
def aes_encrypt_with_iv(key, iv, data):
|
||||
def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
|
||||
assert_bytes(key, iv, data)
|
||||
data = append_PKCS7_padding(data)
|
||||
if AES:
|
||||
|
@ -75,7 +75,7 @@ def aes_encrypt_with_iv(key, iv, data):
|
|||
return e
|
||||
|
||||
|
||||
def aes_decrypt_with_iv(key, iv, data):
|
||||
def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
|
||||
assert_bytes(key, iv, data)
|
||||
if AES:
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
|
@ -90,36 +90,38 @@ def aes_decrypt_with_iv(key, iv, data):
|
|||
raise InvalidPassword()
|
||||
|
||||
|
||||
def EncodeAES(secret, s):
|
||||
assert_bytes(s)
|
||||
def EncodeAES(secret: bytes, msg: bytes) -> bytes:
|
||||
"""Returns base64 encoded ciphertext."""
|
||||
assert_bytes(msg)
|
||||
iv = bytes(os.urandom(16))
|
||||
ct = aes_encrypt_with_iv(secret, iv, s)
|
||||
ct = aes_encrypt_with_iv(secret, iv, msg)
|
||||
e = iv + ct
|
||||
return base64.b64encode(e)
|
||||
|
||||
def DecodeAES(secret, e):
|
||||
e = bytes(base64.b64decode(e))
|
||||
|
||||
def DecodeAES(secret: bytes, ciphertext_b64: Union[bytes, str]) -> bytes:
|
||||
e = bytes(base64.b64decode(ciphertext_b64))
|
||||
iv, e = e[:16], e[16:]
|
||||
s = aes_decrypt_with_iv(secret, iv, e)
|
||||
return s
|
||||
|
||||
def pw_encode(s, password):
|
||||
if password:
|
||||
secret = sha256d(password)
|
||||
return EncodeAES(secret, to_bytes(s, "utf8")).decode('utf8')
|
||||
else:
|
||||
return s
|
||||
|
||||
def pw_decode(s, password):
|
||||
if password is not None:
|
||||
secret = sha256d(password)
|
||||
try:
|
||||
d = to_string(DecodeAES(secret, s), "utf8")
|
||||
except Exception:
|
||||
raise InvalidPassword()
|
||||
return d
|
||||
else:
|
||||
return s
|
||||
def pw_encode(data: str, password: Union[bytes, str]) -> str:
|
||||
if not password:
|
||||
return data
|
||||
secret = sha256d(password)
|
||||
return EncodeAES(secret, to_bytes(data, "utf8")).decode('utf8')
|
||||
|
||||
|
||||
def pw_decode(data: str, password: Union[bytes, str]) -> str:
|
||||
if password is None:
|
||||
return data
|
||||
secret = sha256d(password)
|
||||
try:
|
||||
d = to_string(DecodeAES(secret, data), "utf8")
|
||||
except Exception:
|
||||
raise InvalidPassword()
|
||||
return d
|
||||
|
||||
|
||||
def sha256(x: Union[bytes, str]) -> bytes:
|
||||
|
|
Loading…
Add table
Reference in a new issue