# Electrum - lightweight Bitcoin client
# Copyright (C) 2012 thomasv@ecdsa.org
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import threading
from typing import Optional, Dict, Mapping, Sequence
import hashlib
import hmac

from . import util
from .bitcoin import hash_encode, int_to_hex, rev_hex
from .crypto import sha256d
from . import constants
from .util import bfh, bh2u
from .simple_config import SimpleConfig
from .logging import get_logger, Logger

_logger = get_logger(__name__)

# LBRY headers are 112 bytes: bitcoin's 80-byte layout plus a 32-byte
# claim-trie root inserted after the merkle root.
HEADER_SIZE = 112  # bytes
MAX_TARGET = 0x0000FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
GENESIS_BITS = 0x1f00ffff
N_TARGET_TIMESPAN = 150  # seconds; LBRY retargets every block


class MissingHeader(Exception):
    """Raised when a header needed for a computation is not stored locally."""
    pass


class InvalidHeader(Exception):
    """Raised when raw header bytes cannot be deserialized."""
    pass


def serialize_header(header_dict: dict) -> str:
    """Serialize a header dict into its 112-byte wire format, hex-encoded."""
    s = int_to_hex(header_dict['version'], 4) \
        + rev_hex(header_dict['prev_block_hash']) \
        + rev_hex(header_dict['merkle_root']) \
        + rev_hex(header_dict['claim_trie_root']) \
        + int_to_hex(int(header_dict['timestamp']), 4) \
        + int_to_hex(int(header_dict['bits']), 4) \
        + int_to_hex(int(header_dict['nonce']), 4)
    return s


def deserialize_header(s: bytes, height: int) -> dict:
    """Parse raw 112-byte header bytes into a dict.

    Raises InvalidHeader on empty input or wrong length. `height` is
    recorded in the returned dict as 'block_height'.
    """
    if not s:
        raise InvalidHeader('Invalid header: {}'.format(s))
    if len(s) != HEADER_SIZE:
        raise InvalidHeader('Invalid header length: {}'.format(len(s)))
    # all integer fields are little-endian on the wire
    hex_to_int = lambda s: int.from_bytes(s, byteorder='little')
    h = {}
    h['version'] = hex_to_int(s[0:4])
    h['prev_block_hash'] = hash_encode(s[4:36])
    h['merkle_root'] = hash_encode(s[36:68])
    h['claim_trie_root'] = hash_encode(s[68:100])
    h['timestamp'] = hex_to_int(s[100:104])
    h['bits'] = hex_to_int(s[104:108])
    h['nonce'] = hex_to_int(s[108:112])
    h['block_height'] = height
    return h


def hash_header(header: dict) -> str:
    """Return the sha256d block hash (hex) of a header dict; zeros for None."""
    if header is None:
        return '0' * 64
    if header.get('prev_block_hash') is None:
        # NOTE(review): fills in a zero prev-hash by mutating the caller's dict
        header['prev_block_hash'] = '00'*32
    return hash_raw_header(serialize_header(header))


def pow_hash_header(header: dict) -> str:
    """Return the LBRY proof-of-work hash (hex) of a header dict; zeros for None."""
    if header is None:
        return '0' * 64
    return hash_encode(PoWHash(bfh(serialize_header(header))))


def sha256(x):
    return hashlib.sha256(x).digest()


def sha512(x):
    return hashlib.sha512(x).digest()


def ripemd160(x):
    h = hashlib.new('ripemd160')
    h.update(x)
    return h.digest()


def Hash(x):
    """Double SHA-256."""
    return sha256(sha256(x))


def hash_raw_header(header: str) -> str:
    """sha256d of a hex-encoded raw header, returned as a hex block hash."""
    return hash_encode(sha256d(bfh(header)))


