gotta fetch 'em all

This commit is contained in:
Jack Robison 2019-09-17 10:26:38 -04:00
parent e6c549c457
commit 46b45d0387
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 65 additions and 60 deletions

View file

@ -101,6 +101,9 @@ def _batched_select(transaction, query, parameters, batch_size=900):
def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]: def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
files = [] files = []
signed_claims = {} signed_claims = {}
stream_hashes = tuple(
stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file").fetchall()
)
for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, _, for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, _,
sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select( sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
transaction, "select file.rowid, file.*, stream.*, c.* " transaction, "select file.rowid, file.*, stream.*, c.* "
@ -108,9 +111,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
"inner join content_claim cc on file.stream_hash=cc.stream_hash " "inner join content_claim cc on file.stream_hash=cc.stream_hash "
"inner join claim c on cc.claim_outpoint=c.claim_outpoint " "inner join claim c on cc.claim_outpoint=c.claim_outpoint "
"where file.stream_hash in {} " "where file.stream_hash in {} "
"order by c.rowid desc", [ "order by c.rowid desc", stream_hashes):
stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file")]):
claim = StoredStreamClaim(stream_hash, *claim_args) claim = StoredStreamClaim(stream_hash, *claim_args)
if claim.channel_claim_id: if claim.channel_claim_id:
if claim.channel_claim_id not in signed_claims: if claim.channel_claim_id not in signed_claims:
@ -137,7 +138,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
) )
for claim_name, claim_id in _batched_select( for claim_name, claim_id in _batched_select(
transaction, "select c.claim_name, c.claim_id from claim c where c.claim_id in {}", transaction, "select c.claim_name, c.claim_id from claim c where c.claim_id in {}",
list(signed_claims.keys())): tuple(signed_claims.keys())):
for claim in signed_claims[claim_id]: for claim in signed_claims[claim_id]:
claim.channel_name = claim_name claim.channel_name = claim_name
return files return files
@ -147,35 +148,35 @@ def store_stream(transaction: sqlite3.Connection, sd_blob: 'BlobFile', descripto
# add all blobs, except the last one, which is empty # add all blobs, except the last one, which is empty
transaction.executemany( transaction.executemany(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
[(blob.blob_hash, blob.length, 0, 0, "pending", 0, 0) ((blob.blob_hash, blob.length, 0, 0, "pending", 0, 0)
for blob in (descriptor.blobs[:-1] if len(descriptor.blobs) > 1 else descriptor.blobs) + [sd_blob]] for blob in (descriptor.blobs[:-1] if len(descriptor.blobs) > 1 else descriptor.blobs) + [sd_blob])
) ).fetchall()
# associate the blobs to the stream # associate the blobs to the stream
transaction.execute("insert or ignore into stream values (?, ?, ?, ?, ?)", transaction.execute("insert or ignore into stream values (?, ?, ?, ?, ?)",
(descriptor.stream_hash, sd_blob.blob_hash, descriptor.key, (descriptor.stream_hash, sd_blob.blob_hash, descriptor.key,
binascii.hexlify(descriptor.stream_name.encode()).decode(), binascii.hexlify(descriptor.stream_name.encode()).decode(),
binascii.hexlify(descriptor.suggested_file_name.encode()).decode())) binascii.hexlify(descriptor.suggested_file_name.encode()).decode())).fetchall()
# add the stream # add the stream
transaction.executemany( transaction.executemany(
"insert or ignore into stream_blob values (?, ?, ?, ?)", "insert or ignore into stream_blob values (?, ?, ?, ?)",
[(descriptor.stream_hash, blob.blob_hash, blob.blob_num, blob.iv) ((descriptor.stream_hash, blob.blob_hash, blob.blob_num, blob.iv)
for blob in descriptor.blobs] for blob in descriptor.blobs)
) ).fetchall()
# ensure should_announce is set regardless if insert was ignored # ensure should_announce is set regardless if insert was ignored
transaction.execute( transaction.execute(
"update blob set should_announce=1 where blob_hash in (?, ?)", "update blob set should_announce=1 where blob_hash in (?, ?)",
(sd_blob.blob_hash, descriptor.blobs[0].blob_hash,) (sd_blob.blob_hash, descriptor.blobs[0].blob_hash,)
) ).fetchall()
def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'): def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'):
blob_hashes = [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]] blob_hashes = [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]]
blob_hashes.append((descriptor.sd_hash, )) blob_hashes.append((descriptor.sd_hash, ))
transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,)) transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,)).fetchall()
transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash,)) transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash,)).fetchall()
transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash,)) transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash,)).fetchall()
transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash,)) transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash,)).fetchall()
transaction.executemany("delete from blob where blob_hash=?", blob_hashes) transaction.executemany("delete from blob where blob_hash=?", blob_hashes).fetchall()
def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str], def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
@ -191,7 +192,7 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ
(stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status, (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0, 1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
None if not content_fee else binascii.hexlify(content_fee.raw).decode()) None if not content_fee else binascii.hexlify(content_fee.raw).decode())
) ).fetchall()
return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0] return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
@ -293,17 +294,17 @@ class SQLiteStorage(SQLiteMixin):
def _add_blobs(transaction: sqlite3.Connection): def _add_blobs(transaction: sqlite3.Connection):
transaction.executemany( transaction.executemany(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
[ (
(blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0) (blob_hash, length, 0, 0, "pending" if not finished else "finished", 0, 0)
for blob_hash, length in blob_hashes_and_lengths for blob_hash, length in blob_hashes_and_lengths
] )
) ).fetchall()
if finished: if finished:
transaction.executemany( transaction.executemany(
"update blob set status='finished' where blob.blob_hash=?", [ "update blob set status='finished' where blob.blob_hash=?", (
(blob_hash, ) for blob_hash, _ in blob_hashes_and_lengths (blob_hash, ) for blob_hash, _ in blob_hashes_and_lengths
] )
) ).fetchall()
return await self.db.run(_add_blobs) return await self.db.run(_add_blobs)
def get_blob_status(self, blob_hash: str): def get_blob_status(self, blob_hash: str):
@ -317,9 +318,9 @@ class SQLiteStorage(SQLiteMixin):
return transaction.executemany( return transaction.executemany(
"update blob set next_announce_time=?, last_announced_time=?, single_announce=0 " "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 "
"where blob_hash=?", "where blob_hash=?",
[(int(last_announced + (data_expiration / 2)), int(last_announced), blob_hash) ((int(last_announced + (data_expiration / 2)), int(last_announced), blob_hash)
for blob_hash in blob_hashes] for blob_hash in blob_hashes)
) ).fetchall()
return self.db.run(_update_last_announced_blobs) return self.db.run(_update_last_announced_blobs)
def should_single_announce_blobs(self, blob_hashes, immediate=False): def should_single_announce_blobs(self, blob_hashes, immediate=False):
@ -330,11 +331,11 @@ class SQLiteStorage(SQLiteMixin):
transaction.execute( transaction.execute(
"update blob set single_announce=1, next_announce_time=? " "update blob set single_announce=1, next_announce_time=? "
"where blob_hash=? and status='finished'", (int(now), blob_hash) "where blob_hash=? and status='finished'", (int(now), blob_hash)
) ).fetchall()
else: else:
transaction.execute( transaction.execute(
"update blob set single_announce=1 where blob_hash=? and status='finished'", (blob_hash,) "update blob set single_announce=1 where blob_hash=? and status='finished'", (blob_hash,)
) ).fetchall()
return self.db.run(set_single_announce) return self.db.run(set_single_announce)
def get_blobs_to_announce(self): def get_blobs_to_announce(self):
@ -347,22 +348,22 @@ class SQLiteStorage(SQLiteMixin):
"(should_announce=1 or single_announce=1) and next_announce_time<? and status='finished' " "(should_announce=1 or single_announce=1) and next_announce_time<? and status='finished' "
"order by next_announce_time asc limit ?", "order by next_announce_time asc limit ?",
(timestamp, int(self.conf.concurrent_blob_announcers * 10)) (timestamp, int(self.conf.concurrent_blob_announcers * 10))
) ).fetchall()
else: else:
r = transaction.execute( r = transaction.execute(
"select blob_hash from blob where blob_hash is not null " "select blob_hash from blob where blob_hash is not null "
"and next_announce_time<? and status='finished' " "and next_announce_time<? and status='finished' "
"order by next_announce_time asc limit ?", "order by next_announce_time asc limit ?",
(timestamp, int(self.conf.concurrent_blob_announcers * 10)) (timestamp, int(self.conf.concurrent_blob_announcers * 10))
) ).fetchall()
return [b[0] for b in r.fetchall()] return [b[0] for b in r]
return self.db.run(get_and_update) return self.db.run(get_and_update)
def delete_blobs_from_db(self, blob_hashes): def delete_blobs_from_db(self, blob_hashes):
def delete_blobs(transaction): def delete_blobs(transaction):
transaction.executemany( transaction.executemany(
"delete from blob where blob_hash=?;", [(blob_hash,) for blob_hash in blob_hashes] "delete from blob where blob_hash=?;", ((blob_hash,) for blob_hash in blob_hashes)
) ).fetchall()
return self.db.run_with_foreign_keys_disabled(delete_blobs) return self.db.run_with_foreign_keys_disabled(delete_blobs)
def get_all_blob_hashes(self): def get_all_blob_hashes(self):
@ -480,7 +481,7 @@ class SQLiteStorage(SQLiteMixin):
transaction.executemany( transaction.executemany(
"update file set file_name=null, download_directory=null, saved_file=0 where stream_hash=?", "update file set file_name=null, download_directory=null, saved_file=0 where stream_hash=?",
removed removed
) ).fetchall()
return await self.db.run(update_manually_removed_files) return await self.db.run(update_manually_removed_files)
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
@ -488,7 +489,7 @@ class SQLiteStorage(SQLiteMixin):
def change_file_status(self, stream_hash: str, new_status: str): def change_file_status(self, stream_hash: str, new_status: str):
log.debug("update file status %s -> %s", stream_hash, new_status) log.debug("update file status %s -> %s", stream_hash, new_status)
return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash)) return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str], async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str],
file_name: typing.Optional[str]): file_name: typing.Optional[str]):
@ -497,22 +498,22 @@ class SQLiteStorage(SQLiteMixin):
else: else:
encoded_file_name = binascii.hexlify(file_name.encode()).decode() encoded_file_name = binascii.hexlify(file_name.encode()).decode()
encoded_download_dir = binascii.hexlify(download_dir.encode()).decode() encoded_download_dir = binascii.hexlify(download_dir.encode()).decode()
return await self.db.execute("update file set download_directory=?, file_name=? where stream_hash=?", ( return await self.db.execute_fetchall("update file set download_directory=?, file_name=? where stream_hash=?", (
encoded_download_dir, encoded_file_name, stream_hash, encoded_download_dir, encoded_file_name, stream_hash,
)) ))
async def save_content_fee(self, stream_hash: str, content_fee: Transaction): async def save_content_fee(self, stream_hash: str, content_fee: Transaction):
return await self.db.execute("update file set content_fee=? where stream_hash=?", ( return await self.db.execute_fetchall("update file set content_fee=? where stream_hash=?", (
binascii.hexlify(content_fee.raw), stream_hash, binascii.hexlify(content_fee.raw), stream_hash,
)) ))
async def set_saved_file(self, stream_hash: str): async def set_saved_file(self, stream_hash: str):
return await self.db.execute("update file set saved_file=1 where stream_hash=?", ( return await self.db.execute_fetchall("update file set saved_file=1 where stream_hash=?", (
stream_hash, stream_hash,
)) ))
async def clear_saved_file(self, stream_hash: str): async def clear_saved_file(self, stream_hash: str):
return await self.db.execute("update file set saved_file=0 where stream_hash=?", ( return await self.db.execute_fetchall("update file set saved_file=0 where stream_hash=?", (
stream_hash, stream_hash,
)) ))
@ -533,13 +534,13 @@ class SQLiteStorage(SQLiteMixin):
transaction.execute("insert or ignore into content_claim values (?, ?)", content_claim) transaction.execute("insert or ignore into content_claim values (?, ?)", content_claim)
transaction.executemany( transaction.executemany(
"update file set status='stopped' where stream_hash=?", "update file set status='stopped' where stream_hash=?",
[(stream_hash, ) for stream_hash in stream_hashes] ((stream_hash, ) for stream_hash in stream_hashes)
) ).fetchall()
download_dir = binascii.hexlify(self.conf.download_dir.encode()).decode() download_dir = binascii.hexlify(self.conf.download_dir.encode()).decode()
transaction.executemany( transaction.executemany(
f"update file set download_directory=? where stream_hash=?", f"update file set download_directory=? where stream_hash=?",
[(download_dir, stream_hash) for stream_hash in stream_hashes] ((download_dir, stream_hash) for stream_hash in stream_hashes)
) ).fetchall()
await self.db.run_with_foreign_keys_disabled(_recover) await self.db.run_with_foreign_keys_disabled(_recover)
def get_all_stream_hashes(self): def get_all_stream_hashes(self):
@ -551,14 +552,16 @@ class SQLiteStorage(SQLiteMixin):
# TODO: add 'address' to support items returned for a claim from lbrycrdd and lbryum-server # TODO: add 'address' to support items returned for a claim from lbrycrdd and lbryum-server
def _save_support(transaction): def _save_support(transaction):
bind = "({})".format(','.join(['?'] * len(claim_id_to_supports))) bind = "({})".format(','.join(['?'] * len(claim_id_to_supports)))
transaction.execute(f"delete from support where claim_id in {bind}", list(claim_id_to_supports.keys())) transaction.execute(
f"delete from support where claim_id in {bind}", tuple(claim_id_to_supports.keys())
).fetchall()
for claim_id, supports in claim_id_to_supports.items(): for claim_id, supports in claim_id_to_supports.items():
for support in supports: for support in supports:
transaction.execute( transaction.execute(
"insert into support values (?, ?, ?, ?)", "insert into support values (?, ?, ?, ?)",
("%s:%i" % (support['txid'], support['nout']), claim_id, lbc_to_dewies(support['amount']), ("%s:%i" % (support['txid'], support['nout']), claim_id, lbc_to_dewies(support['amount']),
support.get('address', "")) support.get('address', ""))
) ).fetchall()
return self.db.run(_save_support) return self.db.run(_save_support)
def get_supports(self, *claim_ids): def get_supports(self, *claim_ids):
@ -577,7 +580,7 @@ class SQLiteStorage(SQLiteMixin):
for support_info in _batched_select( for support_info in _batched_select(
transaction, transaction,
"select * from support where claim_id in {}", "select * from support where claim_id in {}",
tuple(claim_ids) claim_ids
) )
] ]
@ -608,7 +611,7 @@ class SQLiteStorage(SQLiteMixin):
transaction.execute( transaction.execute(
"insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)", "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence) (outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence)
) ).fetchall()
# if this response doesn't have support info don't overwrite the existing # if this response doesn't have support info don't overwrite the existing
# support info # support info
if 'supports' in claim_info: if 'supports' in claim_info:
@ -695,7 +698,9 @@ class SQLiteStorage(SQLiteMixin):
) )
# update the claim associated to the file # update the claim associated to the file
transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint)) transaction.execute(
"insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint)
).fetchall()
async def save_content_claim(self, stream_hash, claim_outpoint): async def save_content_claim(self, stream_hash, claim_outpoint):
await self.db.run(self._save_content_claim, claim_outpoint, stream_hash) await self.db.run(self._save_content_claim, claim_outpoint, stream_hash)
@ -718,11 +723,11 @@ class SQLiteStorage(SQLiteMixin):
def update_reflected_stream(self, sd_hash, reflector_address, success=True): def update_reflected_stream(self, sd_hash, reflector_address, success=True):
if success: if success:
return self.db.execute( return self.db.execute_fetchall(
"insert or replace into reflected_stream values (?, ?, ?)", "insert or replace into reflected_stream values (?, ?, ?)",
(sd_hash, reflector_address, self.time_getter()) (sd_hash, reflector_address, self.time_getter())
) )
return self.db.execute( return self.db.execute_fetchall(
"delete from reflected_stream where sd_hash=? and reflector_address=?", "delete from reflected_stream where sd_hash=? and reflector_address=?",
(sd_hash, reflector_address) (sd_hash, reflector_address)
) )

