mirror of
https://github.com/LBRYFoundation/lbry-sdk.git
synced 2025-09-03 12:30:13 +00:00
make header component use torba headers
This commit is contained in:
parent
2481305a1a
commit
d0607b6fec
2 changed files with 29 additions and 45 deletions
|
@ -4,7 +4,6 @@ import logging
|
||||||
import math
|
import math
|
||||||
import binascii
|
import binascii
|
||||||
import typing
|
import typing
|
||||||
from hashlib import sha256
|
|
||||||
import base58
|
import base58
|
||||||
|
|
||||||
from aioupnp import __version__ as aioupnp_version
|
from aioupnp import __version__ as aioupnp_version
|
||||||
|
@ -12,7 +11,6 @@ from aioupnp.upnp import UPnP
|
||||||
from aioupnp.fault import UPnPError
|
from aioupnp.fault import UPnPError
|
||||||
|
|
||||||
from lbry import utils
|
from lbry import utils
|
||||||
from lbry.conf import HEADERS_FILE_SHA256_CHECKSUM
|
|
||||||
from lbry.dht.node import Node
|
from lbry.dht.node import Node
|
||||||
from lbry.dht.blob_announcer import BlobAnnouncer
|
from lbry.dht.blob_announcer import BlobAnnouncer
|
||||||
from lbry.blob.blob_manager import BlobManager
|
from lbry.blob.blob_manager import BlobManager
|
||||||
|
@ -22,7 +20,7 @@ from lbry.extras.daemon.Component import Component
|
||||||
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
from lbry.wallet import LbryWalletManager
|
from lbry.wallet import LbryWalletManager
|
||||||
|
from lbry.wallet.header import Headers
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -110,6 +108,7 @@ class HeadersComponent(Component):
|
||||||
self.headers_dir = os.path.join(self.conf.wallet_dir, 'lbc_mainnet')
|
self.headers_dir = os.path.join(self.conf.wallet_dir, 'lbc_mainnet')
|
||||||
self.headers_file = os.path.join(self.headers_dir, 'headers')
|
self.headers_file = os.path.join(self.headers_dir, 'headers')
|
||||||
self.old_file = os.path.join(self.conf.wallet_dir, 'blockchain_headers')
|
self.old_file = os.path.join(self.conf.wallet_dir, 'blockchain_headers')
|
||||||
|
self.headers = Headers(self.headers_file)
|
||||||
self._downloading_headers = None
|
self._downloading_headers = None
|
||||||
self._headers_progress_percent = 0
|
self._headers_progress_percent = 0
|
||||||
|
|
||||||
|
@ -137,7 +136,7 @@ class HeadersComponent(Component):
|
||||||
} if progress < 100 else {}
|
} if progress < 100 else {}
|
||||||
|
|
||||||
async def fetch_headers_from_s3(self):
|
async def fetch_headers_from_s3(self):
|
||||||
local_header_size = self.local_header_file_size()
|
local_header_size = self.headers.bytes_size
|
||||||
resume_header = {"Range": f"bytes={local_header_size}-"}
|
resume_header = {"Range": f"bytes={local_header_size}-"}
|
||||||
async with utils.aiohttp_request('get', HEADERS_URL, headers=resume_header) as response:
|
async with utils.aiohttp_request('get', HEADERS_URL, headers=resume_header) as response:
|
||||||
if response.status == 406 or response.content_length < HEADER_SIZE: # our file is bigger
|
if response.status == 406 or response.content_length < HEADER_SIZE: # our file is bigger
|
||||||
|
@ -147,24 +146,21 @@ class HeadersComponent(Component):
|
||||||
log.warning("s3 appears to have corrupted header")
|
log.warning("s3 appears to have corrupted header")
|
||||||
return
|
return
|
||||||
final_size_after_download = response.content_length + local_header_size
|
final_size_after_download = response.content_length + local_header_size
|
||||||
write_mode = "wb"
|
|
||||||
if local_header_size > 0:
|
if local_header_size > 0:
|
||||||
log.info("Resuming download of %i bytes from s3", response.content_length)
|
log.info("Resuming download of %i bytes from s3", response.content_length)
|
||||||
write_mode = "a+b"
|
buffer, header_size = b'', self.headers.header_size
|
||||||
with open(self.headers_file, write_mode) as fd:
|
async for chunk in response.content.iter_any():
|
||||||
while True:
|
chunk = buffer + chunk
|
||||||
chunk = await response.content.read(512)
|
remaining = len(chunk) % header_size
|
||||||
if not chunk:
|
chunk, buffer = chunk[:-remaining], bytes(chunk[-remaining:])
|
||||||
break
|
if not chunk:
|
||||||
fd.write(chunk)
|
continue
|
||||||
self._headers_progress_percent = math.ceil(
|
if not await self.headers.connect(len(self.headers), chunk):
|
||||||
float(fd.tell()) / float(final_size_after_download) * 100
|
log.warning("Error connecting downloaded headers from at %s.", self.headers.height)
|
||||||
)
|
return
|
||||||
log.info("fetched headers from s3, now verifying integrity after download.")
|
self._headers_progress_percent = math.ceil(
|
||||||
self._check_header_file_integrity()
|
float(self.headers.bytes_size) / float(final_size_after_download) * 100
|
||||||
|
)
|
||||||
def local_header_file_height(self):
|
|
||||||
return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0)
|
|
||||||
|
|
||||||
def local_header_file_size(self):
|
def local_header_file_size(self):
|
||||||
if os.path.isfile(self.headers_file):
|
if os.path.isfile(self.headers_file):
|
||||||
|
@ -181,46 +177,29 @@ class HeadersComponent(Component):
|
||||||
async def should_download_headers_from_s3(self):
|
async def should_download_headers_from_s3(self):
|
||||||
if self.conf.blockchain_name != "lbrycrd_main":
|
if self.conf.blockchain_name != "lbrycrd_main":
|
||||||
return False
|
return False
|
||||||
self._check_header_file_integrity()
|
|
||||||
s3_headers_depth = self.conf.s3_headers_depth
|
s3_headers_depth = self.conf.s3_headers_depth
|
||||||
if not s3_headers_depth:
|
if not s3_headers_depth:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
local_height = self.local_header_file_height()
|
local_height = self.headers.height
|
||||||
remote_height = await self.get_download_height()
|
try:
|
||||||
|
remote_height = await self.get_download_height()
|
||||||
|
except OSError:
|
||||||
|
log.warning("Failed to download headers using https.")
|
||||||
|
return False
|
||||||
log.info("remote height: %i, local height: %i", remote_height, local_height)
|
log.info("remote height: %i, local height: %i", remote_height, local_height)
|
||||||
if remote_height > (local_height + s3_headers_depth):
|
if remote_height > (local_height + s3_headers_depth):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _check_header_file_integrity(self):
|
|
||||||
# TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity
|
|
||||||
if self.conf.blockchain_name != "lbrycrd_main":
|
|
||||||
return
|
|
||||||
hashsum = sha256()
|
|
||||||
checksum_height, checksum = HEADERS_FILE_SHA256_CHECKSUM
|
|
||||||
checksum_length_in_bytes = checksum_height * HEADER_SIZE
|
|
||||||
if self.local_header_file_size() < checksum_length_in_bytes:
|
|
||||||
return
|
|
||||||
with open(self.headers_file, "rb") as headers_file:
|
|
||||||
hashsum.update(headers_file.read(checksum_length_in_bytes))
|
|
||||||
current_checksum = hashsum.hexdigest()
|
|
||||||
if current_checksum != checksum:
|
|
||||||
msg = f"Expected checksum {checksum}, got {current_checksum}"
|
|
||||||
log.warning("Headers file corrupted, checksum mismatch. " + msg)
|
|
||||||
log.warning("Deleting header file so it can be downloaded again.")
|
|
||||||
os.unlink(self.headers_file)
|
|
||||||
elif (self.local_header_file_size() % HEADER_SIZE) != 0:
|
|
||||||
log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.")
|
|
||||||
with open(self.headers_file, "rb+") as headers_file:
|
|
||||||
headers_file.truncate(checksum_length_in_bytes)
|
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
if not os.path.exists(self.headers_dir):
|
if not os.path.exists(self.headers_dir):
|
||||||
os.mkdir(self.headers_dir)
|
os.mkdir(self.headers_dir)
|
||||||
if os.path.exists(self.old_file):
|
if os.path.exists(self.old_file):
|
||||||
log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file)
|
log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file)
|
||||||
os.rename(self.old_file, self.headers_file)
|
os.rename(self.old_file, self.headers_file)
|
||||||
|
await self.headers.open()
|
||||||
|
self.headers.repair()
|
||||||
self._downloading_headers = await self.should_download_headers_from_s3()
|
self._downloading_headers = await self.should_download_headers_from_s3()
|
||||||
if self._downloading_headers:
|
if self._downloading_headers:
|
||||||
try:
|
try:
|
||||||
|
@ -229,6 +208,7 @@ class HeadersComponent(Component):
|
||||||
log.error("failed to fetch headers from s3: %s", err)
|
log.error("failed to fetch headers from s3: %s", err)
|
||||||
finally:
|
finally:
|
||||||
self._downloading_headers = False
|
self._downloading_headers = False
|
||||||
|
await self.headers.close()
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -82,6 +82,10 @@ class BaseHeaders:
|
||||||
def height(self) -> int:
|
def height(self) -> int:
|
||||||
return len(self)-1
|
return len(self)-1
|
||||||
|
|
||||||
|
@property
|
||||||
|
def bytes_size(self):
|
||||||
|
return len(self) * self.header_size
|
||||||
|
|
||||||
def hash(self, height=None) -> bytes:
|
def hash(self, height=None) -> bytes:
|
||||||
return self.hash_header(
|
return self.hash_header(
|
||||||
self.get_raw_header(height if height is not None else self.height)
|
self.get_raw_header(height if height is not None else self.height)
|
||||||
|
|
Loading…
Add table
Reference in a new issue