def PoWHash(x):
    """LBRY proof-of-work hash: sha256d -> sha512, ripemd160 each half, sha256d."""
    r = sha512(Hash(x))
    r1 = ripemd160(r[:len(r) // 2])
    r2 = ripemd160(r[len(r) // 2:])
    r3 = Hash(r1 + r2)
    return r3


# key: blockhash hex at forkpoint
# the chain at some key is the best chain that includes the given hash
blockchains = {}  # type: Dict[str, Blockchain]
blockchains_lock = threading.RLock()


def read_blockchains(config: 'SimpleConfig'):
    """Load the main chain and all saved fork files into the `blockchains` dict.

    Deletes any on-disk chain that fails consistency checks (cannot connect
    to its parent, wrong first hash, or forks below the max checkpoint).
    """
    best_chain = Blockchain(config=config,
                            forkpoint=0,
                            parent=None,
                            forkpoint_hash=constants.net.GENESIS,
                            prev_hash=None)
    blockchains[constants.net.GENESIS] = best_chain
    # consistency checks
    if best_chain.height() > constants.net.max_checkpoint():
        header_after_cp = best_chain.read_header(constants.net.max_checkpoint()+1)
        if not header_after_cp or not best_chain.can_connect(header_after_cp, check_height=False):
            _logger.info("[blockchain] deleting best chain. cannot connect header after last cp to last cp.")
            os.unlink(best_chain.path())
            best_chain.update_size()
    # forks
    fdir = os.path.join(util.get_headers_dir(config), 'forks')
    util.make_dir(fdir)
    # files are named as: fork2_{forkpoint}_{prev_hash}_{first_hash}
    l = filter(lambda x: x.startswith('fork2_') and '.' not in x, os.listdir(fdir))
    l = sorted(l, key=lambda x: int(x.split('_')[1]))  # sort by forkpoint

    def delete_chain(filename, reason):
        # remove a fork file that failed a consistency check
        _logger.info(f"[blockchain] deleting chain (unknown): {reason}")
        os.unlink(os.path.join(fdir, filename))

    def instantiate_chain(filename):
        # parse fork filename; hashes are stored with leading zeros stripped
        __, forkpoint, prev_hash, first_hash = filename.split('_')
        forkpoint = int(forkpoint)
        prev_hash = (64-len(prev_hash)) * "0" + prev_hash  # left-pad with zeroes
        first_hash = (64-len(first_hash)) * "0" + first_hash
        # forks below the max checkpoint are not allowed
        if forkpoint <= constants.net.max_checkpoint():
            delete_chain(filename, "deleting fork below max checkpoint")
            return
        # find parent (sorting by forkpoint guarantees it's already instantiated)
        for parent in blockchains.values():
            if parent.check_hash(forkpoint - 1, prev_hash):
                break
        else:
            delete_chain(filename, "cannot find parent for chain")
            return
        b = Blockchain(config=config,
                       forkpoint=forkpoint,
                       parent=parent,
                       forkpoint_hash=first_hash,
                       prev_hash=prev_hash)
        # consistency checks
        h = b.read_header(b.forkpoint)
        if first_hash != hash_header(h):
            delete_chain(filename, "incorrect first hash for chain")
            return
        if not b.parent.can_connect(h, check_height=False):
            delete_chain(filename, "cannot connect chain to parent")
            return
        chain_id = b.get_id()
        assert first_hash == chain_id, (first_hash, chain_id)
        blockchains[chain_id] = b

    for filename in l:
        instantiate_chain(filename)


def get_best_chain() -> 'Blockchain':
    return blockchains[constants.net.GENESIS]


# block hash -> chain work; up to and including that block
_CHAINWORK_CACHE = {
    "0000000000000000000000000000000000000000000000000000000000000000": 0,  # virtual block at height -1
}  # type: Dict[str, int]


class Blockchain(Logger):
    """
    Manages blockchain headers and their verification
    """

    def __init__(self, config: SimpleConfig, forkpoint: int, parent: Optional['Blockchain'],
                 forkpoint_hash: str, prev_hash: Optional[str]):
        assert isinstance(forkpoint_hash, str) and len(forkpoint_hash) == 64, forkpoint_hash
        assert (prev_hash is None) or (isinstance(prev_hash, str) and len(prev_hash) == 64), prev_hash
        # assert (parent is None) == (forkpoint == 0)
        if 0 < forkpoint <= constants.net.max_checkpoint():
            raise Exception(f"cannot fork below max checkpoint. forkpoint: {forkpoint}")
        Logger.__init__(self)
        self.config = config
        self.forkpoint = forkpoint  # height of first header
        self.parent = parent
        self._forkpoint_hash = forkpoint_hash  # blockhash at forkpoint. "first hash"
        self._prev_hash = prev_hash  # blockhash immediately before forkpoint
        self.lock = threading.RLock()
        self.update_size()

    def with_lock(func):
        # decorator: run the method while holding self.lock
        def func_wrapper(self, *args, **kwargs):
            with self.lock:
                return func(self, *args, **kwargs)
        return func_wrapper

    @property
    def checkpoints(self):
        return constants.net.CHECKPOINTS

    def get_max_child(self) -> Optional[int]:
        """Forkpoint of the highest direct child chain, or None if childless."""
        children = self.get_direct_children()
        return max([x.forkpoint for x in children]) if children else None

    def get_max_forkpoint(self) -> int:
        """Returns the max height where there is a fork
        related to this chain.
        """
        mc = self.get_max_child()
        return mc if mc is not None else self.forkpoint

    def get_direct_children(self) -> Sequence['Blockchain']:
        with blockchains_lock:
            return list(filter(lambda y: y.parent==self, blockchains.values()))

    def get_parent_heights(self) -> Mapping['Blockchain', int]:
        """Returns map: (parent chain -> height of last common block)"""
        with blockchains_lock:
            result = {self: self.height()}
            chain = self
            while True:
                parent = chain.parent
                if parent is None:
                    break
                result[parent] = chain.forkpoint - 1
                chain = parent
            return result

    def get_height_of_last_common_block_with_chain(self, other_chain: 'Blockchain') -> int:
        """Height of the highest block shared between this chain and other_chain."""
        last_common_block_height = 0
        our_parents = self.get_parent_heights()
        their_parents = other_chain.get_parent_heights()
        for chain in our_parents:
            if chain in their_parents:
                h = min(our_parents[chain], their_parents[chain])
                last_common_block_height = max(last_common_block_height, h)
        return last_common_block_height

    @with_lock
    def get_branch_size(self) -> int:
        return self.height() - self.get_max_forkpoint() + 1

    def get_name(self) -> str:
        # short human-readable id derived from the hash at the max forkpoint
        return self.get_hash(self.get_max_forkpoint()).lstrip('0')[0:10]

    def check_header(self, header: dict) -> bool:
        """Whether this chain contains the given header at its claimed height."""
        header_hash = hash_header(header)
        height = header.get('block_height')
        return self.check_hash(height, header_hash)

    def check_hash(self, height: int, header_hash: str) -> bool:
        """Returns whether the hash of the block at given height
        is the given hash.
        """
        assert isinstance(header_hash, str) and len(header_hash) == 64, header_hash  # hex
        try:
            return header_hash == self.get_hash(height)
        except Exception:
            return False

    def fork(parent, header: dict) -> 'Blockchain':
        # NOTE: called as parent.fork(header); 'parent' plays the role of self
        if not parent.can_connect(header, check_height=False):
            raise Exception("forking header does not connect to parent chain")
        forkpoint = header.get('block_height')
        self = Blockchain(config=parent.config,
                          forkpoint=forkpoint,
                          parent=parent,
                          forkpoint_hash=hash_header(header),
                          prev_hash=parent.get_hash(forkpoint-1))
        self.assert_headers_file_available(parent.path())
        open(self.path(), 'w+').close()
        self.save_header(header)
        # put into global dict. note that in some cases
        # save_header might have already put it there but that's OK
        chain_id = self.get_id()
        with blockchains_lock:
            blockchains[chain_id] = self
        return self

    @with_lock
    def height(self) -> int:
        return self.forkpoint + self.size() - 1

    @with_lock
    def size(self) -> int:
        return self._size

    @with_lock
    def update_size(self) -> None:
        # recompute cached header count from the on-disk file size
        p = self.path()
        self._size = os.path.getsize(p)//HEADER_SIZE if os.path.exists(p) else 0

    @classmethod
    def verify_header(cls, header: dict, prev_hash: str, target: int, bits: int,
                      expected_header_hash: str=None) -> None:
        """Raise if the header's hash or prev-hash linkage is wrong.

        On mainnet the bits/PoW checks below are currently disabled
        (commented out); only hash and linkage are enforced.
        """
        _hash = pow_hash_header(header)
        if expected_header_hash:
            _hash2 = hash_header(header)
            if expected_header_hash != _hash2:
                raise Exception("hash mismatches with expected: {} vs {}".format(expected_header_hash, _hash2))
        if prev_hash != header.get('prev_block_hash'):
            raise Exception("prev hash mismatch: %s vs %s" % (prev_hash, header.get('prev_block_hash')))
        if constants.net.TESTNET:
            return
        #if bits != header.get('bits'):
        #    raise Exception("bits mismatch: %s vs %s" % (bits, header.get('bits')))
        #if int('0x' + _hash, 16) > target:
        #    raise Exception("insufficient proof of work: %s vs target %s" % (int('0x' + _hash, 16), target))

    def verify_chunk(self, index: int, data: bytes) -> None:
        """Verify a chunk (2016 headers) of raw header bytes; raise on failure."""
        num = len(data) // HEADER_SIZE
        start_height = index * 2016
        prev_hash = self.get_hash(start_height - 1)
        for i in range(num):
            height = start_height + i
            # only needed by the disabled get_target2 check below
            header = self.read_header(height - 1)
            #bits, target = self.get_target2(height - 1, header)
            try:
                expected_header_hash = self.get_hash(height)
            except MissingHeader:
                expected_header_hash = None
            raw_header = data[i*HEADER_SIZE : (i+1)*HEADER_SIZE]
            header = deserialize_header(raw_header, index*2016 + i)
            # bits/target are passed as 0 since PoW checks are disabled
            self.verify_header(header, prev_hash, 0, 0, expected_header_hash)
            prev_hash = hash_header(header)

    @with_lock
    def path(self):
        """Filesystem path of this chain's headers file."""
        d = util.get_headers_dir(self.config)
        if self.parent is None:
            filename = 'blockchain_headers'
        else:
            assert self.forkpoint > 0, self.forkpoint
            prev_hash = self._prev_hash.lstrip('0')
            first_hash = self._forkpoint_hash.lstrip('0')
            basename = f'fork2_{self.forkpoint}_{prev_hash}_{first_hash}'
            filename = os.path.join('forks', basename)
        return os.path.join(d, filename)

    @with_lock
    def save_chunk(self, index: int, chunk: bytes):
        """Write a verified chunk to disk, delegating checkpointed chunks to the main chain."""
        assert index >= 0, index
        chunk_within_checkpoint_region = index < len(self.checkpoints)
        # chunks in checkpoint region are the responsibility of the 'main chain'
        if chunk_within_checkpoint_region and self.parent is not None:
            main_chain = get_best_chain()
            main_chain.save_chunk(index, chunk)
            return

        delta_height = (index * 2016 - self.forkpoint)
        delta_bytes = delta_height * HEADER_SIZE
        # if this chunk contains our forkpoint, only save the part after forkpoint
        # (the part before is the responsibility of the parent)
        if delta_bytes < 0:
            chunk = chunk[-delta_bytes:]
            delta_bytes = 0
        truncate = not chunk_within_checkpoint_region
        self.write(chunk, delta_bytes, truncate)
        self.swap_with_parent()

    def swap_with_parent(self) -> None:
        """Repeatedly swap this chain with its parent while it has more chainwork."""
        with self.lock, blockchains_lock:
            # do the swap; possibly multiple ones
            cnt = 0
            while True:
                old_parent = self.parent
                if not self._swap_with_parent():
                    break
                # make sure we are making progress
                cnt += 1
                if cnt > len(blockchains):
                    raise Exception(f'swapping fork with parent too many times: {cnt}')
                # we might have become the parent of some of our former siblings
                for old_sibling in old_parent.get_direct_children():
                    if self.check_hash(old_sibling.forkpoint - 1, old_sibling._prev_hash):
                        old_sibling.parent = self

    def _swap_with_parent(self) -> bool:
        """Check if this chain became stronger than its parent, and swap
        the underlying files if so. The Blockchain instances will keep
        'containing' the same headers, but their ids change and so
        they will be stored in different files."""
        if self.parent is None:
            return False
        if self.parent.get_chainwork() >= self.get_chainwork():
            return False
        self.logger.info(f"swapping {self.forkpoint} {self.parent.forkpoint}")
        parent_branch_size = self.parent.height() - self.forkpoint + 1
        forkpoint = self.forkpoint  # type: Optional[int]
        parent = self.parent  # type: Optional[Blockchain]
        child_old_id = self.get_id()
        parent_old_id = parent.get_id()
        # swap files
        # child takes parent's name
        # parent's new name will be something new (not child's old name)
        self.assert_headers_file_available(self.path())
        child_old_name = self.path()
        with open(self.path(), 'rb') as f:
            my_data = f.read()
        self.assert_headers_file_available(parent.path())
        assert forkpoint > parent.forkpoint, (f"forkpoint of parent chain ({parent.forkpoint}) "
                                              f"should be at lower height than children's ({forkpoint})")
        with open(parent.path(), 'rb') as f:
            f.seek((forkpoint - parent.forkpoint)*HEADER_SIZE)
            parent_data = f.read(parent_branch_size*HEADER_SIZE)
        self.write(parent_data, 0)
        parent.write(my_data, (forkpoint - parent.forkpoint)*HEADER_SIZE)
        # swap parameters
        self.parent, parent.parent = parent.parent, self  # type: Optional[Blockchain], Optional[Blockchain]
        self.forkpoint, parent.forkpoint = parent.forkpoint, self.forkpoint
        self._forkpoint_hash, parent._forkpoint_hash = parent._forkpoint_hash, hash_raw_header(bh2u(parent_data[:HEADER_SIZE]))
        self._prev_hash, parent._prev_hash = parent._prev_hash, self._prev_hash
        # parent's new name
        os.replace(child_old_name, parent.path())
        self.update_size()
        parent.update_size()
        # update pointers
        blockchains.pop(child_old_id, None)
        blockchains.pop(parent_old_id, None)
        blockchains[self.get_id()] = self
        blockchains[parent.get_id()] = parent
        return True

    def get_id(self) -> str:
        return self._forkpoint_hash

    def assert_headers_file_available(self, path):
        if os.path.exists(path):
            return
        elif not os.path.exists(util.get_headers_dir(self.config)):
            raise FileNotFoundError('Electrum headers_dir does not exist. Was it deleted while running?')
        else:
            raise FileNotFoundError('Cannot find headers file but headers_dir is there. Should be at {}'.format(path))

    @with_lock
    def write(self, data: bytes, offset: int, truncate: bool=True) -> None:
        """Write raw header bytes at a byte offset, optionally truncating the tail."""
        filename = self.path()
        self.assert_headers_file_available(filename)
        with open(filename, 'rb+') as f:
            if truncate and offset != self._size * HEADER_SIZE:
                f.seek(offset)
                f.truncate()
            f.seek(offset)
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        self.update_size()

    @with_lock
    def save_header(self, header: dict) -> None:
        """Append a single verified header to the end of this chain's file."""
        delta = header.get('block_height') - self.forkpoint
        data = bfh(serialize_header(header))
        # headers are only _appended_ to the end:
        assert delta == self.size(), (delta, self.size())
        assert len(data) == HEADER_SIZE
        self.write(data, delta*HEADER_SIZE)
        self.swap_with_parent()

    @with_lock
    def read_header(self, height: int) -> Optional[dict]:
        """Read the header at `height`, delegating to the parent below forkpoint.

        Returns None for out-of-range heights or an all-zero placeholder slot.
        """
        if height < 0:
            return
        if height < self.forkpoint:
            return self.parent.read_header(height)
        if height > self.height():
            return
        delta = height - self.forkpoint
        name = self.path()
        self.assert_headers_file_available(name)
        with open(name, 'rb') as f:
            f.seek(delta * HEADER_SIZE)
            h = f.read(HEADER_SIZE)
            if len(h) < HEADER_SIZE:
                raise Exception('Expected to read a full header. This was only {} bytes'.format(len(h)))
        if h == bytes([0])*HEADER_SIZE:
            return None
        return deserialize_header(h, height)

    def header_at_tip(self) -> Optional[dict]:
        """Return latest header."""
        height = self.height()
        return self.read_header(height)

    def get_hash(self, height: int) -> str:
        """Block hash (hex) at `height`; uses checkpoints where possible.

        Raises MissingHeader if the header is not stored locally.
        """
        def is_height_checkpoint():
            within_cp_range = height <= constants.net.max_checkpoint()
            at_chunk_boundary = (height+1) % 2016 == 0
            return within_cp_range and at_chunk_boundary

        if height == -1:
            return '0000000000000000000000000000000000000000000000000000000000000000'
        elif height == 0:
            return constants.net.GENESIS
        elif is_height_checkpoint():
            index = height // 2016
            h, t = self.checkpoints[index]
            return h
        else:
            header = self.read_header(height)
            if header is None:
                raise MissingHeader(height)
            return hash_header(header)

    def get_target(self, index: int) -> int:
        """Return the (integer) target computed from chunk `index`, used in chunk index+1.

        All return paths yield a plain int, as callers
        (chainwork_of_header_at_height, get_checkpoints) expect.
        """
        if constants.net.TESTNET:
            return 0
        if index == -1:
            return MAX_TARGET
        if index < len(self.checkpoints):
            h, t = self.checkpoints[index]
            return t
        # new target
        first = self.read_header(index * 2016)
        last = self.read_header(index * 2016 + 2015)
        if not first or not last:
            raise MissingHeader()
        bits = last.get('bits')
        nActualTimespan = last.get('timestamp') - first.get('timestamp')
        nTargetTimespan = N_TARGET_TIMESPAN
        nModulatedTimespan = nTargetTimespan - (nActualTimespan - nTargetTimespan) / 8
        nMinTimespan = nTargetTimespan - (nTargetTimespan / 8)
        nMaxTimespan = nTargetTimespan + (nTargetTimespan / 2)
        if nModulatedTimespan < nMinTimespan:
            nModulatedTimespan = nMinTimespan
        elif nModulatedTimespan > nMaxTimespan:
            nModulatedTimespan = nMaxTimespan
        bnOld = ArithUint256.SetCompact(bits)
        bnNew = bnOld * nModulatedTimespan
        # this doesn't work if it is nTargetTimespan even though that
        # is what it looks like it should be based on reading the code
        # in lbry.cpp
        bnNew /= nModulatedTimespan
        if bnNew > MAX_TARGET:
            bnNew = ArithUint256(MAX_TARGET)
        # BUGFIX: was `return bnNew.compact(), bnNew._value` -- `compact` is a
        # property returning int, so calling it raised TypeError; and callers
        # (and the other branches above) expect a single int target.
        return bnNew._value
def get_target2(self, index, last, chain='main'): if index == -1: return GENESIS_BITS, MAX_TARGET if index == 0: return GENESIS_BITS, MAX_TARGET first = self.read_header(index-1) assert last is not None, "Last shouldn't be none" # bits to target bits = last.get('bits') # print_error("Last bits: ", bits) self.check_bits(bits) # new target nActualTimespan = last.get('timestamp') - first.get('timestamp') nTargetTimespan = N_TARGET_TIMESPAN nModulatedTimespan = nTargetTimespan - (nActualTimespan - nTargetTimespan) / 8 nMinTimespan = nTargetTimespan - (nTargetTimespan / 8) nMaxTimespan = nTargetTimespan + (nTargetTimespan / 2) if nModulatedTimespan < nMinTimespan: nModulatedTimespan = nMinTimespan elif nModulatedTimespan > nMaxTimespan: nModulatedTimespan = nMaxTimespan bnOld = ArithUint256.SetCompact(bits) bnNew = bnOld * nModulatedTimespan # this doesn't work if it is nTargetTimespan even though that # is what it looks like it should be based on reading the code # in lbry.cpp bnNew /= nModulatedTimespan if bnNew > MAX_TARGET: bnNew = ArithUint256(MAX_TARGET) return bnNew.compact, bnNew._value def check_bits(self, bits): bitsN = (bits >> 24) & 0xff assert 0x03 <= bitsN <= 0x1f, \ "First part of bits should be in [0x03, 0x1d], but it was {}".format(hex(bitsN)) bitsBase = bits & 0xffffff assert 0x8000 <= bitsBase <= 0x7fffff, \ "Second part of bits should be in [0x8000, 0x7fffff] but it was {}".format(bitsBase) @classmethod def bits_to_target(cls, bits: int) -> int: bitsN = (bits >> 24) & 0xff if not (0x03 <= bitsN <= 0x1f): raise Exception("First part of bits should be in [0x03, 0x1d]") bitsBase = bits & 0xffffff if not (0x8000 <= bitsBase <= 0x7fffff): raise Exception("Second part of bits should be in [0x8000, 0x7fffff]") return bitsBase << (8 * (bitsN-3)) @classmethod def target_to_bits(cls, target: int) -> int: c = ("%064x" % target)[2:] while c[:2] == '00' and len(c) > 6: c = c[2:] bitsN, bitsBase = len(c) // 2, int.from_bytes(bfh(c[:6]), byteorder='big') if 
bitsBase >= 0x800000: bitsN += 1 bitsBase >>= 8 return bitsN << 24 | bitsBase def chainwork_of_header_at_height(self, height: int) -> int: """work done by single header at given height""" chunk_idx = height // 2016 - 1 target = self.get_target(chunk_idx) work = ((2 ** 256 - target - 1) // (target + 1)) + 1 return work @with_lock def get_chainwork(self, height=None) -> int: if height is None: height = max(0, self.height()) if constants.net.TESTNET: # On testnet/regtest, difficulty works somewhat different. # It's out of scope to properly implement that. return height last_retarget = height // 2016 * 2016 - 1 cached_height = last_retarget while _CHAINWORK_CACHE.get(self.get_hash(cached_height)) is None: if cached_height <= -1: break cached_height -= 2016 assert cached_height >= -1, cached_height running_total = _CHAINWORK_CACHE[self.get_hash(cached_height)] while cached_height < last_retarget: cached_height += 2016 work_in_single_header = self.chainwork_of_header_at_height(cached_height) work_in_chunk = 2016 * work_in_single_header running_total += work_in_chunk _CHAINWORK_CACHE[self.get_hash(cached_height)] = running_total cached_height += 2016 work_in_single_header = self.chainwork_of_header_at_height(cached_height) work_in_last_partial_chunk = (height % 2016 + 1) * work_in_single_header return running_total + work_in_last_partial_chunk def can_connect(self, header: dict, check_height: bool=True) -> bool: if header is None: return False height = header['block_height'] if check_height and self.height() != height - 1: print("cannot connect at height", height) return False if height == 0: return hash_header(header) == constants.net.GENESIS try: prev_hash = self.get_hash(height - 1) except: return False if prev_hash != header.get('prev_block_hash'): return False try: bits, target = self.get_target2(height, header) except MissingHeader: return False try: self.verify_header(header, prev_hash, target, bits) except BaseException as e: print(e) return False return True def 
connect_chunk(self, idx: int, hexdata: str) -> bool: assert idx >= 0, idx try: data = bfh(hexdata) self.verify_chunk(idx, data) self.save_chunk(idx, data) return True except BaseException as e: self.logger.info(f'verify_chunk idx {idx} failed: {repr(e)}') return False def get_checkpoints(self): # for each chunk, store the hash of the last block and the target after the chunk cp = [] n = self.height() // 2016 for index in range(n): h = self.get_hash((index+1) * 2016 -1) target = self.get_target(index) cp.append((h, target)) return cp def check_header(header: dict) -> Optional[Blockchain]: if type(header) is not dict: return None with blockchains_lock: chains = list(blockchains.values()) for b in chains: if b.check_header(header): return b return None def can_connect(header: dict) -> Optional[Blockchain]: with blockchains_lock: chains = list(blockchains.values()) for b in chains: if b.can_connect(header): return b return None class ArithUint256: # https://github.com/bitcoin/bitcoin/blob/master/src/arith_uint256.cpp __slots__ = '_value', '_compact' def __init__(self, value: int) -> None: self._value = value self._compact: Optional[int] = None @classmethod def SetCompact(cls, nCompact): return (ArithUint256.from_compact(nCompact)) @classmethod def from_compact(cls, compact) -> 'ArithUint256': size = compact >> 24 word = compact & 0x007fffff if size <= 3: return cls(word >> 8 * (3 - size)) else: return cls(word << 8 * (size - 3)) @property def value(self) -> int: return self._value @property def compact(self) -> int: if self._compact is None: self._compact = self._calculate_compact() return self._compact @property def negative(self) -> int: return self._calculate_compact(negative=True) @property def bits(self) -> int: """ Returns the position of the highest bit set plus one. 
""" bits = bin(self._value)[2:] for i, d in enumerate(bits): if d: return (len(bits) - i) + 1 return 0 @property def low64(self) -> int: return self._value & 0xffffffffffffffff def _calculate_compact(self, negative=False) -> int: size = (self.bits + 7) // 8 if size <= 3: compact = self.low64 << 8 * (3 - size) else: compact = ArithUint256(self._value >> 8 * (size - 3)).low64 # The 0x00800000 bit denotes the sign. # Thus, if it is already set, divide the mantissa by 256 and increase the exponent. if compact & 0x00800000: compact >>= 8 size += 1 assert (compact & ~0x007fffff) == 0 assert size < 256 compact |= size << 24 if negative and compact & 0x007fffff: compact |= 0x00800000 return compact def __mul__(self, x): # Take the mod because we are limited to an unsigned 256 bit number return ArithUint256((self._value * x) % 2 ** 256) def __truediv__(self, x): return ArithUint256(int(self._value / x)) def __gt__(self, other): return self._value > other def __lt__(self, other): return self._value < other