View file

@ -134,11 +134,11 @@ class WalletDatabase(BaseDatabase):
return self.get_utxo_count(**constraints) return self.get_utxo_count(**constraints)
async def release_all_outputs(self, account): async def release_all_outputs(self, account):
await self.db.execute( await self.db.execute_fetchall(
"UPDATE txo SET is_reserved = 0 WHERE" "UPDATE txo SET is_reserved = 0 WHERE"
" is_reserved = 1 AND txo.address IN (" " is_reserved = 1 AND txo.address IN ("
" SELECT address from pubkey_address WHERE account = ?" " SELECT address from pubkey_address WHERE account = ?"
" )", [account.public_key.address] " )", (account.public_key.address, )
) )
def get_supports_summary(self, account_id): def get_supports_summary(self, account_id):

View file

@ -86,10 +86,10 @@ class AIOSQLite:
if not foreign_keys_enabled: if not foreign_keys_enabled:
raise sqlite3.IntegrityError("foreign keys are disabled, use `AIOSQLite.run` instead") raise sqlite3.IntegrityError("foreign keys are disabled, use `AIOSQLite.run` instead")
try: try:
self.connection.execute('pragma foreign_keys=off') self.connection.execute('pragma foreign_keys=off').fetchone()
return self.__run_transaction(fun, *args, **kwargs) return self.__run_transaction(fun, *args, **kwargs)
finally: finally:
self.connection.execute('pragma foreign_keys=on') self.connection.execute('pragma foreign_keys=on').fetchone()
def constraints_to_sql(constraints, joiner=' AND ', prepend_key=''): def constraints_to_sql(constraints, joiner=' AND ', prepend_key=''):
@ -376,10 +376,10 @@ class BaseDatabase(SQLiteMixin):
} }
async def insert_transaction(self, tx): async def insert_transaction(self, tx):
await self.db.execute(*self._insert_sql('tx', self.tx_to_row(tx))) await self.db.execute_fetchall(*self._insert_sql('tx', self.tx_to_row(tx)))
async def update_transaction(self, tx): async def update_transaction(self, tx):
await self.db.execute(*self._update_sql("tx", { await self.db.execute_fetchall(*self._update_sql("tx", {
'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified 'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified
}, 'txid = ?', (tx.id,))) }, 'txid = ?', (tx.id,)))
@ -390,7 +390,7 @@ class BaseDatabase(SQLiteMixin):
if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash: if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash:
conn.execute(*self._insert_sql( conn.execute(*self._insert_sql(
"txo", self.txo_to_row(tx, address, txo), ignore_duplicate=True "txo", self.txo_to_row(tx, address, txo), ignore_duplicate=True
)) )).fetchall()
elif txo.script.is_pay_script_hash: elif txo.script.is_pay_script_hash:
# TODO: implement script hash payments # TODO: implement script hash payments
log.warning('Database.save_transaction_io: pay script hash is not implemented!') log.warning('Database.save_transaction_io: pay script hash is not implemented!')
@ -403,7 +403,7 @@ class BaseDatabase(SQLiteMixin):
'txid': tx.id, 'txid': tx.id,
'txoid': txo.id, 'txoid': txo.id,
'address': address, 'address': address,
}, ignore_duplicate=True)) }, ignore_duplicate=True)).fetchall()
conn.execute( conn.execute(
"UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?", "UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?",
@ -618,7 +618,7 @@ class BaseDatabase(SQLiteMixin):
) )
async def _set_address_history(self, address, history): async def _set_address_history(self, address, history):
await self.db.execute( await self.db.execute_fetchall(
"UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?", "UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?",
(history, history.count(':')//2, address) (history, history.count(':')//2, address)
) )