100k filters

This commit is contained in:
Lex Berezhny 2021-01-13 00:44:23 -05:00
parent 970d0ba6e1
commit 5562e84722
7 changed files with 96 additions and 68 deletions

View file

@ -1,11 +1,11 @@
from typing import Dict from typing import Dict
def split_range_into_10k_batches(start, end): def split_range_into_batches(start, end, batch_size=100_000):
batch = [start, end] batch = [start, end]
batches = [batch] batches = [batch]
for block in range(start, end+1): for block in range(start, end+1):
if 0 < block != batch[0] and block % 10_000 == 0: if 0 < block != batch[0] and block % batch_size == 0:
batch = [block, block] batch = [block, block]
batches.append(batch) batches.append(batch)
else: else:
@ -50,6 +50,7 @@ class FilterBuilder:
self.start = start self.start = start
self.end = end self.end = end
self.group_filters = [ self.group_filters = [
GroupFilter(start, end, 5),
GroupFilter(start, end, 4), GroupFilter(start, end, 4),
GroupFilter(start, end, 3), GroupFilter(start, end, 3),
GroupFilter(start, end, 2), GroupFilter(start, end, 2),

View file

@ -16,7 +16,7 @@ from lbry.error import LbrycrdEventSubscriptionError
from . import blocks as block_phase, claims as claim_phase, supports as support_phase from . import blocks as block_phase, claims as claim_phase, supports as support_phase
from .context import uninitialize from .context import uninitialize
from .filter_builder import split_range_into_10k_batches from .filter_builder import split_range_into_batches
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -187,7 +187,7 @@ class BlockchainSync(Sync):
else: else:
blocks = await self.db.run(block_phase.get_block_range_without_filters) blocks = await self.db.run(block_phase.get_block_range_without_filters)
if blocks != (-1, -1): if blocks != (-1, -1):
batches = split_range_into_10k_batches(*blocks) batches = split_range_into_batches(*blocks)
p.step() p.step()
else: else:
p.step() p.step()

View file

@ -46,7 +46,7 @@ def get_filters(start_height, end_height=None, granularity=0):
.order_by(TXFilter.c.height) .order_by(TXFilter.c.height)
) )
else: else:
factor = granularity if granularity <= 4 else log10(granularity) factor = granularity if granularity < 100 else log10(granularity)
if end_height is None: if end_height is None:
height_condition = (BlockFilter.c.height == start_height) height_condition = (BlockFilter.c.height == start_height)
elif end_height == -1: elif end_height == -1:
@ -64,8 +64,11 @@ def get_filters(start_height, end_height=None, granularity=0):
def get_minimal_required_filter_ranges(height) -> Dict[int, Tuple[int, int]]: def get_minimal_required_filter_ranges(height) -> Dict[int, Tuple[int, int]]:
minimal = {} minimal = {}
if height >= 100_000:
minimal[5] = (0, ((height // 100_000)-1) * 100_000)
if height >= 10_000: if height >= 10_000:
minimal[4] = (0, ((height // 10_000)-1) * 10_000) start = height - height % 100_000
minimal[4] = (start, start + (((height - start) // 10_000) - 1) * 10_000)
if height >= 1_000: if height >= 1_000:
start = height - height % 10_000 start = height - height % 10_000
minimal[3] = (start, start+(((height-start) // 1_000)-1) * 1_000) minimal[3] = (start, start+(((height-start) // 1_000)-1) * 1_000)
@ -92,6 +95,9 @@ def get_maximum_known_filters() -> Dict[str, Optional[int]]:
select(func.max(BlockFilter.c.height)) select(func.max(BlockFilter.c.height))
.where(BlockFilter.c.factor == 4) .where(BlockFilter.c.factor == 4)
.scalar_subquery().label('4'), .scalar_subquery().label('4'),
select(func.max(BlockFilter.c.height))
.where(BlockFilter.c.factor == 5)
.scalar_subquery().label('5'),
) )
return context().fetchone(query) return context().fetchone(query)
@ -101,7 +107,7 @@ def get_missing_required_filters(height) -> Set[Tuple[int, int, int]]:
missing_filters = set() missing_filters = set()
for granularity, (start, end) in get_minimal_required_filter_ranges(height).items(): for granularity, (start, end) in get_minimal_required_filter_ranges(height).items():
known_height = known_filters.get(str(granularity)) known_height = known_filters.get(str(granularity))
if known_height is not None and known_height > start: if known_height is not None and known_height >= start:
if granularity == 1: if granularity == 1:
adjusted_height = known_height + 1 adjusted_height = known_height + 1
else: else:

View file

@ -193,6 +193,8 @@ class FilterManager:
await self.download_initial_filters(best_height) await self.download_initial_filters(best_height)
print('generating addresses...') print('generating addresses...')
await self.generate_addresses(best_height, wallets) await self.generate_addresses(best_height, wallets)
print("downloading level 3 filters...")
await self.download_sub_filters(4, wallets)
print("downloading level 2 filters...") print("downloading level 2 filters...")
await self.download_sub_filters(3, wallets) await self.download_sub_filters(3, wallets)
print("downloading level 1 filters...") print("downloading level 1 filters...")

View file

@ -12,7 +12,7 @@ class LightClientTests(IntegrationTestCase):
async def asyncSetUp(self): async def asyncSetUp(self):
await super().asyncSetUp() await super().asyncSetUp()
await self.chain.generate(200) await self.chain.generate(200)
self.full_node_daemon = await self.make_full_node_daemon() self.full_node_daemon = await self.make_full_node_daemon(workers=2)
self.full_node: FullNode = self.full_node_daemon.service self.full_node: FullNode = self.full_node_daemon.service
self.light_client_daemon = await self.make_light_client_daemon(self.full_node_daemon, start=False) self.light_client_daemon = await self.make_light_client_daemon(self.full_node_daemon, start=False)
self.light_client: LightClient = self.light_client_daemon.service self.light_client: LightClient = self.light_client_daemon.service
@ -25,6 +25,7 @@ class LightClientTests(IntegrationTestCase):
await self.light_client.wallets.open() await self.light_client.wallets.open()
await self.light_client.client.start_event_streams() await self.light_client.client.start_event_streams()
self.db = self.light_client.db self.db = self.light_client.db
self.api = self.light_client_daemon.api
self.sync = self.light_client.sync self.sync = self.light_client.sync
self.client = self.light_client.client self.client = self.light_client.client
self.account = self.light_client.wallets.default.accounts.default self.account = self.light_client.wallets.default.accounts.default
@ -44,3 +45,9 @@ class LightClientTests(IntegrationTestCase):
await self.assertBalance(self.account, '0.0') await self.assertBalance(self.account, '0.0')
await self.sync.on_synced.first await self.sync.on_synced.first
await self.assertBalance(self.account, '5.0') await self.assertBalance(self.account, '5.0')
await self.api.channel_create('@foo', '1.0')
await self.chain.generate(1)
await self.sync.on_synced.first
channels = await self.api.channel_list()
self.assertEqual(len(channels), 1)

View file

@ -1,12 +1,13 @@
from unittest import TestCase from unittest import TestCase
from lbry.blockchain.sync.filter_builder import ( from lbry.blockchain.sync.filter_builder import (
FilterBuilder as FB, GroupFilter as GF, split_range_into_10k_batches as split FilterBuilder as FB, GroupFilter as GF, split_range_into_batches
) )
class TestFilterGenerationComponents(TestCase): class TestFilterGenerationComponents(TestCase):
def test_split_range_into_10k_batches(self): def test_split_range_into_10k_batches(self):
def split(a, b): return split_range_into_batches(a, b, 10_000)
# single block (same start-end) # single block (same start-end)
self.assertEqual(split(901_123, 901_123), [[901_123, 901_123]]) self.assertEqual(split(901_123, 901_123), [[901_123, 901_123]])
# spans a 10k split # spans a 10k split
@ -70,37 +71,42 @@ class TestFilterGenerationComponents(TestCase):
self.assertEqual(FB(819_913, 819_999).query_heights, (810_000, 819_999)) self.assertEqual(FB(819_913, 819_999).query_heights, (810_000, 819_999))
def test_filter_builder_add(self): def test_filter_builder_add(self):
fb = FB(818_813, 819_999) fb = FB(798_813, 809_999)
self.assertEqual(fb.query_heights, (810_000, 819_999)) self.assertEqual(fb.query_heights, (700_000, 809_999))
self.assertEqual(fb.group_filters[0].coverage, [810_000]) self.assertEqual(fb.group_filters[0].coverage, [700_000])
self.assertEqual(fb.group_filters[1].coverage, [818_000, 819_000]) self.assertEqual(fb.group_filters[1].coverage, [790_000, 800_000])
self.assertEqual(fb.group_filters[2].coverage, [ self.assertEqual(fb.group_filters[2].coverage, list(range(798_000, 809_000+1, 1_000)))
818_800, 818_900, 819_000, 819_100, 819_200, 819_300, self.assertEqual(fb.group_filters[3].coverage, list(range(798_800, 809_900+1, 100)))
819_400, 819_500, 819_600, 819_700, 819_800, 819_900 fb.add(b'beef0', 787_111, ['a'])
]) fb.add(b'beef1', 798_222, ['b'])
fb.add(b'beef0', 810_000, ['a']) fb.add(b'beef2', 798_812, ['c'])
fb.add(b'beef1', 815_001, ['b']) fb.add(b'beef3', 798_813, ['d'])
fb.add(b'beef2', 818_412, ['c']) fb.add(b'beef4', 798_814, ['e'])
fb.add(b'beef3', 818_812, ['d']) fb.add(b'beef5', 809_000, ['f'])
fb.add(b'beef4', 818_813, ['e']) fb.add(b'beef6', 809_999, ['g'])
fb.add(b'beef5', 819_000, ['f']) fb.add(b'beef7', 809_999, ['h'])
fb.add(b'beef6', 819_999, ['g'])
fb.add(b'beef7', 819_999, ['h'])
fb.add(b'beef8', 820_000, ['i']) fb.add(b'beef8', 820_000, ['i'])
self.assertEqual(fb.group_filters[0].groups, { self.assertEqual(fb.group_filters[0].groups, {
810_000: {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'} 700_000: {'a', 'b', 'c', 'd', 'e'}
}) })
self.assertEqual(fb.group_filters[1].groups, { self.assertEqual(fb.group_filters[1].groups, {
818_000: {'c', 'd', 'e'}, 790_000: {'b', 'c', 'd', 'e'},
819_000: {'f', 'g', 'h'} 800_000: {'f', 'g', 'h'}
}) })
self.assertEqual(fb.group_filters[2].groups[818_800], {'d', 'e'}) self.assertEqual(fb.group_filters[2].groups, {
self.assertEqual(fb.group_filters[2].groups[819_000], {'f'}) 798_000: {'b', 'c', 'd', 'e'}, 799_000: set(),
self.assertEqual(fb.group_filters[2].groups[819_900], {'g', 'h'}) 800_000: set(), 801_000: set(), 802_000: set(), 803_000: set(), 804_000: set(),
self.assertEqual(fb.block_filters, {818813: {'e'}, 819000: {'f'}, 819999: {'g', 'h'}}) 805_000: set(), 806_000: set(), 807_000: set(), 808_000: set(),
809_000: {'f', 'g', 'h'}
})
self.assertEqual(fb.group_filters[3].groups[798_800], {'c', 'd', 'e'})
self.assertEqual(fb.group_filters[3].groups[809_000], {'f'})
self.assertEqual(fb.group_filters[3].groups[809_900], {'g', 'h'})
self.assertEqual(fb.block_filters, {798813: {'d'}, 798814: {'e'}, 809000: {'f'}, 809999: {'h', 'g'}})
self.assertEqual(fb.tx_filters, [ self.assertEqual(fb.tx_filters, [
(b'beef4', 818813, ['e']), (b'beef3', 798813, ['d']),
(b'beef5', 819000, ['f']), (b'beef4', 798814, ['e']),
(b'beef6', 819999, ['g']), (b'beef5', 809000, ['f']),
(b'beef7', 819999, ['h']) (b'beef6', 809999, ['g']),
(b'beef7', 809999, ['h'])
]) ])

View file

@ -17,49 +17,55 @@ class TestMissingRequiredFiltersCalculation(UnitDBTestCase):
self.assertEqual(q.get_missing_required_filters(100), {(2, 0, 0)}) self.assertEqual(q.get_missing_required_filters(100), {(2, 0, 0)})
self.assertEqual(q.get_missing_required_filters(199), {(2, 0, 0), (1, 100, 199)}) self.assertEqual(q.get_missing_required_filters(199), {(2, 0, 0), (1, 100, 199)})
self.assertEqual(q.get_missing_required_filters(201), {(2, 0, 100), (1, 200, 201)}) self.assertEqual(q.get_missing_required_filters(201), {(2, 0, 100), (1, 200, 201)})
# all filters missing # all filters missing
self.assertEqual(q.get_missing_required_filters(134_567), { self.assertEqual(q.get_missing_required_filters(234_567), {
(4, 0, 120_000), (5, 0, 100_000),
(3, 130_000, 133_000), (4, 200_000, 220_000),
(2, 134_000, 134_400), (3, 230_000, 233_000),
(1, 134_500, 134_567) (2, 234_000, 234_400),
(1, 234_500, 234_567)
}) })
q.insert_block_filters([(110_000, 4, b'beef')]) q.insert_block_filters([(0, 5, b'beef')])
q.insert_block_filters([(129_000, 3, b'beef')]) q.insert_block_filters([(190_000, 4, b'beef')])
q.insert_block_filters([(133_900, 2, b'beef')]) q.insert_block_filters([(229_000, 3, b'beef')])
q.insert_block_filters([(134_499, 1, b'beef')]) q.insert_block_filters([(233_900, 2, b'beef')])
# we have some filters, but not recent enough (all except 10k are adjusted) q.insert_block_filters([(234_499, 1, b'beef')])
self.assertEqual(q.get_missing_required_filters(134_567), { # we have some old filters but none usable as initial required (except one 100k filter)
(4, 120_000, 120_000), # 0 -> 120_000 self.assertEqual(q.get_missing_required_filters(234_567), {
(3, 130_000, 133_000), (5, 100_000, 100_000),
(2, 134_000, 134_400), (4, 200_000, 220_000),
(1, 134_500, 134_567) (3, 230_000, 233_000),
(2, 234_000, 234_400),
(1, 234_500, 234_567)
}) })
q.insert_block_filters([(132_000, 3, b'beef')]) q.insert_block_filters([(100_000, 5, b'beef')])
q.insert_block_filters([(134_300, 2, b'beef')]) q.insert_block_filters([(210_000, 4, b'beef')])
q.insert_block_filters([(134_550, 1, b'beef')]) q.insert_block_filters([(232_000, 3, b'beef')])
# all filters get adjusted because we have recent of each q.insert_block_filters([(234_300, 2, b'beef')])
self.assertEqual(q.get_missing_required_filters(134_567), { q.insert_block_filters([(234_550, 1, b'beef')])
(4, 120_000, 120_000), # 0 -> 120_000 # we have some usable initial filters, but not all
(3, 133_000, 133_000), # 130_000 -> 133_000 self.assertEqual(q.get_missing_required_filters(234_567), {
(2, 134_400, 134_400), # 134_000 -> 134_400 (4, 220_000, 220_000),
(1, 134_551, 134_567) # 134_500 -> 134_551 (3, 233_000, 233_000),
(2, 234_400, 234_400),
(1, 234_551, 234_567)
}) })
q.insert_block_filters([(120_000, 4, b'beef')]) q.insert_block_filters([(220_000, 4, b'beef')])
q.insert_block_filters([(133_000, 3, b'beef')]) q.insert_block_filters([(233_000, 3, b'beef')])
q.insert_block_filters([(134_400, 2, b'beef')]) q.insert_block_filters([(234_400, 2, b'beef')])
q.insert_block_filters([(134_566, 1, b'beef')]) q.insert_block_filters([(234_566, 1, b'beef')])
# we have latest filters for all except latest single block # we have latest filters for all except latest single block
self.assertEqual(q.get_missing_required_filters(134_567), { self.assertEqual(q.get_missing_required_filters(234_567), {
(1, 134_567, 134_567) # 134_551 -> 134_567 (1, 234_567, 234_567)
}) })
q.insert_block_filters([(134_567, 1, b'beef')]) q.insert_block_filters([(234_567, 1, b'beef')])
# we have all latest filters # we have all latest filters
self.assertEqual(q.get_missing_required_filters(134_567), set()) self.assertEqual(q.get_missing_required_filters(234_567), set())
class TestAddressGenerationAndTXSync(UnitDBTestCase): class TestAddressGenerationAndTXSync(UnitDBTestCase):