From 00bc7995c4426825ed99200bc141aff911f1767c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 17 Sep 2018 17:16:46 -0300 Subject: [PATCH] move lbryschema/tests/ into tests/unit/schema/ --- tests/unit/schema/__init__.py | 0 tests/unit/schema/test_data.py | 219 ++++++++++++++ tests/unit/schema/test_lbryschema.py | 423 +++++++++++++++++++++++++++ 3 files changed, 642 insertions(+) create mode 100644 tests/unit/schema/__init__.py create mode 100644 tests/unit/schema/test_data.py create mode 100644 tests/unit/schema/test_lbryschema.py diff --git a/tests/unit/schema/__init__.py b/tests/unit/schema/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/schema/test_data.py b/tests/unit/schema/test_data.py new file mode 100644 index 000000000..b564daf25 --- /dev/null +++ b/tests/unit/schema/test_data.py @@ -0,0 +1,219 @@ + + +claim_id_1 = "63f2da17b0d90042c559cc73b6b17f853945c43e" + +claim_address_2 = "bDtL6qriyimxz71DSYjojTBsm6cpM1bqmj" + +claim_address_1 = "bUG7VaMzLEqqyZQAyg9srxQzvf1wwnJ48w" + +nist256p_private_key = """-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIBhixPFinjHmG94r00VBjmE73XZmlSHag5Bg3BFdCeQgoAoGCCqGSM49 +AwEHoUQDQgAEtSfatRTR6ppwoDVJ94hbvhFDF42mACkWSc2Tao6zzYW4xaRPbI7j +IBUL+6prbDM+GXZ8X2mtmeaNIgjWTT7YFw== +-----END EC PRIVATE KEY----- +""" + +nist384p_private_key = """-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDD5PPbcgT62WADeVBkDFsKCTCwQULHD7eE0iZz7c9Xk+6gZazMFgsGp +O0Rs9n+lmACgBwYFK4EEACKhZANiAASzpp0t4nIxoedhQN+J2pZ/EmwZl/x4dwdd +AjY4ZwKBdhfWIWgtcET9PBJlda0EvxR+CTwrt1em26VNS/57eH3yNFJQdCQiMSFY +mTtML6D/rctN1oztTSQdwHPA9x99FcU= +-----END EC PRIVATE KEY----- +""" + +secp256k1_private_key = """-----BEGIN EC PRIVATE KEY----- +MHQCAQEEIPbjaEfCCCy5HHvGHkEw3X/dTJXlr4jcEJHV1OmcBDPmoAcGBSuBBAAK +oUQDQgAElLPrkVIapvtKrv0DkgQb9vAXtCQDBIu+iHlsQC5dx1ZnOWZwpYKQuM4i +LNbuTlfxCHWYwovwLjYnao8iwgp0og== +-----END EC PRIVATE KEY----- +""" + +nist256p_cert = { + "version": "_0_0_1", + "claimType": "certificateType", + "certificate": { + "publicKey": "3059301306072a8648ce3d020106082a8648ce3d03010703420004b527dab514d1ea9a70a03549f7885bbe1143178da600291649cd936a8eb3cd85b8c5a44f6c8ee320150bfbaa6b6c333e19767c5f69ad99e68d2208d64d3ed817", + "keyType": "NIST256p", + "version": "_0_0_1" + } +} + +nist384p_cert = { + "version": "_0_0_1", + "claimType": "certificateType", + "certificate": { + "publicKey": "3076301006072a8648ce3d020106052b8104002203620004b3a69d2de27231a1e76140df89da967f126c1997fc7877075d0236386702817617d621682d7044fd3c126575ad04bf147e093c2bb757a6dba54d4bfe7b787df2345250742422312158993b4c2fa0ffadcb4dd68ced4d241dc073c0f71f7d15c5", + "keyType": "NIST384p", + "version": "_0_0_1" + } +} + +secp256k1_cert = { + "version": "_0_0_1", + "claimType": "certificateType", + "certificate": { + "publicKey": "3056301006072a8648ce3d020106052b8104000a0342000494b3eb91521aa6fb4aaefd0392041bf6f017b42403048bbe88796c402e5dc75667396670a58290b8ce222cd6ee4e57f1087598c28bf02e36276a8f22c20a74a2", + "keyType": "SECP256k1", + "version": "_0_0_1" + } +} + +malformed_secp256k1_cert = { + "version": "_0_0_1", + "claimType": "certificateType", + "certificate": { + "publicKey": "3056301006072a8648ce3d020106052b8104000a0342000494b3eb91521aa6fb4aaefd0392041bf6f017b42403048bbe88796c402e5dc75667396670a58290b8ce222cd6ee4e57f1087598c28bf02e36276a8f22c20a74a2", + "keyType": "NIST256p", + "version": "_0_0_1" + } +} + +example_003 = { + "language": "en", + "license": "LBRY Inc", + "nsfw": False, + "description": "What is LBRY? An introduction with Alex Tabarrok", + "content_type": "video/mp4", + "author": "Samuel Bryan", + "ver": "0.0.3", + "title": "What is LBRY?", + "sources": { + "lbry_sd_hash": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b" + }, + "thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png" +} + +example_010 = { + "version": "_0_0_1", + "claimType": "streamType", + "stream": { + "source": { + "source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b", + "version": "_0_0_1", + "contentType": "video/mp4", + "sourceType": "lbry_sd_hash" + }, + "version": "_0_0_1", + "metadata": { + "license": "LBRY Inc", + "description": "What is LBRY? An introduction with Alex Tabarrok", + "language": "en", + "title": "What is LBRY?", + "author": "Samuel Bryan", + "version": "_0_1_0", + "nsfw": False, + "licenseUrl": "", + "preview": "", + "thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png" + } + } +} + +example_010_serialized = "080110011adc010801129401080410011a0d57686174206973204c4252593f223057686174206973204c4252593f20416e20696e74726f64756374696f6e207769746820416c6578205461626172726f6b2a0c53616d75656c20427279616e32084c42525920496e6338004a2f68747470733a2f2f73332e616d617a6f6e6177732e636f6d2f66696c65732e6c6272792e696f2f6c6f676f2e706e6752005a001a41080110011a30d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b2209766964656f2f6d7034" + +claim_010_signed_nist256p = { + "version": "_0_0_1", + "publisherSignature": { + "certificateId": "63f2da17b0d90042c559cc73b6b17f853945c43e", + "signatureType": "NIST256p", + "version": "_0_0_1", + "signature": "ec117f5e16a911f704aab8efa9178b1cdfcad0ba8e571ba86a56ecdade129fdff60ff7dcf00355bda788020a43a40fbd55aaaa080c3555fd8f0a87612b62936a" + }, + "claimType": "streamType", + "stream": { + "source": { + "source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b", + "version": "_0_0_1", + "contentType": "video/mp4", + "sourceType": "lbry_sd_hash" + }, + "version": "_0_0_1", + "metadata": { + "license": "LBRY Inc", + "description": "What is LBRY? An introduction with Alex Tabarrok", + "language": "en", + "title": "What is LBRY?", + "author": "Samuel Bryan", + "version": "_0_1_0", + "nsfw": False, + "licenseUrl": "", + "preview": "", + "thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png" + } + } +} + +claim_010_signed_nist384p = { + "version": "_0_0_1", + "publisherSignature": { + "certificateId": "63f2da17b0d90042c559cc73b6b17f853945c43e", + "signatureType": "NIST384p", + "version": "_0_0_1", + "signature": "18e56bb52872809ac598c366c5f0fa9ecbcadb01198b7150b0c4518049086b6b4f552f01d16eaf9cbbf061d8ee35520f8fe22f278a4d0aab5f9c8a4cadd38b6bd4bdbb3b4368e24c6e966ebc24684d24f3d19f5a3e4c7bf69273b0f94aa1c51b" + }, + "claimType": "streamType", + "stream": { + "source": { + "source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b", + "version": "_0_0_1", + "contentType": "video/mp4", + "sourceType": "lbry_sd_hash" + }, + "version": "_0_0_1", + "metadata": { + "license": "LBRY Inc", + "description": "What is LBRY? An introduction with Alex Tabarrok", + "language": "en", + "title": "What is LBRY?", + "author": "Samuel Bryan", + "version": "_0_1_0", + "nsfw": False, + "licenseUrl": "", + "preview": "", + "thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png" + } + } +} + +claim_010_signed_secp256k1 = { + "version": "_0_0_1", + "publisherSignature": { + "certificateId": "63f2da17b0d90042c559cc73b6b17f853945c43e", + "signatureType": "SECP256k1", + "version": "_0_0_1", + "signature": "798a37bd4310339e6a9b424ebc3fd2b3263280c13c0d08b1d1fa5e53d29c102b2d340cedecc5018988819db0ac6eb61bf67dbeec4ebee7231668fd13931e6320" + }, + "claimType": "streamType", + "stream": { + "source": { + "source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b", + "version": "_0_0_1", + "contentType": "video/mp4", + "sourceType": "lbry_sd_hash" + }, + "version": "_0_0_1", + "metadata": { + "license": "LBRY Inc", + "description": "What is LBRY? An introduction with Alex Tabarrok", + "language": "en", + "title": "What is LBRY?", + "author": "Samuel Bryan", + "version": "_0_1_0", + "nsfw": False, + "licenseUrl": "", + "preview": "", + "thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png" + } + } +} + +hex_encoded_003="7b22766572223a2022302e302e33222c20226465736372697074696f6e223a202274657374222c20226c6963656e7365223a2022437265617469766520436f6d6d6f6e73204174747269627574696f6e20342e3020496e7465726e6174696f6e616c222c2022617574686f72223a202274657374222c20227469746c65223a202274657374222c20226c616e6775616765223a2022656e222c2022736f7572636573223a207b226c6272795f73645f68617368223a2022323961643231386336316335393934393962323263313732323833373162356665396136653732356564633965663639316137383139623365373430363530303436373835323932303632396662636464626361636631336433313537396434227d2c2022636f6e74656e745f74797065223a2022696d6167652f706e67222c20226e736677223a2066616c73657d" + +decoded_hex_encoded_003={u'version': u'_0_0_1', u'claimType': u'streamType', u'stream': {u'source': {u'source': '29ad218c61c599499b22c17228371b5fe9a6e725edc9ef691a7819b3e7406500467852920629fbcddbcacf13d31579d4', u'version': u'_0_0_1', u'contentType': u'image/png', u'sourceType': u'lbry_sd_hash'}, u'version': u'_0_0_1', u'metadata': {u'license': u'Creative Commons Attribution 4.0 International', u'description': u'test', u'language': u'en', u'title': u'test', u'author': u'test', u'version': u'_0_1_0', u'nsfw': False, u'licenseUrl': u'', u'preview': u'', u'thumbnail': u''}}} + +binary_claim = b'\x08\x01\x10\x02"^\x08\x01\x10\x03"X0V0\x10\x06\x07*\x86H\xce=\x02\x01\x06\x05+\x81\x04\x00\n\x03B\x00\x04\x89U\x97\x1dk\xbc\xd4\xf7\xe2\xb5\xa9a7\xbc\xa4;\xda\x9a\x13\x84<\x05"\xa5\xc3\no;u\xb6\x8co\x10\x81\x8c\x1d\xf2\xe7\t\x9c.\xc8\x9b\x84\xabz:6\x15\xa5\xb3\x16\n\x03YT&M\x98\xec+\xef\x89;' +expected_binary_claim_decoded = {u'certificate': {u'keyType': u'SECP256k1', + u'publicKey': u'3056301006072a8648ce3d020106052b8104000a034200048955971d6bbcd4f7e2b5a96137bca43bda9a13843c0522a5c30a6f3b75b68c6f10818c1df2e7099c2ec89b84ab7a3a3615a5b3160a035954264d98ec2bef893b', + u'version': u'_0_0_1'}, + u'claimType': u'certificateType', + u'version': u'_0_0_1'} \ No newline at end of file diff --git a/tests/unit/schema/test_lbryschema.py b/tests/unit/schema/test_lbryschema.py new file mode 100644 index 000000000..350650fcb --- /dev/null +++ b/tests/unit/schema/test_lbryschema.py @@ -0,0 +1,423 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import ecdsa +import binascii +from copy import deepcopy +import unittest + +from test_data import example_003, example_010, example_010_serialized +from test_data import claim_id_1, claim_address_1, claim_address_2 +from test_data import binary_claim, expected_binary_claim_decoded +from test_data import nist256p_private_key, claim_010_signed_nist256p, nist256p_cert +from test_data import nist384p_private_key, claim_010_signed_nist384p, nist384p_cert +from test_data import secp256k1_private_key, claim_010_signed_secp256k1, secp256k1_cert +from test_data import hex_encoded_003, decoded_hex_encoded_003, malformed_secp256k1_cert +import lbryschema +from lbryschema.claim import ClaimDict +from lbryschema.schema import NIST256p, NIST384p, SECP256k1 +from lbryschema.legacy.migrate import migrate +from lbryschema.signer import get_signer +from lbryschema.uri import URI, URIParseError +from lbryschema.decode import smart_decode +from lbryschema.error import DecodeError, InvalidAddress +from lbryschema.address import decode_address, encode_address + + +parsed_uri_matches = [ + ("test", URI("test"), False), + ("test#%s" % claim_id_1, URI("test", claim_id=claim_id_1), False), + ("test:1", URI("test", claim_sequence=1), False), + ("test$1", URI("test", bid_position=1), False), + ("lbry://test", URI("test"), False), + ("lbry://test#%s" % claim_id_1, URI("test", claim_id=claim_id_1), False), + ("lbry://test:1", URI("test", claim_sequence=1), False), + ("lbry://test$1", URI("test", bid_position=1), False), + ("@test", URI("@test"), True), + ("@test#%s" % claim_id_1, URI("@test", claim_id=claim_id_1), True), + ("@test:1", URI("@test", claim_sequence=1), True), + ("@test$1", URI("@test", bid_position=1), True), + ("lbry://@test1:1/fakepath", URI("@test1", claim_sequence=1, path="fakepath"), True), + ("lbry://@test1$1/fakepath", URI("@test1", bid_position=1, path="fakepath"), True), + ("lbry://@test1#abcdef/fakepath", URI("@test1", claim_id="abcdef", path="fakepath"), True), + ("@z", URI("@z"), True), + ("@yx", URI("@yx"), True), + ("@abc", URI("@abc"), True) +] + +parsed_uri_raises = [ + ("lbry://", URIParseError), + ("lbry://test:3$1", URIParseError), + ("lbry://test$1:1", URIParseError), + ("lbry://test#x", URIParseError), + ("lbry://test#x/page", URIParseError), + ("lbry://test$", URIParseError), + ("lbry://test#", URIParseError), + ("lbry://test:", URIParseError), + ("lbry://test$x", URIParseError), + ("lbry://test:x", URIParseError), + ("lbry://@test@", URIParseError), + ("lbry://@test:", URIParseError), + ("lbry://test@", URIParseError), + ("lbry://tes@t", URIParseError), + ("lbry://test:1#%s" % claim_id_1, URIParseError), + ("lbry://test:0", URIParseError), + ("lbry://test$0", URIParseError), + ("lbry://test/path", URIParseError), + ("lbry://@test1#abcdef/fakepath:1", URIParseError), + ("lbry://@test1:1/fakepath:1", URIParseError), + ("lbry://@test1:1ab/fakepath", URIParseError), + ("lbry://test:1:1:1", URIParseError), + ("whatever/lbry://test", URIParseError), + ("lbry://lbry://test", URIParseError), + ("lbry://❀", URIParseError), + ("lbry://@/what", URIParseError), + ("lbry://abc:0x123", URIParseError), + ("lbry://abc:0x123/page", URIParseError), + ("lbry://@test1#ABCDEF/fakepath", URIParseError), + ("test:0001", URIParseError), + ("lbry://@test1$1/fakepath?arg1&arg2&arg3", URIParseError) +] + + +class UnitTest(unittest.TestCase): + maxDiff = 4000 + + +class TestURIParser(UnitTest): + def setUp(self): + self.longMessage = True + + def test_uri_parse(self): + for test_string, expected_uri_obj, is_channel in parsed_uri_matches: + try: + # string -> URI + self.assertEqual(URI.from_uri_string(test_string), expected_uri_obj, test_string) + # URI -> dict -> URI + self.assertEqual(URI.from_dict(expected_uri_obj.to_dict()), expected_uri_obj, + test_string) + # is_channel + self.assertEqual(URI.from_uri_string(test_string).is_channel, is_channel, + test_string) + + # convert-to-string test only works if protocol is present in test_string + if test_string.startswith('lbry://'): + # string -> URI -> string + self.assertEqual(URI.from_uri_string(test_string).to_uri_string(), test_string, + test_string) + # string -> URI -> dict -> URI -> string + uri_dict = URI.from_uri_string(test_string).to_dict() + self.assertEqual(URI.from_dict(uri_dict).to_uri_string(), test_string, + test_string) + # URI -> dict -> URI -> string + self.assertEqual(URI.from_dict(expected_uri_obj.to_dict()).to_uri_string(), + test_string, test_string) + except URIParseError as err: + print("ERROR: " + test_string) + raise + + def test_uri_errors(self): + for test_str, err in parsed_uri_raises: + try: + URI.from_uri_string(test_str) + except URIParseError: + pass + else: + print("\nSuccessfully parsed invalid url: " + test_str) + self.assertRaises(err, URI.from_uri_string, test_str) + + +class TestEncoderAndDecoder(UnitTest): + def test_encode_decode(self): + test_claim = ClaimDict.load_dict(example_010) + self.assertEqual(test_claim.is_certificate, False) + self.assertDictEqual(test_claim.claim_dict, example_010) + test_pb = test_claim.protobuf + self.assertDictEqual(ClaimDict.load_protobuf(test_pb).claim_dict, example_010) + self.assertEqual(test_pb.ByteSize(), ClaimDict.load_protobuf(test_pb).protobuf_len) + self.assertEqual(test_claim.json_len, ClaimDict.load_protobuf(test_pb).json_len) + + def test_deserialize(self): + deserialized_claim = ClaimDict.deserialize(binascii.unhexlify(example_010_serialized)) + self.assertDictEqual(ClaimDict.load_dict(example_010).claim_dict, + deserialized_claim.claim_dict) + + def test_stream_is_not_certificate(self): + deserialized_claim = ClaimDict.deserialize(binascii.unhexlify(example_010_serialized)) + self.assertEqual(deserialized_claim.is_certificate, False) + + +class TestISO639(UnitTest): + def test_alpha2(self): + prefixes = ['en', 'aa', 'ab', 'ae', 'af', 'ak', 'am', 'an', 'ar', 'as', 'av', 'ay', 'az', + 'ba', 'be', 'bg', 'bh', 'bi', 'bm', 'bn', 'bo', 'br', 'bs', 'ca', 'ce', 'ch', + 'co', 'cr', 'cs', 'cu', 'cv', 'cy', 'da', 'de', 'dv', 'dz', 'ee', 'el', 'eo', + 'es', 'et', 'eu', 'fa', 'ff', 'fi', 'fj', 'fo', 'fr', 'fy', 'ga', 'gd', 'gl', + 'gn', 'gu', 'gv', 'ha', 'he', 'hi', 'ho', 'hr', 'ht', 'hu', 'hy', 'hz', 'ia', + 'id', 'ie', 'ig', 'ii', 'ik', 'io', 'is', 'it', 'iu', 'ja', 'jv', 'ka', 'kg', + 'ki', 'kj', 'kk', 'kl', 'km', 'kn', 'ko', 'kr', 'ks', 'ku', 'kv', 'kw', 'ky', + 'la', 'lb', 'lg', 'li', 'ln', 'lo', 'lt', 'lu', 'lv', 'mg', 'mh', 'mi', 'mk', + 'ml', 'mn', 'mr', 'ms', 'mt', 'my', 'na', 'nb', 'nd', 'ne', 'ng', 'nl', 'nn', + 'no', 'nr', 'nv', 'ny', 'oc', 'oj', 'om', 'or', 'os', 'pa', 'pi', 'pl', 'ps', + 'pt', 'qu', 'rm', 'rn', 'ro', 'ru', 'rw', 'sa', 'sc', 'sd', 'se', 'sg', 'si', + 'sk', 'sl', 'sm', 'sn', 'so', 'sq', 'sr', 'ss', 'st', 'su', 'sv', 'sw', 'ta', + 'te', 'tg', 'th', 'ti', 'tk', 'tl', 'tn', 'to', 'tr', 'ts', 'tt', 'tw', 'ty', + 'ug', 'uk', 'ur', 'uz', 've', 'vi', 'vo', 'wa', 'wo', 'xh', 'yi', 'yo', 'za', + 'zh', 'zu'] + for prefix in prefixes: + metadata = deepcopy(example_010) + metadata['stream']['metadata']['language'] = prefix + claim = ClaimDict.load_dict(metadata) + serialized = claim.serialized + self.assertDictEqual(metadata, dict(ClaimDict.deserialize(serialized).claim_dict)) + + def test_fake_alpha2(self): + fake_codes = ["bb", "zz"] + for fake_code in fake_codes: + metadata = deepcopy(example_010) + metadata['stream']['metadata']['language'] = fake_code + self.assertRaises(DecodeError, ClaimDict.load_dict, metadata) + + +class TestMigration(UnitTest): + def test_migrate_to_010(self): + migrated_0_1_0 = migrate(example_003) + self.assertDictEqual(migrated_0_1_0.claim_dict, example_010) + self.assertEqual(migrated_0_1_0.is_certificate, False) + + +class TestNIST256pSignatures(UnitTest): + def test_make_ecdsa_cert(self): + cert = ClaimDict.generate_certificate(nist256p_private_key, curve=NIST256p) + self.assertEqual(cert.is_certificate, True) + self.assertDictEqual(cert.claim_dict, nist256p_cert) + + def test_validate_ecdsa_signature(self): + cert = ClaimDict.generate_certificate(nist256p_private_key, curve=NIST256p) + signed = ClaimDict.load_dict(example_010).sign(nist256p_private_key, + claim_address_2, claim_id_1, curve=NIST256p) + self.assertDictEqual(signed.claim_dict, claim_010_signed_nist256p) + signed_copy = ClaimDict.load_protobuf(signed.protobuf) + self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True) + + def test_remove_signature_equals_unsigned(self): + unsigned = ClaimDict.load_dict(example_010) + signed = unsigned.sign(nist256p_private_key, claim_address_1, claim_id_1, curve=NIST256p) + self.assertEqual(unsigned.serialized, signed.serialized_no_signature) + + def test_fail_to_validate_fake_ecdsa_signature(self): + signed = ClaimDict.load_dict(example_010).sign(nist256p_private_key, claim_address_1, + claim_id_1, curve=NIST256p) + signed_copy = ClaimDict.load_protobuf(signed.protobuf) + fake_key = get_signer(NIST256p).generate().private_key.to_pem() + fake_cert = ClaimDict.generate_certificate(fake_key, curve=NIST256p) + self.assertRaises(ecdsa.keys.BadSignatureError, signed_copy.validate_signature, + claim_address_2, fake_cert) + + def test_fail_to_validate_ecdsa_sig_for_altered_claim(self): + cert = ClaimDict.generate_certificate(nist256p_private_key, curve=NIST256p) + altered = ClaimDict.load_dict(example_010).sign(nist256p_private_key, claim_address_1, + claim_id_1, curve=NIST256p) + sd_hash = altered['stream']['source']['source'] + altered['stream']['source']['source'] = sd_hash[::-1] + altered_copy = ClaimDict.load_dict(altered.claim_dict) + self.assertRaises(ecdsa.keys.BadSignatureError, altered_copy.validate_signature, + claim_address_1, cert) + + +class TestNIST384pSignatures(UnitTest): + def test_make_ecdsa_cert(self): + cert = ClaimDict.generate_certificate(nist384p_private_key, curve=NIST384p) + self.assertEqual(cert.is_certificate, True) + self.assertDictEqual(cert.claim_dict, nist384p_cert) + + def test_validate_ecdsa_signature(self): + cert = ClaimDict.generate_certificate(nist384p_private_key, curve=NIST384p) + signed = ClaimDict.load_dict(example_010).sign(nist384p_private_key, + claim_address_2, claim_id_1, curve=NIST384p) + self.assertDictEqual(signed.claim_dict, claim_010_signed_nist384p) + signed_copy = ClaimDict.load_protobuf(signed.protobuf) + self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True) + + def test_remove_signature_equals_unsigned(self): + unsigned = ClaimDict.load_dict(example_010) + signed = unsigned.sign(nist384p_private_key, claim_address_1, claim_id_1, curve=NIST384p) + self.assertEqual(unsigned.serialized, signed.serialized_no_signature) + + def test_fail_to_validate_fake_ecdsa_signature(self): + signed = ClaimDict.load_dict(example_010).sign(nist384p_private_key, claim_address_1, + claim_id_1, curve=NIST384p) + signed_copy = ClaimDict.load_protobuf(signed.protobuf) + fake_key = get_signer(NIST384p).generate().private_key.to_pem() + fake_cert = ClaimDict.generate_certificate(fake_key, curve=NIST384p) + self.assertRaises(ecdsa.keys.BadSignatureError, signed_copy.validate_signature, + claim_address_2, fake_cert) + + def test_fail_to_validate_ecdsa_sig_for_altered_claim(self): + cert = ClaimDict.generate_certificate(nist384p_private_key, curve=NIST384p) + altered = ClaimDict.load_dict(example_010).sign(nist384p_private_key, claim_address_1, + claim_id_1, curve=NIST384p) + sd_hash = altered['stream']['source']['source'] + altered['stream']['source']['source'] = sd_hash[::-1] + altered_copy = ClaimDict.load_dict(altered.claim_dict) + self.assertRaises(ecdsa.keys.BadSignatureError, altered_copy.validate_signature, + claim_address_1, cert) + + +class TestSECP256k1Signatures(UnitTest): + def test_make_ecdsa_cert(self): + cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) + self.assertEqual(cert.is_certificate, True) + self.assertDictEqual(cert.claim_dict, secp256k1_cert) + + def test_validate_ecdsa_signature(self): + cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) + self.assertDictEqual(cert.claim_dict, secp256k1_cert) + signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, + claim_id_1, curve=SECP256k1) + self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1) + signed_copy = ClaimDict.load_protobuf(signed.protobuf) + self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True) + + def test_fail_to_sign_with_no_claim_address(self): + cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) + self.assertDictEqual(cert.claim_dict, secp256k1_cert) + self.assertRaises(Exception, ClaimDict.load_dict(example_010).sign, secp256k1_private_key, + None, claim_id_1, curve=SECP256k1) + + def test_fail_to_validate_with_no_claim_address(self): + cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) + self.assertDictEqual(cert.claim_dict, secp256k1_cert) + signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, + claim_id_1, curve=SECP256k1) + self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1) + signed_copy = ClaimDict.load_protobuf(signed.protobuf) + self.assertRaises(Exception, signed_copy.validate_signature, None, cert) + + def test_remove_signature_equals_unsigned(self): + unsigned = ClaimDict.load_dict(example_010) + signed = unsigned.sign(secp256k1_private_key, claim_address_1, claim_id_1, curve=SECP256k1) + self.assertEqual(unsigned.serialized, signed.serialized_no_signature) + + def test_fail_to_validate_fake_ecdsa_signature(self): + signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, + claim_id_1, curve=SECP256k1) + signed_copy = ClaimDict.load_protobuf(signed.protobuf) + fake_key = get_signer(SECP256k1).generate().private_key.to_pem() + fake_cert = ClaimDict.generate_certificate(fake_key, curve=SECP256k1) + self.assertRaises(ecdsa.keys.BadSignatureError, signed_copy.validate_signature, + claim_address_2, fake_cert) + + def test_fail_to_validate_ecdsa_sig_for_altered_claim(self): + cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) + altered = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, + claim_id_1, curve=SECP256k1) + sd_hash = altered['stream']['source']['source'] + altered['stream']['source']['source'] = sd_hash[::-1] + altered_copy = ClaimDict.load_dict(altered.claim_dict) + self.assertRaises(ecdsa.keys.BadSignatureError, altered_copy.validate_signature, + claim_address_1, cert) + + +class TestMetadata(UnitTest): + def test_fail_with_fake_sd_hash(self): + claim = deepcopy(example_010) + sd_hash = claim['stream']['source']['source'][:-2] + claim['stream']['source']['source'] = sd_hash + self.assertRaises(AssertionError, ClaimDict.load_dict, claim) + + +class TestSmartDecode(UnitTest): + def test_hex_decode(self): + self.assertEqual(decoded_hex_encoded_003, smart_decode(hex_encoded_003).claim_dict) + + def test_binary_decode(self): + self.assertEqual(expected_binary_claim_decoded, smart_decode(binary_claim).claim_dict) + + def test_smart_decode_raises(self): + with self.assertRaises(TypeError): + smart_decode(1) + + with self.assertRaises(DecodeError): + smart_decode("aaab") + + with self.assertRaises(DecodeError): + smart_decode("{'bogus_dict':1}") + + +class TestMainnetAddressValidation(UnitTest): + def test_mainnet_address_encode_decode(self): + valid_addr_hex = "55be482f953ed0feda4fc5c4d012681b6119274993dc96bf10" + self.assertEqual(encode_address(binascii.unhexlify(valid_addr_hex)), + b"bW5PZEvEBNPQRVhwpYXSjabFgbSw1oaHyR") + self.assertEqual(decode_address("bW5PZEvEBNPQRVhwpYXSjabFgbSw1oaHyR"), + binascii.unhexlify(valid_addr_hex)) + + def test_mainnet_address_encode_error(self): + invalid_prefix = "54be482f953ed0feda4fc5c4d012681b6119274993dc96bf10" + invalid_checksum = "55be482f953ed0feda4fc5c4d012681b6119274993dc96bf11" + invalid_length = "55482f953ed0feda4fc5c4d012681b6119274993dc96bf10" + + with self.assertRaises(InvalidAddress): + encode_address(binascii.unhexlify(invalid_prefix)) + encode_address(binascii.unhexlify(invalid_checksum)) + encode_address(binascii.unhexlify(invalid_length)) + + def test_mainnet_address_decode_error(self): + with self.assertRaises(InvalidAddress): + decode_address("bW5PZEvEBNPQRVhwpYXSjabFgbSw1oaHR") + with self.assertRaises(InvalidAddress): + decode_address("mzGSynizDwSgURdnFjosZwakSVuZrdE8V4") + + +class TestRegtestAddressValidation(UnitTest): + def setUp(self): + lbryschema.BLOCKCHAIN_NAME = "lbrycrd_regtest" + + def tearDown(self): + lbryschema.BLOCKCHAIN_NAME = "lbrycrd_main" + + def test_regtest_address_encode_decode(self): + valid_addr_hex = "6fcdac187757dbf05500f613ada6fdd953d59b9acbf3c9343f" + self.assertEqual(encode_address(binascii.unhexlify(valid_addr_hex)), + b"mzGSynizDwSgURdnFjosZwakSVuZrdE8V4") + self.assertEqual(decode_address("mzGSynizDwSgURdnFjosZwakSVuZrdE8V4"), + binascii.unhexlify(valid_addr_hex)) + + def test_regtest_address_encode_error(self): + invalid_prefix = "6dcdac187757dbf05500f613ada6fdd953d59b9acbf3c9343f" + invalid_checksum = "6fcdac187757dbf05500f613ada6fdd953d59b9acbf3c9343d" + invalid_length = "6fcdac187757dbf05500f613ada6fdd953d59b9acbf3c934" + + with self.assertRaises(InvalidAddress): + encode_address(binascii.unhexlify(invalid_prefix)) + encode_address(binascii.unhexlify(invalid_checksum)) + encode_address(binascii.unhexlify(invalid_length)) + + def test_regtest_address_decode_error(self): + with self.assertRaises(InvalidAddress): + decode_address("bW5PZEvEBNPQRVhwpYXSjabFgbSw1oaHyR") + with self.assertRaises(InvalidAddress): + decode_address("mzGSynizDwSgURdnFjosZwakSVuZrdE8V5") + + +class TestInvalidCertificateCurve(UnitTest): + def test_invalid_cert_curve(self): + with self.assertRaises(Exception): + ClaimDict.load_dict(malformed_secp256k1_cert) + + +class TestValidatePrivateKey(UnitTest): + def test_valid_private_key_for_cert(self): + cert_claim = ClaimDict.load_dict(secp256k1_cert) + self.assertEqual(cert_claim.validate_private_key(secp256k1_private_key, claim_id_1), + True) + + def test_fail_to_load_wrong_private_key_for_cert(self): + cert_claim = ClaimDict.load_dict(secp256k1_cert) + self.assertEqual(cert_claim.validate_private_key(nist256p_private_key, claim_id_1), + False) + + +if __name__ == '__main__': + unittest.main()