diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 1449d35ca..3ed99ce41 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -1,14 +1,16 @@ import logging import os -import leveldb import time -import json +import sqlite3 from twisted.internet import threads, defer, reactor, task from twisted.python.failure import Failure +from twisted.enterprise import adbapi from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.cryptoutils import get_lbry_hash_obj +from lbrynet.core.Error import NoSuchBlobError +from lbrynet.core.sqlite_helpers import rerun_if_locked class BlobManager(DHTHashSupplier): @@ -68,8 +70,8 @@ class DiskBlobManager(BlobManager): def __init__(self, hash_announcer, blob_dir, db_dir): BlobManager.__init__(self, hash_announcer) self.blob_dir = blob_dir - self.db_dir = db_dir - self.db = None + self.db_file = os.path.join(db_dir, "blobs.db") + self.db_conn = None self.blob_type = BlobFile self.blob_creator_type = BlobFileCreator self.blobs = {} @@ -77,7 +79,7 @@ class DiskBlobManager(BlobManager): self._next_manage_call = None def setup(self): - d = threads.deferToThread(self._open_db) + d = self._open_db() d.addCallback(lambda _: self._manage()) return d @@ -85,7 +87,8 @@ class DiskBlobManager(BlobManager): if self._next_manage_call is not None and self._next_manage_call.active(): self._next_manage_call.cancel() self._next_manage_call = None - self.db = None + #d = self.db_conn.close() + self.db_conn = None return defer.succeed(True) def get_blob(self, blob_hash, upload_allowed, length=None): @@ -101,7 +104,7 @@ class DiskBlobManager(BlobManager): def _make_new_blob(self, blob_hash, upload_allowed, length=None): blob = self.blob_type(self.blob_dir, blob_hash, upload_allowed, length) self.blobs[blob_hash] = blob - d = threads.deferToThread(self._completed_blobs, [blob_hash]) + d = self._completed_blobs([blob_hash]) def check_completed(completed_blobs): @@ -110,7 +113,7 @@ class DiskBlobManager(BlobManager): if len(completed_blobs) == 1 and completed_blobs[0] == blob_hash: blob.verified = True - inner_d = threads.deferToThread(self._get_blob_length, blob_hash) + inner_d = self._get_blob_length(blob_hash) inner_d.addCallback(set_length) inner_d.addCallback(lambda _: blob) else: @@ -123,15 +126,15 @@ class DiskBlobManager(BlobManager): def blob_completed(self, blob, next_announce_time=None): if next_announce_time is None: next_announce_time = time.time() - return threads.deferToThread(self._add_completed_blob, blob.blob_hash, blob.length, - time.time(), next_announce_time) + return self._add_completed_blob(blob.blob_hash, blob.length, + time.time(), next_announce_time) def completed_blobs(self, blobs_to_check): - return threads.deferToThread(self._completed_blobs, blobs_to_check) + return self._completed_blobs(blobs_to_check) def hashes_to_announce(self): next_announce_time = time.time() + self.hash_reannounce_time - return threads.deferToThread(self._get_blobs_to_announce, next_announce_time) + return self._get_blobs_to_announce(next_announce_time) def creator_finished(self, blob_creator): logging.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) @@ -155,18 +158,18 @@ class DiskBlobManager(BlobManager): self.blob_hashes_to_delete[blob_hash] = False def update_all_last_verified_dates(self, timestamp): - return 
threads.deferToThread(self._update_all_last_verified_dates, timestamp) + return self._update_all_last_verified_dates(timestamp) def immediate_announce_all_blobs(self): - d = threads.deferToThread(self._get_all_verified_blob_hashes) + d = self._get_all_verified_blob_hashes() d.addCallback(self.hash_announcer.immediate_announce) return d def get_blob_length(self, blob_hash): - return threads.deferToThread(self._get_blob_length, blob_hash) + return self._get_blob_length(blob_hash) def check_consistency(self): - return threads.deferToThread(self._check_consistency) + return self._check_consistency() def _manage(self): from twisted.internet import reactor @@ -192,7 +195,7 @@ class DiskBlobManager(BlobManager): def delete_from_db(result): b_hs = [r[1] for r in result if r[0] is True] if b_hs: - d = threads.deferToThread(self._delete_blobs_from_db, b_hs) + d = self._delete_blobs_from_db(b_hs) else: d = defer.succeed(True) @@ -221,72 +224,127 @@ class DiskBlobManager(BlobManager): ######### database calls ######### def _open_db(self): - self.db = leveldb.LevelDB(os.path.join(self.db_dir, "blobs.db")) + # check_same_thread=False is solely to quiet a spurious error that appears to be due + # to a bug in twisted, where the connection is closed by a different thread than the + # one that opened it. The individual connections in the pool are not used in multiple + # threads. + self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False) + return self.db_conn.runQuery("create table if not exists blobs (" + + " blob_hash text primary key, " + + " blob_length integer, " + + " last_verified_time real, " + + " next_announce_time real" + ")") + @rerun_if_locked def _add_completed_blob(self, blob_hash, length, timestamp, next_announce_time=None): logging.debug("Adding a completed blob. 
blob_hash=%s, length=%s", blob_hash, str(length)) if next_announce_time is None: next_announce_time = timestamp - self.db.Put(blob_hash, json.dumps((length, timestamp, next_announce_time)), sync=True) + d = self.db_conn.runQuery("insert into blobs values (?, ?, ?, ?)", + (blob_hash, length, timestamp, next_announce_time)) + d.addErrback(lambda err: err.trap(sqlite3.IntegrityError)) + return d + @rerun_if_locked def _completed_blobs(self, blobs_to_check): - blobs = [] - for b in blobs_to_check: - if is_valid_blobhash(b): - try: - length, verified_time, next_announce_time = json.loads(self.db.Get(b)) - except KeyError: - continue + blobs_to_check = filter(is_valid_blobhash, blobs_to_check) + + def get_blobs_in_db(db_transaction): + blobs_in_db = [] # [(blob_hash, last_verified_time)] + for b in blobs_to_check: + result = db_transaction.execute("select last_verified_time from blobs where blob_hash = ?", + (b,)) + row = result.fetchone() + if row is not None: + blobs_in_db.append((b, row[0])) + return blobs_in_db + + def get_valid_blobs(blobs_in_db): + + def check_blob_verified_date(b, verified_time): file_path = os.path.join(self.blob_dir, b) if os.path.isfile(file_path): if verified_time > os.path.getctime(file_path): - blobs.append(b) - return blobs + return True + return False + def return_valid_blobs(results): + valid_blobs = [] + for (b, verified_date), (success, result) in zip(blobs_in_db, results): + if success is True and result is True: + valid_blobs.append(b) + return valid_blobs + + ds = [] + for b, verified_date in blobs_in_db: + ds.append(threads.deferToThread(check_blob_verified_date, b, verified_date)) + dl = defer.DeferredList(ds) + dl.addCallback(return_valid_blobs) + return dl + + d = self.db_conn.runInteraction(get_blobs_in_db) + d.addCallback(get_valid_blobs) + return d + + @rerun_if_locked def _get_blob_length(self, blob): - length, verified_time, next_announce_time = json.loads(self.db.Get(blob)) - return length + d = self.db_conn.runQuery("select blob_length from blobs where blob_hash = ?", (blob,)) + d.addCallback(lambda r: r[0] if len(r) else Failure(NoSuchBlobError(blob))) + return d + #length, verified_time, next_announce_time = json.loads(self.db.Get(blob)) + #return length + + @rerun_if_locked def _update_blob_verified_timestamp(self, blob, timestamp): - length, old_verified_time, next_announce_time = json.loads(self.db.Get(blob)) - self.db.Put(blob, json.dumps((length, timestamp, next_announce_time)), sync=True) + return self.db_conn.runQuery("update blobs set last_verified_time = ? where blob_hash = ?", + (blob, timestamp)) + @rerun_if_locked def _get_blobs_to_announce(self, next_announce_time): - # TODO: See if the following would be better for handling announce times: - # TODO: Have a separate db for them, and read the whole thing into memory - # TODO: on startup, and then write changes to db when they happen - blobs = [] - batch = leveldb.WriteBatch() - current_time = time.time() - for blob_hash, blob_info in self.db.RangeIter(): - length, verified_time, announce_time = json.loads(blob_info) - if announce_time < current_time: - batch.Put(blob_hash, json.dumps((length, verified_time, next_announce_time))) - blobs.append(blob_hash) - self.db.Write(batch, sync=True) - return blobs + def get_and_update(transaction): + timestamp = time.time() + r = transaction.execute("select blob_hash from blobs " + + "where next_announce_time < ? 
and blob_hash is not null", + (timestamp,)) + blobs = [b for b, in r.fetchall()] + transaction.execute("update blobs set next_announce_time = ? where next_announce_time < ?", + (next_announce_time, timestamp)) + return blobs + + return self.db_conn.runInteraction(get_and_update) + + @rerun_if_locked def _update_all_last_verified_dates(self, timestamp): - batch = leveldb.WriteBatch() - for blob_hash, blob_info in self.db.RangeIter(): - length, verified_time, announce_time = json.loads(blob_info) - batch.Put(blob_hash, json.dumps((length, timestamp, announce_time))) - self.db.Write(batch, sync=True) + return self.db_conn.runQuery("update blobs set last_verified_date = ?", (timestamp,)) + @rerun_if_locked def _delete_blobs_from_db(self, blob_hashes): - batch = leveldb.WriteBatch() - for blob_hash in blob_hashes: - batch.Delete(blob_hash) - self.db.Write(batch, sync=True) + def delete_blobs(transaction): + for b in blob_hashes: + transaction.execute("delete from blobs where blob_hash = ?", (b,)) + + return self.db_conn.runInteraction(delete_blobs) + + @rerun_if_locked def _check_consistency(self): - batch = leveldb.WriteBatch() + + ALREADY_VERIFIED = 1 + NEWLY_VERIFIED = 2 + INVALID = 3 + current_time = time.time() - for blob_hash, blob_info in self.db.RangeIter(): - length, verified_time, announce_time = json.loads(blob_info) + d = self.db_conn.runQuery("select blob_hash, blob_length, last_verified_time from blobs") + + def check_blob(blob_hash, blob_length, verified_time): file_path = os.path.join(self.blob_dir, blob_hash) if os.path.isfile(file_path): - if verified_time < os.path.getctime(file_path): + if verified_time >= os.path.getctime(file_path): + return ALREADY_VERIFIED + else: h = get_lbry_hash_obj() len_so_far = 0 f = open(file_path) @@ -296,19 +354,63 @@ class DiskBlobManager(BlobManager): break h.update(data) len_so_far += len(data) - if len_so_far == length and h.hexdigest() == blob_hash: - batch.Put(blob_hash, json.dumps((length, current_time, announce_time))) - self.db.Write(batch, sync=True) + if len_so_far == blob_length and h.hexdigest() == blob_hash: + return NEWLY_VERIFIED + return INVALID + def do_check(blobs): + already_verified = [] + newly_verified = [] + invalid = [] + for blob_hash, blob_length, verified_time in blobs: + status = check_blob(blob_hash, blob_length, verified_time) + if status == ALREADY_VERIFIED: + already_verified.append(blob_hash) + elif status == NEWLY_VERIFIED: + newly_verified.append(blob_hash) + else: + invalid.append(blob_hash) + return already_verified, newly_verified, invalid + + def update_newly_verified(transaction, blobs): + for b in blobs: + transaction.execute("update blobs set last_verified_time = ? 
where blob_hash = ?", + (current_time, b)) + + def check_blobs(blobs): + + @rerun_if_locked + def update_and_return(status_lists): + + already_verified, newly_verified, invalid = status_lists + + d = self.db_conn.runInteraction(update_newly_verified, newly_verified) + d.addCallback(lambda _: status_lists) + return d + + d = threads.deferToThread(do_check, blobs) + + d.addCallback(update_and_return) + return d + + d.addCallback(check_blobs) + return d + + @rerun_if_locked def _get_all_verified_blob_hashes(self): - blob_hashes = [] - for blob_hash, blob_info in self.db.RangeIter(): - length, verified_time, announce_time = json.loads(blob_info) - file_path = os.path.join(self.blob_dir, blob_hash) - if os.path.isfile(file_path): - if verified_time > os.path.getctime(file_path): - blob_hashes.append(blob_hash) - return blob_hashes + d = self.db_conn.runQuery("select blob_hash, last_verified_time from blobs") + + def get_verified_blobs(blobs): + verified_blobs = [] + for blob_hash, verified_time in blobs: + file_path = os.path.join(self.blob_dir, blob_hash) + if os.path.isfile(file_path): + if verified_time > os.path.getctime(file_path): + verified_blobs.append(blob_hash) + return verified_blobs + + d.addCallback(lambda blobs: threads.deferToThread(get_verified_blobs, blobs)) + return d class TempBlobManager(BlobManager): @@ -389,7 +491,7 @@ class TempBlobManager(BlobManager): if blob_hash in self.blobs: if self.blobs[blob_hash].length is not None: return defer.succeed(self.blobs[blob_hash].length) - return defer.fail(ValueError("No such blob hash is known")) + return defer.fail(NoSuchBlobError(blob_hash)) def immediate_announce_all_blobs(self): return self.hash_announcer.immediate_announce(self.blobs.iterkeys()) @@ -432,7 +534,7 @@ class TempBlobManager(BlobManager): ds.append(d) else: remove_from_list(blob_hash) - d = defer.fail(Failure(ValueError("No such blob known"))) + d = defer.fail(Failure(NoSuchBlobError(blob_hash))) logging.warning("Blob %s cannot be deleted because it is unknown") ds.append(d) return defer.DeferredList(ds) \ No newline at end of file diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 08c03ce05..89d22f13e 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -63,4 +63,12 @@ class NoResponseError(MisbehavingPeerError): class InvalidResponseError(MisbehavingPeerError): + pass + + +class NoSuchBlobError(Exception): + pass + + +class NoSuchStreamHashError(Exception): pass \ No newline at end of file diff --git a/lbrynet/core/PTCWallet.py b/lbrynet/core/PTCWallet.py index 01fdd0ea8..be7213035 100644 --- a/lbrynet/core/PTCWallet.py +++ b/lbrynet/core/PTCWallet.py @@ -1,7 +1,7 @@ from collections import defaultdict import logging -import leveldb import os +import unqlite import time from Crypto.Hash import SHA512 from Crypto.PublicKey import RSA @@ -71,7 +71,7 @@ class PTCWallet(object): def save_key(success, private_key): if success is True: - threads.deferToThread(self.save_private_key, private_key.exportKey()) + self._save_private_key(private_key.exportKey()) return True return False @@ -95,8 +95,8 @@ class PTCWallet(object): def start_manage(): self.manage() return True - d = threads.deferToThread(self._open_db) - d.addCallback(lambda _: threads.deferToThread(self.get_wallet_private_key)) + d = self._open_db() + d.addCallback(lambda _: self._get_wallet_private_key()) d.addCallback(ensure_private_key_exists) d.addCallback(lambda _: start_manage()) return d @@ -211,16 +211,21 @@ class PTCWallet(object): str(peer), self.peer_pub_keys[peer], 
str(min_expected_balance), str(received_balance)) def _open_db(self): - self.db = leveldb.LevelDB(os.path.join(self.db_dir, "ptcwallet.db")) + def open_db(): + self.db = unqlite.UnQLite(os.path.join(self.db_dir, "ptcwallet.db")) + return threads.deferToThread(open_db) - def save_private_key(self, private_key): - self.db.Put("private_key", private_key) + def _save_private_key(self, private_key): + def save_key(): + self.db['private_key'] = private_key + return threads.deferToThread(save_key) - def get_wallet_private_key(self): - try: - return self.db.Get("private_key") - except KeyError: + def _get_wallet_private_key(self): + def get_key(): + if 'private_key' in self.db: + return self.db['private_key'] return None + return threads.deferToThread(get_key) class PointTraderKeyExchanger(object): diff --git a/lbrynet/core/sqlite_helpers.py b/lbrynet/core/sqlite_helpers.py new file mode 100644 index 000000000..2e37fcf55 --- /dev/null +++ b/lbrynet/core/sqlite_helpers.py @@ -0,0 +1,20 @@ +import sqlite3 +from twisted.internet import task, reactor +import logging + + +def rerun_if_locked(f): + + def rerun(err, *args, **kwargs): + if err.check(sqlite3.OperationalError) and err.value.message == "database is locked": + logging.warning("database was locked. rerunning %s with args %s, kwargs %s", + str(f), str(args), str(kwargs)) + return task.deferLater(reactor, 0, wrapper, *args, **kwargs) + return err + + def wrapper(*args, **kwargs): + d = f(*args, **kwargs) + d.addErrback(rerun, *args, **kwargs) + return d + + return wrapper \ No newline at end of file diff --git a/lbrynet/db_migrator/__init__.py b/lbrynet/db_migrator/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/db_migrator/dbmigrator.py b/lbrynet/db_migrator/dbmigrator.py new file mode 100644 index 000000000..8e61e1c93 --- /dev/null +++ b/lbrynet/db_migrator/dbmigrator.py @@ -0,0 +1,14 @@ +def migrate_db(db_dir, start, end): + current = start + old_dirs = [] + while current < end: + if current == 0: + from lbrynet.db_migrator.migrate0to1 import do_migration + old_dirs.append(do_migration(db_dir)) + current += 1 + return old_dirs + + +def run_migration_script(): + import sys + migrate_db(sys.argv[1], sys.argv[2], sys.argv[3]) \ No newline at end of file diff --git a/lbrynet/db_migrator/migrate0to1.py b/lbrynet/db_migrator/migrate0to1.py new file mode 100644 index 000000000..e39315b31 --- /dev/null +++ b/lbrynet/db_migrator/migrate0to1.py @@ -0,0 +1,304 @@ +import sqlite3 +import unqlite +import leveldb +import shutil +import os +import logging +import json + + +known_dbs = ['lbryfile_desc.db', 'lbryfiles.db', 'valuable_blobs.db', 'blobs.db', + 'lbryfile_blob.db', 'lbryfile_info.db', 'settings.db', 'blind_settings.db', + 'blind_peers.db', 'blind_info.db', 'lbryfile_info.db', 'lbryfile_manager.db', + 'live_stream.db', 'stream_info.db', 'stream_blob.db', 'stream_desc.db'] + + +def do_move(from_dir, to_dir): + for known_db in known_dbs: + known_db_path = os.path.join(from_dir, known_db) + if os.path.exists(known_db_path): + logging.debug("Moving %s to %s", + os.path.abspath(known_db_path), + os.path.abspath(os.path.join(to_dir, known_db))) + shutil.move(known_db_path, os.path.join(to_dir, known_db)) + else: + logging.debug("Did not find %s", os.path.abspath(known_db_path)) + + +def do_migration(db_dir): + old_dir = os.path.join(db_dir, "_0_to_1_old") + new_dir = os.path.join(db_dir, "_0_to_1_new") + try: + logging.info("Moving dbs from the real directory to %s", os.path.abspath(old_dir)) + os.makedirs(old_dir) + 
do_move(db_dir, old_dir) + except: + logging.error("An error occurred moving the old db files.") + raise + try: + logging.info("Creating the new directory in %s", os.path.abspath(new_dir)) + os.makedirs(new_dir) + + except: + logging.error("An error occurred creating the new directory.") + raise + try: + logging.info("Doing the migration") + migrate_blob_db(old_dir, new_dir) + migrate_lbryfile_db(old_dir, new_dir) + migrate_livestream_db(old_dir, new_dir) + migrate_ptc_db(old_dir, new_dir) + migrate_lbryfile_manager_db(old_dir, new_dir) + migrate_settings_db(old_dir, new_dir) + migrate_repeater_db(old_dir, new_dir) + logging.info("Migration succeeded") + except: + logging.error("An error occurred during the migration. Restoring.") + do_move(old_dir, db_dir) + raise + try: + logging.info("Moving dbs in the new directory to the real directory") + do_move(new_dir, db_dir) + db_revision = open(os.path.join(db_dir, 'db_revision'), mode='w+') + db_revision.write("1") + db_revision.close() + os.rmdir(new_dir) + except: + logging.error("An error occurred moving the new db files.") + raise + return old_dir + + +def migrate_blob_db(old_db_dir, new_db_dir): + old_blob_db_path = os.path.join(old_db_dir, "blobs.db") + if not os.path.exists(old_blob_db_path): + return True + + old_db = leveldb.LevelDB(old_blob_db_path) + new_db_conn = sqlite3.connect(os.path.join(new_db_dir, "blobs.db")) + c = new_db_conn.cursor() + c.execute("create table if not exists blobs (" + + " blob_hash text primary key, " + + " blob_length integer, " + + " last_verified_time real, " + + " next_announce_time real" + ")") + new_db_conn.commit() + c = new_db_conn.cursor() + for blob_hash, blob_info in old_db.RangeIter(): + blob_length, verified_time, announce_time = json.loads(blob_info) + c.execute("insert into blobs values (?, ?, ?, ?)", + (blob_hash, blob_length, verified_time, announce_time)) + new_db_conn.commit() + new_db_conn.close() + + +def migrate_lbryfile_db(old_db_dir, new_db_dir): + old_lbryfile_db_path = os.path.join(old_db_dir, "lbryfiles.db") + if not os.path.exists(old_lbryfile_db_path): + return True + + stream_info_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_info.db")) + stream_blob_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_blob.db")) + stream_desc_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_desc.db")) + + db_conn = sqlite3.connect(os.path.join(new_db_dir, "lbryfile_info.db")) + c = db_conn.cursor() + c.execute("create table if not exists lbry_files (" + + " stream_hash text primary key, " + + " key text, " + + " stream_name text, " + + " suggested_file_name text" + + ")") + c.execute("create table if not exists lbry_file_blobs (" + + " blob_hash text, " + + " stream_hash text, " + + " position integer, " + + " iv text, " + + " length integer, " + + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + c.execute("create table if not exists lbry_file_descriptors (" + + " sd_blob_hash TEXT PRIMARY KEY, " + + " stream_hash TEXT, " + + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + db_conn.commit() + c = db_conn.cursor() + for stream_hash, stream_info in stream_info_db.RangeIter(): + key, name, suggested_file_name = json.loads(stream_info) + c.execute("insert into lbry_files values (?, ?, ?, ?)", + (stream_hash, key, name, suggested_file_name)) + db_conn.commit() + c = db_conn.cursor() + for blob_hash_stream_hash, blob_info in stream_blob_db.RangeIter(): + b_h, s_h = json.loads(blob_hash_stream_hash) + position, iv, length = 
json.loads(blob_info) + c.execute("insert into lbry_file_blobs values (?, ?, ?, ?, ?)", + (b_h, s_h, position, iv, length)) + db_conn.commit() + c = db_conn.cursor() + for sd_blob_hash, stream_hash in stream_desc_db.RangeIter(): + c.execute("insert into lbry_file_descriptors values (?, ?)", + (sd_blob_hash, stream_hash)) + db_conn.commit() + db_conn.close() + + +def migrate_livestream_db(old_db_dir, new_db_dir): + old_db_path = os.path.join(old_db_dir, "stream_info.db") + if not os.path.exists(old_db_path): + return True + stream_info_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_info.db")) + stream_blob_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_blob.db")) + stream_desc_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_desc.db")) + + db_conn = sqlite3.connect(os.path.join(new_db_dir, "live_stream.db")) + + c = db_conn.cursor() + + c.execute("create table if not exists live_streams (" + + " stream_hash text primary key, " + + " public_key text, " + + " key text, " + + " stream_name text, " + + " next_announce_time real" + + ")") + c.execute("create table if not exists live_stream_blobs (" + + " blob_hash text, " + + " stream_hash text, " + + " position integer, " + + " revision integer, " + + " iv text, " + + " length integer, " + + " signature text, " + + " foreign key(stream_hash) references live_streams(stream_hash)" + + ")") + c.execute("create table if not exists live_stream_descriptors (" + + " sd_blob_hash TEXT PRIMARY KEY, " + + " stream_hash TEXT, " + + " foreign key(stream_hash) references live_streams(stream_hash)" + + ")") + + db_conn.commit() + + c = db_conn.cursor() + for stream_hash, stream_info in stream_info_db.RangeIter(): + public_key, key, name, next_announce_time = json.loads(stream_info) + c.execute("insert into live_streams values (?, ?, ?, ?, ?)", + (stream_hash, public_key, key, name, next_announce_time)) + db_conn.commit() + c = db_conn.cursor() + for blob_hash_stream_hash, blob_info in stream_blob_db.RangeIter(): + b_h, s_h = json.loads(blob_hash_stream_hash) + position, revision, iv, length, signature = json.loads(blob_info) + c.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)", + (b_h, s_h, position, revision, iv, length, signature)) + db_conn.commit() + c = db_conn.cursor() + for sd_blob_hash, stream_hash in stream_desc_db.RangeIter(): + c.execute("insert into live_stream_descriptors values (?, ?)", + (sd_blob_hash, stream_hash)) + db_conn.commit() + db_conn.close() + + +def migrate_ptc_db(old_db_dir, new_db_dir): + old_db_path = os.path.join(old_db_dir, "ptcwallet.db") + if not os.path.exists(old_db_path): + return True + old_db = leveldb.LevelDB(old_db_path) + try: + p_key = old_db.Get("private_key") + new_db = unqlite.UnQLite(os.path.join(new_db_dir, "ptcwallet.db")) + new_db['private_key'] = p_key + except KeyError: + pass + + +def migrate_lbryfile_manager_db(old_db_dir, new_db_dir): + old_db_path = os.path.join(old_db_dir, "lbryfiles.db") + if not os.path.exists(old_db_path): + return True + old_db = leveldb.LevelDB(old_db_path) + new_db = sqlite3.connect(os.path.join(new_db_dir, "lbryfile_info.db")) + c = new_db.cursor() + c.execute("create table if not exists lbry_file_options (" + + " blob_data_rate real, " + + " status text," + + " stream_hash text," + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + new_db.commit() + LBRYFILE_STATUS = "t" + LBRYFILE_OPTIONS = "o" + c = new_db.cursor() + for k, v in old_db.RangeIter(): + key_type, stream_hash = json.loads(k) + if key_type == 
LBRYFILE_STATUS: + try: + rate = json.loads(old_db.Get(json.dumps((LBRYFILE_OPTIONS, stream_hash))))[0] + except KeyError: + rate = None + c.execute("insert into lbry_file_options values (?, ?, ?)", + (rate, v, stream_hash)) + new_db.commit() + new_db.close() + + +def migrate_settings_db(old_db_dir, new_db_dir): + old_settings_db_path = os.path.join(old_db_dir, "settings.db") + if not os.path.exists(old_settings_db_path): + return True + old_db = leveldb.LevelDB(old_settings_db_path) + new_db = unqlite.UnQLite(os.path.join(new_db_dir, "settings.db")) + for k, v in old_db.RangeIter(): + new_db[k] = v + + +def migrate_repeater_db(old_db_dir, new_db_dir): + old_repeater_db_path = os.path.join(old_db_dir, "valuable_blobs.db") + if not os.path.exists(old_repeater_db_path): + return True + old_db = leveldb.LevelDB(old_repeater_db_path) + info_db = sqlite3.connect(os.path.join(new_db_dir, "blind_info.db")) + peer_db = sqlite3.connect(os.path.join(new_db_dir, "blind_peers.db")) + unql_db = unqlite.UnQLite(os.path.join(new_db_dir, "blind_settings.db")) + BLOB_INFO_TYPE = 'b' + SETTING_TYPE = 's' + PEER_TYPE = 'p' + info_c = info_db.cursor() + info_c.execute("create table if not exists valuable_blobs (" + + " blob_hash text primary key, " + + " blob_length integer, " + + " reference text, " + + " peer_host text, " + + " peer_port integer, " + + " peer_score text" + + ")") + info_db.commit() + peer_c = peer_db.cursor() + peer_c.execute("create table if not exists approved_peers (" + + " ip_address text, " + + " port integer" + + ")") + peer_db.commit() + info_c = info_db.cursor() + peer_c = peer_db.cursor() + for k, v in old_db.RangeIter(): + key_type, key_rest = json.loads(k) + if key_type == PEER_TYPE: + host, port = key_rest + peer_c.execute("insert into approved_peers values (?, ?)", + (host, port)) + elif key_type == SETTING_TYPE: + unql_db[key_rest] = v + elif key_type == BLOB_INFO_TYPE: + blob_hash = key_rest + length, reference, peer_host, peer_port, peer_score = json.loads(v) + info_c.execute("insert into valuable_blobs values (?, ?, ?, ?, ?, ?)", + (blob_hash, length, reference, peer_host, peer_port, peer_score)) + info_db.commit() + peer_db.commit() + info_db.close() + peer_db.close() \ No newline at end of file diff --git a/lbrynet/lbryfile/LBRYFileMetadataManager.py b/lbrynet/lbryfile/LBRYFileMetadataManager.py index 270d1f0e3..0aed13434 100644 --- a/lbrynet/lbryfile/LBRYFileMetadataManager.py +++ b/lbrynet/lbryfile/LBRYFileMetadataManager.py @@ -1,9 +1,11 @@ import logging -import leveldb -import json +import sqlite3 import os from twisted.internet import threads, defer -from lbrynet.core.Error import DuplicateStreamHashError +from twisted.python.failure import Failure +from twisted.enterprise import adbapi +from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError +from lbrynet.core.sqlite_helpers import rerun_if_locked class DBLBRYFileMetadataManager(object): @@ -16,163 +18,217 @@ class DBLBRYFileMetadataManager(object): self.stream_desc_db = None def setup(self): - return threads.deferToThread(self._open_db) + return self._open_db() def stop(self): - self.stream_info_db = None - self.stream_blob_db = None - self.stream_desc_db = None + self.db_conn = None return defer.succeed(True) def get_all_streams(self): - return threads.deferToThread(self._get_all_streams) + return self._get_all_streams() def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs): - d = threads.deferToThread(self._store_stream, stream_hash, file_name, key, 
suggested_file_name) + d = self._store_stream(stream_hash, file_name, key, suggested_file_name) d.addCallback(lambda _: self.add_blobs_to_stream(stream_hash, blobs)) return d def get_stream_info(self, stream_hash): - return threads.deferToThread(self._get_stream_info, stream_hash) + return self._get_stream_info(stream_hash) def check_if_stream_exists(self, stream_hash): - return threads.deferToThread(self._check_if_stream_exists, stream_hash) + return self._check_if_stream_exists(stream_hash) def delete_stream(self, stream_hash): - return threads.deferToThread(self._delete_stream, stream_hash) + return self._delete_stream(stream_hash) def add_blobs_to_stream(self, stream_hash, blobs): - - def add_blobs(): - self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True) - - return threads.deferToThread(add_blobs) + return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True) def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): logging.info("Getting blobs for a stream. Count is %s", str(count)) def get_positions_of_start_and_end(): if start_blob is not None: - start_num = self._get_blob_num_by_hash(stream_hash, start_blob) + d1 = self._get_blob_num_by_hash(stream_hash, start_blob) else: - start_num = None + d1 = defer.succeed(None) if end_blob is not None: - end_num = self._get_blob_num_by_hash(stream_hash, end_blob) + d2 = self._get_blob_num_by_hash(stream_hash, end_blob) else: + d2 = defer.succeed(None) + + dl = defer.DeferredList([d1, d2]) + + def get_positions(results): + start_num = None end_num = None - return start_num, end_num + if results[0][0] is True: + start_num = results[0][1] + if results[1][0] is True: + end_num = results[1][1] + return start_num, end_num + + dl.addCallback(get_positions) + return dl def get_blob_infos(nums): start_num, end_num = nums - return threads.deferToThread(self._get_further_blob_infos, stream_hash, start_num, end_num, - count, reverse) + return self._get_further_blob_infos(stream_hash, start_num, end_num, + count, reverse) - d = threads.deferToThread(get_positions_of_start_and_end) + d = get_positions_of_start_and_end() d.addCallback(get_blob_infos) return d def get_stream_of_blob(self, blob_hash): - return threads.deferToThread(self._get_stream_of_blobhash, blob_hash) + return self._get_stream_of_blobhash(blob_hash) def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - return threads.deferToThread(self._save_sd_blob_hash_to_stream, stream_hash, sd_blob_hash) + return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) def get_sd_blob_hashes_for_stream(self, stream_hash): - return threads.deferToThread(self._get_sd_blob_hashes_for_stream, stream_hash) + return self._get_sd_blob_hashes_for_stream(stream_hash) def _open_db(self): - self.stream_info_db = leveldb.LevelDB(os.path.join(self.db_dir, "lbryfile_info.db")) - self.stream_blob_db = leveldb.LevelDB(os.path.join(self.db_dir, "lbryfile_blob.db")) - self.stream_desc_db = leveldb.LevelDB(os.path.join(self.db_dir, "lbryfile_desc.db")) + # check_same_thread=False is solely to quiet a spurious error that appears to be due + # to a bug in twisted, where the connection is closed by a different thread than the + # one that opened it. The individual connections in the pool are not used in multiple + # threads. 
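The check_same_thread workaround noted above is the same pattern used for every ConnectionPool in this change. As a hedged, standalone sketch (the example.db path and example table are hypothetical, not part of this diff): adbapi.ConnectionPool forwards extra keyword arguments such as check_same_thread to sqlite3.connect(), runs each query on a pool thread, and hands results back as Deferreds.

    from twisted.enterprise import adbapi
    from twisted.internet import reactor

    # check_same_thread=False is passed through to sqlite3.connect(); queries
    # execute on pool threads and return Deferreds of row lists.
    db_conn = adbapi.ConnectionPool('sqlite3', 'example.db', check_same_thread=False)

    d = db_conn.runQuery("create table if not exists example (k text primary key, v text)")
    d.addCallback(lambda _: db_conn.runQuery("insert or replace into example values (?, ?)", ("a", "1")))
    d.addCallback(lambda _: db_conn.runQuery("select v from example where k = ?", ("a",)))
    d.addCallback(lambda rows: rows[0][0] if rows else None)
    d.addBoth(lambda _: reactor.stop())

    reactor.run()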
+ self.db_conn = adbapi.ConnectionPool("sqlite3", (os.path.join(self.db_dir, "lbryfile_info.db")), + check_same_thread=False) + def create_tables(transaction): + transaction.execute("create table if not exists lbry_files (" + + " stream_hash text primary key, " + + " key text, " + + " stream_name text, " + + " suggested_file_name text" + + ")") + transaction.execute("create table if not exists lbry_file_blobs (" + + " blob_hash text, " + + " stream_hash text, " + + " position integer, " + + " iv text, " + + " length integer, " + + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + transaction.execute("create table if not exists lbry_file_descriptors (" + + " sd_blob_hash TEXT PRIMARY KEY, " + + " stream_hash TEXT, " + + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + + return self.db_conn.runInteraction(create_tables) + + @rerun_if_locked def _delete_stream(self, stream_hash): - desc_batch = leveldb.WriteBatch() - for sd_blob_hash, s_h in self.stream_desc_db.RangeIter(): - if stream_hash == s_h: - desc_batch.Delete(sd_blob_hash) - self.stream_desc_db.Write(desc_batch, sync=True) + d = self.db_conn.runQuery("select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) + d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) - blob_batch = leveldb.WriteBatch() - for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter(): - b_h, s_h = json.loads(blob_hash_stream_hash) - if stream_hash == s_h: - blob_batch.Delete(blob_hash_stream_hash) - self.stream_blob_db.Write(blob_batch, sync=True) + def do_delete(transaction, s_h): + transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) + transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,)) + transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,)) - stream_batch = leveldb.WriteBatch() - for s_h, stream_info in self.stream_info_db.RangeIter(): - if stream_hash == s_h: - stream_batch.Delete(s_h) - self.stream_info_db.Write(stream_batch, sync=True) + d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h)) + return d + @rerun_if_locked def _store_stream(self, stream_hash, name, key, suggested_file_name): - try: - self.stream_info_db.Get(stream_hash) - raise DuplicateStreamHashError("Stream hash %s already exists" % stream_hash) - except KeyError: - pass - self.stream_info_db.Put(stream_hash, json.dumps((key, name, suggested_file_name)), sync=True) + d = self.db_conn.runQuery("insert into lbry_files values (?, ?, ?, ?)", + (stream_hash, key, name, suggested_file_name)) + def check_duplicate(err): + if err.check(sqlite3.IntegrityError): + raise DuplicateStreamHashError(stream_hash) + return err + + d.addErrback(check_duplicate) + return d + + @rerun_if_locked def _get_all_streams(self): - return [stream_hash for stream_hash, stream_info in self.stream_info_db.RangeIter()] + d = self.db_conn.runQuery("select stream_hash from lbry_files") + d.addCallback(lambda results: [r[0] for r in results]) + return d + @rerun_if_locked def _get_stream_info(self, stream_hash): - return json.loads(self.stream_info_db.Get(stream_hash))[:3] + d = self.db_conn.runQuery("select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) + return d + @rerun_if_locked def _check_if_stream_exists(self, stream_hash): - try: - 
self.stream_info_db.Get(stream_hash) - return True - except KeyError: - return False + d = self.db_conn.runQuery("select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) + d.addCallback(lambda r: True if len(r) else False) + return d + @rerun_if_locked def _get_blob_num_by_hash(self, stream_hash, blob_hash): - blob_hash_stream_hash = json.dumps((blob_hash, stream_hash)) - return json.loads(self.stream_blob_db.Get(blob_hash_stream_hash))[0] + d = self.db_conn.runQuery("select position from lbry_file_blobs where stream_hash = ? and blob_hash = ?", + (stream_hash, blob_hash)) + d.addCallback(lambda r: r[0][0] if len(r) else None) + return d + @rerun_if_locked def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): - blob_infos = [] - for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter(): - b_h, s_h = json.loads(blob_hash_stream_hash) - if stream_hash == s_h: - position, iv, length = json.loads(blob_info) - if (start_num is None) or (position > start_num): - if (end_num is None) or (position < end_num): - blob_infos.append((b_h, position, iv, length)) - blob_infos.sort(key=lambda i: i[1], reverse=reverse) + params = [] + q_string = "select * from (" + q_string += " select blob_hash, position, iv, length from lbry_file_blobs " + q_string += " where stream_hash = ? " + params.append(stream_hash) + if start_num is not None: + q_string += " and position > ? " + params.append(start_num) + if end_num is not None: + q_string += " and position < ? " + params.append(end_num) + q_string += " order by position " + if reverse is True: + q_string += " DESC " if count is not None: - blob_infos = blob_infos[:count] - return blob_infos + q_string += " limit ? " + params.append(count) + q_string += ") order by position" + # Order by position is done twice so that it always returns them from lowest position to + # greatest, but the limit by clause can select the 'count' greatest or 'count' least + return self.db_conn.runQuery(q_string, tuple(params)) + @rerun_if_locked def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False): - batch = leveldb.WriteBatch() - for blob_info in blob_infos: - blob_hash_stream_hash = json.dumps((blob_info.blob_hash, stream_hash)) - try: - self.stream_blob_db.Get(blob_hash_stream_hash) - if ignore_duplicate_error is False: - raise KeyError() # TODO: change this to DuplicateStreamBlobError? 
- continue - except KeyError: - pass - batch.Put(blob_hash_stream_hash, - json.dumps((blob_info.blob_num, - blob_info.iv, - blob_info.length))) - self.stream_blob_db.Write(batch, sync=True) + def add_blobs(transaction): + for blob_info in blob_infos: + try: + transaction.execute("insert into lbry_file_blobs values (?, ?, ?, ?, ?)", + (blob_info.blob_hash, stream_hash, blob_info.blob_num, + blob_info.iv, blob_info.length)) + except sqlite3.IntegrityError: + if ignore_duplicate_error is False: + raise + + return self.db_conn.runInteraction(add_blobs) + + @rerun_if_locked def _get_stream_of_blobhash(self, blob_hash): - for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter(): - b_h, s_h = json.loads(blob_hash_stream_hash) - if blob_hash == b_h: - return s_h - return None + d = self.db_conn.runQuery("select stream_hash from lbry_file_blobs where blob_hash = ?", + (blob_hash,)) + d.addCallback(lambda r: r[0][0] if len(r) else None) + return d + @rerun_if_locked def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - self.stream_desc_db.Put(sd_blob_hash, stream_hash) + return self.db_conn.runQuery("insert into lbry_file_descriptors values (?, ?)", + (sd_blob_hash, stream_hash)) + @rerun_if_locked def _get_sd_blob_hashes_for_stream(self, stream_hash): - return [sd_blob_hash for sd_blob_hash, s_h in self.stream_desc_db.RangeIter() if stream_hash == s_h] + d = self.db_conn.runQuery("select sd_blob_hash from lbry_file_descriptors where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda results: [r[0] for r in results]) + return d class TempLBRYFileMetadataManager(object): diff --git a/lbrynet/lbryfilemanager/LBRYFileManager.py b/lbrynet/lbryfilemanager/LBRYFileManager.py index 0400269a4..3f95d961d 100644 --- a/lbrynet/lbryfilemanager/LBRYFileManager.py +++ b/lbrynet/lbryfilemanager/LBRYFileManager.py @@ -5,7 +5,7 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi import logging import json -import leveldb +from twisted.enterprise import adbapi import os from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader @@ -15,32 +15,30 @@ from lbrynet.core.PaymentRateManager import PaymentRateManager from twisted.internet import threads, defer, task, reactor from twisted.python.failure import Failure from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError, CurrentlyStoppingError +from lbrynet.core.sqlite_helpers import rerun_if_locked class LBRYFileManager(object): """ Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata. 
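For reference, the statement that _get_further_blob_infos assembles for, e.g., reverse=True with a count looks like the sketch below (optional start/end clauses omitted): the inner ORDER BY ... DESC with LIMIT selects the highest positions, and the outer ORDER BY returns them lowest-to-highest.

    q_string = ("select * from ("
                " select blob_hash, position, iv, length from lbry_file_blobs"
                " where stream_hash = ?"
                " order by position DESC limit ?"
                ") order by position")
    # executed via the pool, e.g. self.db_conn.runQuery(q_string, (stream_hash, 5))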
""" - SETTING = "s" - LBRYFILE_STATUS = "t" - LBRYFILE_OPTIONS = "o" def __init__(self, session, stream_info_manager, sd_identifier): self.session = session self.stream_info_manager = stream_info_manager self.sd_identifier = sd_identifier self.lbry_files = [] - self.db = None + self.sql_db = None self.download_directory = os.getcwd() def setup(self): - d = threads.deferToThread(self._open_db) + d = self._open_db() d.addCallback(lambda _: self._add_to_sd_identifier()) d.addCallback(lambda _: self._start_lbry_files()) return d def get_all_lbry_file_stream_hashes_and_options(self): - d = threads.deferToThread(self._get_all_lbry_file_stream_hashes) + d = self._get_all_lbry_file_stream_hashes() def get_options(stream_hashes): ds = [] @@ -60,26 +58,20 @@ class LBRYFileManager(object): return d def get_lbry_file_status(self, stream_hash): - return threads.deferToThread(self._get_lbry_file_status, stream_hash) - - def save_lbry_file_options(self, stream_hash, blob_data_rate): - return threads.deferToThread(self._save_lbry_file_options, stream_hash, blob_data_rate) + return self._get_lbry_file_status(stream_hash) def get_lbry_file_options(self, stream_hash): - return threads.deferToThread(self._get_lbry_file_options, stream_hash) + return self._get_lbry_file_options(stream_hash) def delete_lbry_file_options(self, stream_hash): - return threads.deferToThread(self._delete_lbry_file_options, stream_hash) + return self._delete_lbry_file_options(stream_hash) def set_lbry_file_data_payment_rate(self, stream_hash, new_rate): - return threads.deferToThread(self._set_lbry_file_payment_rate, stream_hash, new_rate) + return self._set_lbry_file_payment_rate(stream_hash, new_rate) def change_lbry_file_status(self, stream_hash, status): logging.debug("Changing status of %s to %s", stream_hash, status) - return threads.deferToThread(self._change_file_status, stream_hash, status) - - def delete_lbry_file_status(self, stream_hash): - return threads.deferToThread(self._delete_lbry_file_status, stream_hash) + return self._change_file_status(stream_hash, status) def get_lbry_file_status_reports(self): ds = [] @@ -129,7 +121,7 @@ class LBRYFileManager(object): self.download_directory, upload_allowed) self.lbry_files.append(lbry_file_downloader) - d = self.save_lbry_file_options(stream_hash, blob_data_rate) + d = self.set_lbry_file_data_payment_rate(stream_hash, blob_data_rate) d.addCallback(lambda _: lbry_file_downloader.set_stream_info()) d.addCallback(lambda _: lbry_file_downloader) return d @@ -161,7 +153,6 @@ class LBRYFileManager(object): d.addCallback(lambda _: remove_from_list()) d.addCallback(lambda _: self.delete_lbry_file_options(stream_hash)) - d.addCallback(lambda _: self.delete_lbry_file_status(stream_hash)) return d def toggle_lbry_file_running(self, stream_hash): @@ -207,47 +198,52 @@ class LBRYFileManager(object): ######### database calls ######### def _open_db(self): - self.db = leveldb.LevelDB(os.path.join(self.session.db_dir, "lbryfiles.db")) + # check_same_thread=False is solely to quiet a spurious error that appears to be due + # to a bug in twisted, where the connection is closed by a different thread than the + # one that opened it. The individual connections in the pool are not used in multiple + # threads. 
+ self.sql_db = adbapi.ConnectionPool("sqlite3", os.path.join(self.session.db_dir, "lbryfile_info.db"), + check_same_thread=False) + #self.unql_db = unqlite.UnQLite(os.path.join(self.session.db_dir, "lbryfile_manager.db")) - def _save_payment_rate(self, rate_type, rate): - if rate is not None: - self.db.Put(json.dumps((self.SETTING, rate_type)), json.dumps(rate), sync=True) - else: - self.db.Delete(json.dumps((self.SETTING, rate_type)), sync=True) - - def _save_lbry_file_options(self, stream_hash, blob_data_rate): - self.db.Put(json.dumps((self.LBRYFILE_OPTIONS, stream_hash)), json.dumps((blob_data_rate,)), - sync=True) + return self.sql_db.runQuery("create table if not exists lbry_file_options (" + + " blob_data_rate real, " + + " status text," + + " stream_hash text," + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + @rerun_if_locked def _get_lbry_file_options(self, stream_hash): - try: - return json.loads(self.db.Get(json.dumps((self.LBRYFILE_OPTIONS, stream_hash)))) - except KeyError: - return None, None + d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda result: result[0] if len(result) else (None, )) + return d + @rerun_if_locked def _delete_lbry_file_options(self, stream_hash): - self.db.Delete(json.dumps((self.LBRYFILE_OPTIONS, stream_hash)), sync=True) + return self.sql_db.runQuery("delete from lbry_file_options where stream_hash = ?", + (stream_hash,)) + @rerun_if_locked def _set_lbry_file_payment_rate(self, stream_hash, new_rate): + return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where stream_hash = ?", + (new_rate, stream_hash)) - self.db.Put(json.dumps((self.LBRYFILE_OPTIONS, stream_hash)), json.dumps((new_rate, )), sync=True) - + @rerun_if_locked def _get_all_lbry_file_stream_hashes(self): - hashes = [] - for k, v in self.db.RangeIter(): - key_type, stream_hash = json.loads(k) - if key_type == self.LBRYFILE_STATUS: - hashes.append(stream_hash) - return hashes + d = self.sql_db.runQuery("select stream_hash from lbry_file_options") + d.addCallback(lambda results: [r[0] for r in results]) + return d + @rerun_if_locked def _change_file_status(self, stream_hash, new_status): - self.db.Put(json.dumps((self.LBRYFILE_STATUS, stream_hash)), new_status, sync=True) + return self.sql_db.runQuery("update lbry_file_options set status = ? 
where stream_hash = ?", + (new_status, stream_hash)) + @rerun_if_locked def _get_lbry_file_status(self, stream_hash): - try: - return self.db.Get(json.dumps((self.LBRYFILE_STATUS, stream_hash))) - except KeyError: - return ManagedLBRYFileDownloader.STATUS_STOPPED - - def _delete_lbry_file_status(self, stream_hash): - self.db.Delete(json.dumps((self.LBRYFILE_STATUS, stream_hash)), sync=True) \ No newline at end of file + d = self.sql_db.runQuery("select status from lbry_file_options where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda r: r[0][0] if len(r) else ManagedLBRYFileDownloader.STATUS_STOPPED) + return d \ No newline at end of file diff --git a/lbrynet/lbrylive/LiveStreamMetadataManager.py b/lbrynet/lbrylive/LiveStreamMetadataManager.py index 703197493..1ca356421 100644 --- a/lbrynet/lbrylive/LiveStreamMetadataManager.py +++ b/lbrynet/lbrylive/LiveStreamMetadataManager.py @@ -1,11 +1,13 @@ import time import logging -import leveldb -import json +from twisted.enterprise import adbapi import os +import sqlite3 from twisted.internet import threads, defer +from twisted.python.failure import Failure from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier -from lbrynet.core.Error import DuplicateStreamHashError +from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError +from lbrynet.core.sqlite_helpers import rerun_if_locked class DBLiveStreamMetadataManager(DHTHashSupplier): @@ -14,26 +16,22 @@ class DBLiveStreamMetadataManager(DHTHashSupplier): def __init__(self, db_dir, hash_announcer): DHTHashSupplier.__init__(self, hash_announcer) self.db_dir = db_dir - self.stream_info_db = None - self.stream_blob_db = None - self.stream_desc_db = None + self.db_conn = None def setup(self): - return threads.deferToThread(self._open_db) + return self._open_db() def stop(self): - self.stream_info_db = None - self.stream_blob_db = None - self.stream_desc_db = None + self.db_conn = None return defer.succeed(True) def get_all_streams(self): - return threads.deferToThread(self._get_all_streams) + return self._get_all_streams() def save_stream(self, stream_hash, pub_key, file_name, key, blobs): next_announce_time = time.time() + self.hash_reannounce_time - d = threads.deferToThread(self._store_stream, stream_hash, pub_key, file_name, key, - next_announce_time=next_announce_time) + d = self._store_stream(stream_hash, pub_key, file_name, key, + next_announce_time=next_announce_time) def save_blobs(): return self.add_blobs_to_stream(stream_hash, blobs) @@ -48,169 +46,229 @@ class DBLiveStreamMetadataManager(DHTHashSupplier): return d def get_stream_info(self, stream_hash): - return threads.deferToThread(self._get_stream_info, stream_hash) + return self._get_stream_info(stream_hash) def check_if_stream_exists(self, stream_hash): - return threads.deferToThread(self._check_if_stream_exists, stream_hash) + return self._check_if_stream_exists(stream_hash) def delete_stream(self, stream_hash): - return threads.deferToThread(self._delete_stream, stream_hash) + return self._delete_stream(stream_hash) def add_blobs_to_stream(self, stream_hash, blobs): - - def add_blobs(): - self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True) - - return threads.deferToThread(add_blobs) + return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True) def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): logging.info("Getting blobs for a stream. 
Count is %s", str(count)) def get_positions_of_start_and_end(): if start_blob is not None: - start_num = self._get_blob_num_by_hash(stream_hash, start_blob) + d1 = self._get_blob_num_by_hash(stream_hash, start_blob) else: - start_num = None + d1 = defer.succeed(None) if end_blob is not None: - end_num = self._get_blob_num_by_hash(stream_hash, end_blob) + d2 = self._get_blob_num_by_hash(stream_hash, end_blob) else: + d2 = defer.succeed(None) + + dl = defer.DeferredList([d1, d2]) + + def get_positions(results): + start_num = None end_num = None - return start_num, end_num + if results[0][0] is True: + start_num = results[0][1] + if results[1][0] is True: + end_num = results[1][1] + return start_num, end_num + + dl.addCallback(get_positions) + return dl def get_blob_infos(nums): start_num, end_num = nums - return threads.deferToThread(self._get_further_blob_infos, stream_hash, start_num, end_num, - count, reverse) + return self._get_further_blob_infos(stream_hash, start_num, end_num, + count, reverse) - d = threads.deferToThread(get_positions_of_start_and_end) + d = get_positions_of_start_and_end() d.addCallback(get_blob_infos) return d def get_stream_of_blob(self, blob_hash): - return threads.deferToThread(self._get_stream_of_blobhash, blob_hash) + return self._get_stream_of_blobhash(blob_hash) def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - return threads.deferToThread(self._save_sd_blob_hash_to_stream, stream_hash, sd_blob_hash) + return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) def get_sd_blob_hashes_for_stream(self, stream_hash): - return threads.deferToThread(self._get_sd_blob_hashes_for_stream, stream_hash) + return self._get_sd_blob_hashes_for_stream(stream_hash) def hashes_to_announce(self): next_announce_time = time.time() + self.hash_reannounce_time - return threads.deferToThread(self._get_streams_to_announce, next_announce_time) + return self._get_streams_to_announce(next_announce_time) ######### database calls ######### def _open_db(self): - self.stream_info_db = leveldb.LevelDB(os.path.join(self.db_dir, "stream_info.db")) - self.stream_blob_db = leveldb.LevelDB(os.path.join(self.db_dir, "stream_blob.db")) - self.stream_desc_db = leveldb.LevelDB(os.path.join(self.db_dir, "stream_desc.db")) + # check_same_thread=False is solely to quiet a spurious error that appears to be due + # to a bug in twisted, where the connection is closed by a different thread than the + # one that opened it. The individual connections in the pool are not used in multiple + # threads. 
+ self.db_conn = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "live_stream.db"), + check_same_thread=False) + def create_tables(transaction): + transaction.execute("create table if not exists live_streams (" + + " stream_hash text primary key, " + + " public_key text, " + + " key text, " + + " stream_name text, " + + " next_announce_time real" + + ")") + transaction.execute("create table if not exists live_stream_blobs (" + + " blob_hash text, " + + " stream_hash text, " + + " position integer, " + + " revision integer, " + + " iv text, " + + " length integer, " + + " signature text, " + + " foreign key(stream_hash) references live_streams(stream_hash)" + + ")") + transaction.execute("create table if not exists live_stream_descriptors (" + + " sd_blob_hash TEXT PRIMARY KEY, " + + " stream_hash TEXT, " + + " foreign key(stream_hash) references live_streams(stream_hash)" + + ")") + + return self.db_conn.runInteraction(create_tables) + + @rerun_if_locked def _delete_stream(self, stream_hash): - desc_batch = leveldb.WriteBatch() - for sd_blob_hash, s_h in self.stream_desc_db.RangeIter(): - if stream_hash == s_h: - desc_batch.Delete(sd_blob_hash) - self.stream_desc_db.Write(desc_batch, sync=True) - blob_batch = leveldb.WriteBatch() - for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter(): - b_h, s_h = json.loads(blob_hash_stream_hash) - if stream_hash == s_h: - blob_batch.Delete(blob_hash_stream_hash) - self.stream_blob_db.Write(blob_batch, sync=True) + d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) + d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) - stream_batch = leveldb.WriteBatch() - for s_h, stream_info in self.stream_info_db.RangeIter(): - if stream_hash == s_h: - stream_batch.Delete(s_h) - self.stream_info_db.Write(stream_batch, sync=True) + def do_delete(transaction, s_h): + transaction.execute("delete from live_streams where stream_hash = ?", (s_h,)) + transaction.execute("delete from live_stream_blobs where stream_hash = ?", (s_h,)) + transaction.execute("delete from live_stream_descriptors where stream_hash = ?", (s_h,)) + d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h)) + return d + + @rerun_if_locked def _store_stream(self, stream_hash, public_key, name, key, next_announce_time=None): - try: - self.stream_info_db.Get(stream_hash) - raise DuplicateStreamHashError("Stream hash %s already exists" % stream_hash) - except KeyError: - pass - self.stream_info_db.Put(stream_hash, json.dumps((public_key, key, name, next_announce_time)), sync=True) + d = self.db_conn.runQuery("insert into live_streams values (?, ?, ?, ?, ?)", + (stream_hash, public_key, key, name, next_announce_time)) + def check_duplicate(err): + if err.check(sqlite3.IntegrityError): + raise DuplicateStreamHashError(stream_hash) + return err + + d.addErrback(check_duplicate) + return d + + @rerun_if_locked def _get_all_streams(self): - return [stream_hash for stream_hash, stream_info in self.stream_info_db.RangeIter()] + d = self.db_conn.runQuery("select stream_hash from live_streams") + d.addCallback(lambda results: [r[0] for r in results]) + return d + @rerun_if_locked def _get_stream_info(self, stream_hash): - return json.loads(self.stream_info_db.Get(stream_hash))[:3] + d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda result: result[0] if len(result) else 
Failure(NoSuchStreamHashError(stream_hash))) + return d + @rerun_if_locked def _check_if_stream_exists(self, stream_hash): - try: - self.stream_info_db.Get(stream_hash) - return True - except KeyError: - return False + d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) + d.addCallback(lambda r: True if len(r) else False) + return d + @rerun_if_locked def _get_streams_to_announce(self, next_announce_time): - # TODO: See if the following would be better for handling announce times: - # TODO: Have a separate db for them, and read the whole thing into memory - # TODO: on startup, and then write changes to db when they happen - stream_hashes = [] - batch = leveldb.WriteBatch() - current_time = time.time() - for stream_hash, stream_info in self.stream_info_db.RangeIter(): - public_key, key, name, announce_time = json.loads(stream_info) - if announce_time < current_time: - batch.Put(stream_hash, json.dumps((public_key, key, name, next_announce_time))) - stream_hashes.append(stream_hash) - self.stream_info_db.Write(batch, sync=True) - return stream_hashes + def get_and_update(transaction): + timestamp = time.time() + r = transaction.execute("select stream_hash from live_streams where" + + " (next_announce_time is null or next_announce_time < ?) " + + " and stream_hash is not null", (timestamp, )) + s_hs = [s_h for s_h, in r.fetchall()] + transaction.execute("update live_streams set next_announce_time = ? where " + + " (next_announce_time is null or next_announce_time < ?)", + (next_announce_time, timestamp)) + return s_hs + + return self.db_conn.runInteraction(get_and_update) + + @rerun_if_locked def _get_blob_num_by_hash(self, stream_hash, blob_hash): - blob_hash_stream_hash = json.dumps((blob_hash, stream_hash)) - return json.loads(self.stream_blob_db.Get(blob_hash_stream_hash))[0] + d = self.db_conn.runQuery("select position from live_stream_blobs where stream_hash = ? and blob_hash = ?", + (stream_hash, blob_hash)) + d.addCallback(lambda r: r[0][0] if len(r) else None) + return d + @rerun_if_locked def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): - blob_infos = [] - for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter(): - b_h, s_h = json.loads(blob_hash_stream_hash) - if stream_hash == s_h: - position, revision, iv, length, signature = json.loads(blob_info) - if (start_num is None) or (position > start_num): - if (end_num is None) or (position < end_num): - blob_infos.append((b_h, position, revision, iv, length, signature)) - blob_infos.sort(key=lambda i: i[1], reverse=reverse) + params = [] + q_string = "select * from (" + q_string += " select blob_hash, position, revision, iv, length, signature from live_stream_blobs " + q_string += " where stream_hash = ? " + params.append(stream_hash) + if start_num is not None: + q_string += " and position > ? " + params.append(start_num) + if end_num is not None: + q_string += " and position < ? " + params.append(end_num) + q_string += " order by position " + if reverse is True: + q_string += " DESC " if count is not None: - blob_infos = blob_infos[:count] - return blob_infos + q_string += " limit ? 
" + params.append(count) + q_string += ") order by position" + # Order by position is done twice so that it always returns them from lowest position to + # greatest, but the limit by clause can select the 'count' greatest or 'count' least + return self.db_conn.runQuery(q_string, tuple(params)) + @rerun_if_locked def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False): - batch = leveldb.WriteBatch() - for blob_info in blob_infos: - blob_hash_stream_hash = json.dumps((blob_info.blob_hash, stream_hash)) - try: - self.stream_blob_db.Get(blob_hash_stream_hash) - if ignore_duplicate_error is False: - raise KeyError() # TODO: change this to DuplicateStreamBlobError? - continue - except KeyError: - pass - batch.Put(blob_hash_stream_hash, - json.dumps((blob_info.blob_num, - blob_info.revision, - blob_info.iv, - blob_info.length, - blob_info.signature))) - self.stream_blob_db.Write(batch, sync=True) + def add_blobs(transaction): + for blob_info in blob_infos: + try: + transaction.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)", + (blob_info.blob_hash, stream_hash, blob_info.blob_num, + blob_info.revision, blob_info.iv, blob_info.length, + blob_info.signature)) + except sqlite3.IntegrityError: + if ignore_duplicate_error is False: + raise + + return self.db_conn.runInteraction(add_blobs) + + @rerun_if_locked def _get_stream_of_blobhash(self, blob_hash): - for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter(): - b_h, s_h = json.loads(blob_hash_stream_hash) - if blob_hash == b_h: - return s_h - return None + d = self.db_conn.runQuery("select stream_hash from live_stream_blobs where blob_hash = ?", + (blob_hash,)) + d.addCallback(lambda r: r[0][0] if len(r) else None) + return d + @rerun_if_locked def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - self.stream_desc_db.Put(sd_blob_hash, stream_hash) + return self.db_conn.runQuery("insert into live_stream_descriptors values (?, ?)", + (sd_blob_hash, stream_hash)) + @rerun_if_locked def _get_sd_blob_hashes_for_stream(self, stream_hash): - return [sd_blob_hash for sd_blob_hash, s_h in self.stream_desc_db.RangeIter() if stream_hash == s_h] + d = self.db_conn.runQuery("select sd_blob_hash from live_stream_descriptors where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda results: [r[0] for r in results]) + return d class TempLiveStreamMetadataManager(DHTHashSupplier): diff --git a/lbrynet/lbrynet_console/LBRYConsole.py b/lbrynet/lbrynet_console/LBRYConsole.py index d15ae3892..02a5cf29a 100644 --- a/lbrynet/lbrynet_console/LBRYConsole.py +++ b/lbrynet/lbrynet_console/LBRYConsole.py @@ -59,6 +59,8 @@ class LBRYConsole(): self.lbry_file_metadata_manager = None self.lbry_file_manager = None self.conf_dir = conf_dir + self.created_db_dir = False + self.current_db_revision = 1 self.data_dir = data_dir self.plugin_manager = PluginManager() self.plugin_manager.setPluginPlaces([ @@ -72,10 +74,13 @@ class LBRYConsole(): self.blob_request_payment_rate_manager = None self.lbryid = None self.sd_identifier = StreamDescriptorIdentifier() + self.plugin_objects = [] + self.db_migration_revisions = None def start(self): """Initialize the session and restore everything to its saved state""" d = threads.deferToThread(self._create_directory) + d.addCallback(lambda _: self._check_db_migration()) d.addCallback(lambda _: self._get_settings()) d.addCallback(lambda _: self._get_session()) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) @@ -95,7 +100,10 @@ class 
LBRYConsole(): def shut_down(self): """Stop the session, all currently running streams, and stop the server""" - d = self.session.shut_down() + if self.session is not None: + d = self.session.shut_down() + else: + d = defer.succeed(True) d.addCallback(lambda _: self._shut_down()) return d @@ -121,11 +129,38 @@ class LBRYConsole(): def _create_directory(self): if not os.path.exists(self.conf_dir): os.makedirs(self.conf_dir) + db_revision = open(os.path.join(self.conf_dir, "db_revision"), mode='w') + db_revision.write(str(self.current_db_revision)) + db_revision.close() logging.debug("Created the configuration directory: %s", str(self.conf_dir)) if not os.path.exists(self.data_dir): os.makedirs(self.data_dir) logging.debug("Created the data directory: %s", str(self.data_dir)) + def _check_db_migration(self): + old_revision = 0 + db_revision_file = os.path.join(self.conf_dir, "db_revision") + if os.path.exists(db_revision_file): + old_revision = int(open(db_revision_file).read().strip()) + if old_revision < self.current_db_revision: + from lbrynet.db_migrator import dbmigrator + print "Upgrading your databases..." + d = threads.deferToThread(dbmigrator.migrate_db, self.conf_dir, old_revision, self.current_db_revision) + + def print_success(old_dirs): + success_string = "Finished upgrading the databases. It is now safe to delete the" + success_string += " following directories, if you feel like it. It won't make any" + success_string += " difference.\nAnyway here they are: " + for i, old_dir in enumerate(old_dirs): + success_string += old_dir + if i + 1 < len(old_dirs): + success_string += ", " + print success_string + + d.addCallback(print_success) + return d + return defer.succeed(True) + def _get_settings(self): d = self.settings.start() d.addCallback(lambda _: self.settings.get_lbryid()) @@ -312,12 +347,19 @@ class LBRYConsole(): def setup_plugins(): ds = [] for plugin in self.plugin_manager.getAllPlugins(): + self.plugin_objects.append(plugin.plugin_object) ds.append(plugin.plugin_object.setup(self)) return defer.DeferredList(ds) d.addCallback(lambda _: setup_plugins()) return d + def _stop_plugins(self): + ds = [] + for plugin_object in self.plugin_objects: + ds.append(defer.maybeDeferred(plugin_object.stop)) + return defer.DeferredList(ds) + def _setup_server(self): def restore_running_status(running): @@ -359,6 +401,7 @@ class LBRYConsole(): d.addCallback(lambda _: self.lbry_file_manager.stop()) ds.append(d) ds.append(self.stop_server()) + ds.append(self._stop_plugins()) dl = defer.DeferredList(ds) return dl diff --git a/lbrynet/lbrynet_console/LBRYPlugin.py b/lbrynet/lbrynet_console/LBRYPlugin.py index 2a22f6bfc..c68ed68d5 100644 --- a/lbrynet/lbrynet_console/LBRYPlugin.py +++ b/lbrynet/lbrynet_console/LBRYPlugin.py @@ -7,4 +7,7 @@ class LBRYPlugin(IPlugin): IPlugin.__init__(self) def setup(self, lbry_console): + raise NotImplementedError + + def stop(self): raise NotImplementedError \ No newline at end of file diff --git a/lbrynet/lbrynet_console/LBRYSettings.py b/lbrynet/lbrynet_console/LBRYSettings.py index a1909044f..5610a6656 100644 --- a/lbrynet/lbrynet_console/LBRYSettings.py +++ b/lbrynet/lbrynet_console/LBRYSettings.py @@ -1,6 +1,6 @@ import binascii import json -import leveldb +import unqlite import logging import os from twisted.internet import threads, defer @@ -12,7 +12,7 @@ class LBRYSettings(object): self.db = None def start(self): - return threads.deferToThread(self._open_db) + return self._open_db() def stop(self): self.db = None @@ -20,21 +20,22 @@ class
LBRYSettings(object): def _open_db(self): logging.debug("Opening %s as the settings database", str(os.path.join(self.db_dir, "settings.db"))) - self.db = leveldb.LevelDB(os.path.join(self.db_dir, "settings.db")) + self.db = unqlite.UnQLite(os.path.join(self.db_dir, "settings.db")) + return defer.succeed(True) def save_lbryid(self, lbryid): def save_lbryid(): - self.db.Put("lbryid", binascii.hexlify(lbryid), sync=True) + self.db['lbryid'] = binascii.hexlify(lbryid) return threads.deferToThread(save_lbryid) def get_lbryid(self): def get_lbryid(): - try: - return binascii.unhexlify(self.db.Get("lbryid")) - except KeyError: + if 'lbryid' in self.db: + return binascii.unhexlify(self.db['lbryid']) + else: + return None return threads.deferToThread(get_lbryid) @@ -42,9 +43,9 @@ class LBRYSettings(object): def get_server_running_status(self): def get_status(): - try: - return json.loads(self.db.Get("server_running")) - except KeyError: + if 'server_running' in self.db: + return json.loads(self.db['server_running']) + else: + return True return threads.deferToThread(get_status) @@ -52,7 +53,7 @@ class LBRYSettings(object): def save_server_running_status(self, running): def save_status(): - self.db.Put("server_running", json.dumps(running), sync=True) + self.db['server_running'] = json.dumps(running) return threads.deferToThread(save_status) @@ -77,9 +78,9 @@ class LBRYSettings(object): def _get_payment_rate(self, rate_type): def get_rate(): - try: - return json.loads(self.db.Get(rate_type)) - except KeyError: + if rate_type in self.db: + return json.loads(self.db[rate_type]) + else: + return None return threads.deferToThread(get_rate) @@ -88,18 +89,18 @@ class LBRYSettings(object): def save_rate(): if rate is not None: - self.db.Put(rate_type, json.dumps(rate), sync=True) - else: - self.db.Delete(rate_type, sync=True) + self.db[rate_type] = json.dumps(rate) + elif rate_type in self.db: + del self.db[rate_type] return threads.deferToThread(save_rate) def get_query_handler_status(self, query_identifier): def get_status(): - try: - return json.loads(self.db.Get(json.dumps(('q_h', query_identifier)))) - except KeyError: + if json.dumps(('q_h', query_identifier)) in self.db: + return json.loads(self.db[(json.dumps(('q_h', query_identifier)))]) + else: + return True return threads.deferToThread(get_status) @@ -112,5 +113,5 @@ class LBRYSettings(object): def _set_query_handler_status(self, query_identifier, status): def set_status(): - self.db.Put(json.dumps(('q_h', query_identifier)), json.dumps(status), sync=True) + self.db[json.dumps(('q_h', query_identifier))] = json.dumps(status) return threads.deferToThread(set_status) \ No newline at end of file diff --git a/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindInfoManager.py b/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindInfoManager.py index 7815789c0..61051065c 100644 --- a/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindInfoManager.py +++ b/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindInfoManager.py @@ -1,25 +1,43 @@ -from twisted.internet import threads, defer +from twisted.internet import defer from ValuableBlobInfo import ValuableBlobInfo -from db_keys import BLOB_INFO_TYPE -import json -import leveldb +import os +import sqlite3 +from twisted.enterprise import adbapi +from lbrynet.core.sqlite_helpers import rerun_if_locked class BlindInfoManager(object): - def __init__(self, db, peer_manager): - self.db = db + def __init__(self, db_dir, peer_manager): + self.db_dir = db_dir + self.db_conn = None self.peer_manager = peer_manager def
setup(self): - return defer.succeed(True) + # check_same_thread=False is solely to quiet a spurious error that appears to be due + # to a bug in twisted, where the connection is closed by a different thread than the + # one that opened it. The individual connections in the pool are not used in multiple + # threads. + self.db_conn = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blind_info.db"), + check_same_thread=False) + + def set_up_table(transaction): + transaction.execute("create table if not exists valuable_blobs (" + + " blob_hash text primary key, " + + " blob_length integer, " + + " reference text, " + + " peer_host text, " + + " peer_port integer, " + + " peer_score text" + + ")") + return self.db_conn.runInteraction(set_up_table) def stop(self): self.db = None return defer.succeed(True) def get_all_blob_infos(self): - d = threads.deferToThread(self._get_all_blob_infos) + d = self._get_all_blob_infos() def make_blob_infos(blob_data): blob_infos = [] @@ -42,21 +60,19 @@ class BlindInfoManager(object): peer_port = blob_info.peer.port peer_score = blob_info.peer_score blobs.append((blob_hash, length, reference, peer_host, peer_port, peer_score)) - return threads.deferToThread(self._save_blob_infos, blobs) + return self._save_blob_infos(blobs) + @rerun_if_locked def _get_all_blob_infos(self): - blob_infos = [] - for key, blob_info in self.db.RangeIter(): - key_type, blob_hash = json.loads(key) - if key_type == BLOB_INFO_TYPE: - blob_infos.append([blob_hash] + json.loads(blob_info)) - return blob_infos + return self.db_conn.runQuery("select * from valuable_blobs") + @rerun_if_locked def _save_blob_infos(self, blobs): - batch = leveldb.WriteBatch() - for blob in blobs: - try: - self.db.Get(json.dumps((BLOB_INFO_TYPE, blob[0]))) - except KeyError: - batch.Put(json.dumps((BLOB_INFO_TYPE, blob[0])), json.dumps(blob[1:])) - self.db.Write(batch, sync=True) \ No newline at end of file + def save_infos(transaction): + for blob in blobs: + try: + transaction.execute("insert into valuable_blobs values (?, ?, ?, ?, ?, ?)", + blob) + except sqlite3.IntegrityError: + pass + return self.db_conn.runInteraction(save_infos) \ No newline at end of file diff --git a/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindRepeaterSettings.py b/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindRepeaterSettings.py index c7f4353d8..c20402b6f 100644 --- a/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindRepeaterSettings.py +++ b/lbrynet/lbrynet_console/plugins/BlindRepeater/BlindRepeaterSettings.py @@ -1,67 +1,79 @@ -from db_keys import SETTING_TYPE, PEER_TYPE -from twisted.internet import threads +from twisted.internet import threads, defer import json - +import unqlite +import os +from twisted.enterprise import adbapi +from lbrynet.core.sqlite_helpers import rerun_if_locked class BlindRepeaterSettings(object): - def __init__(self, db): - self.db = db + def __init__(self, db_dir): + self.db_dir = db_dir + self.unq_db = None + self.sql_db = None + + def setup(self): + self.unq_db = unqlite.UnQLite(os.path.join(self.db_dir, "blind_settings.db")) + # check_same_thread=False is solely to quiet a spurious error that appears to be due + # to a bug in twisted, where the connection is closed by a different thread than the + # one that opened it. The individual connections in the pool are not used in multiple + # threads. 
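+        # Settings are split across two stores: simple key/value settings live in the UnQLite
+        # file opened above, while the approved peer list is kept in the SQLite pool created below.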
+ self.sql_db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blind_peers.db"), + check_same_thread=False) + + return self.sql_db.runQuery("create table if not exists approved_peers (" + + " ip_address text, " + + " port integer" + + ")") + + def stop(self): + self.unq_db = None + self.sql_db = None + return defer.succeed(True) def save_repeater_status(self, running): def save_status(): - self.db.Put(json.dumps((SETTING_TYPE, "running")), json.dumps(running), sync=True) + self.unq_db["running"] = json.dumps(running) return threads.deferToThread(save_status) def get_repeater_saved_status(self): def get_status(): - try: - return json.loads(self.db.Get(json.dumps((SETTING_TYPE, "running")))) - except KeyError: + if "running" in self.unq_db: + return json.loads(self.unq_db['running']) + else: return False return threads.deferToThread(get_status) def save_max_space(self, max_space): def save_space(): - self.db.Put(json.dumps((SETTING_TYPE, "max_space")), str(max_space), sync=True) + self.unq_db['max_space'] = json.dumps(max_space) return threads.deferToThread(save_space) def get_saved_max_space(self): def get_space(): - try: - return int(self.db.Get(json.dumps((SETTING_TYPE, "max_space")))) - except KeyError: + if 'max_space' in self.unq_db: + return json.loads(self.unq_db['max_space']) + else: return 0 return threads.deferToThread(get_space) + @rerun_if_locked def save_approved_peer(self, host, port): - def add_peer(): - peer_string = json.dumps((PEER_TYPE, (host, port))) - self.db.Put(peer_string, "", sync=True) - - return threads.deferToThread(add_peer) + return self.sql_db.runQuery("insert into approved_peers values (?, ?)", + (host, port)) + @rerun_if_locked def remove_approved_peer(self, host, port): - def remove_peer(): - peer_string = json.dumps((PEER_TYPE, (host, port))) - self.db.Delete(peer_string, sync=True) - - return threads.deferToThread(remove_peer) + return self.sql_db.runQuery("delete from approved_peers where ip_address = ? 
and port = ?", + (host, port)) + @rerun_if_locked def get_approved_peers(self): - def get_peers(): - peers = [] - for k, v in self.db.RangeIter(): - key_type, peer_info = json.loads(k) - if key_type == PEER_TYPE: - peers.append(peer_info) - return peers - - return threads.deferToThread(get_peers) + return self.sql_db.runQuery("select * from approved_peers") def get_data_payment_rate(self): return threads.deferToThread(self._get_rate, "data_payment_rate") @@ -82,13 +94,13 @@ class BlindRepeaterSettings(object): return threads.deferToThread(self._save_rate, "valuable_hash_rate", rate) def _get_rate(self, rate_type): - try: - return json.loads(self.db.Get(json.dumps((SETTING_TYPE, rate_type)))) - except KeyError: + if rate_type in self.unq_db: + return json.loads(self.unq_db[rate_type]) + else: return None def _save_rate(self, rate_type, rate): if rate is not None: - self.db.Put(json.dumps((SETTING_TYPE, rate_type)), json.dumps(rate), sync=True) - else: - self.db.Delete(json.dumps((SETTING_TYPE, rate_type)), sync=True) \ No newline at end of file + self.unq_db[rate_type] = json.dumps(rate) + elif rate_type in self.unq_db: + del self.unq_db[rate_type] \ No newline at end of file diff --git a/lbrynet/lbrynet_console/plugins/BlindRepeater/__init__.py b/lbrynet/lbrynet_console/plugins/BlindRepeater/__init__.py index f78730573..769c737cd 100644 --- a/lbrynet/lbrynet_console/plugins/BlindRepeater/__init__.py +++ b/lbrynet/lbrynet_console/plugins/BlindRepeater/__init__.py @@ -1,7 +1,5 @@ -import leveldb -import os from lbrynet.lbrynet_console import LBRYPlugin -from twisted.internet import defer, threads +from twisted.internet import defer from lbrynet.conf import MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE, MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE from BlindRepeater import BlindRepeater from BlindInfoManager import BlindInfoManager @@ -26,14 +24,12 @@ class BlindRepeaterPlugin(LBRYPlugin.LBRYPlugin): self.control_handlers = None self.payment_rate_manager = None self.settings = None - self.db = None def setup(self, lbry_console): lbry_session = lbry_console.session - d = threads.deferToThread(self._setup_db, lbry_session.db_dir) - d.addCallback(lambda _: self._setup_settings()) + d = self._setup_settings(lbry_session.db_dir) d.addCallback(lambda _: self._get_payment_rate_manager(lbry_session.base_payment_rate_manager)) - d.addCallback(lambda _: self._setup_blind_info_manager(lbry_session.peer_manager)) + d.addCallback(lambda _: self._setup_blind_info_manager(lbry_session.peer_manager, lbry_session.db_dir)) d.addCallback(lambda _: self._setup_blind_repeater(lbry_session)) d.addCallback(lambda _: self._setup_valuable_blob_query_handler(lbry_session)) d.addCallback(lambda _: self._create_control_handlers(lbry_session)) @@ -41,11 +37,12 @@ class BlindRepeaterPlugin(LBRYPlugin.LBRYPlugin): d.addCallback(lambda _: self._add_to_lbry_console(lbry_console)) return d - def _setup_db(self, db_dir): - self.db = leveldb.LevelDB(os.path.join(db_dir, "valuable_blobs.db")) + def stop(self): + return self.settings.stop() - def _setup_settings(self): - self.settings = BlindRepeaterSettings(self.db) + def _setup_settings(self, db_dir): + self.settings = BlindRepeaterSettings(db_dir) + return self.settings.setup() def _get_payment_rate_manager(self, default_payment_rate_manager): d1 = self.settings.get_data_payment_rate() @@ -67,8 +64,8 @@ class BlindRepeaterPlugin(LBRYPlugin.LBRYPlugin): dl.addCallback(get_payment_rate_manager) return dl - def _setup_blind_info_manager(self, peer_manager): - self.blind_info_manager = 
BlindInfoManager(self.db, peer_manager) + def _setup_blind_info_manager(self, peer_manager, db_dir): + self.blind_info_manager = BlindInfoManager(db_dir, peer_manager) return self.blind_info_manager.setup() def _setup_valuable_blob_query_handler(self, lbry_session): diff --git a/lbrynet/lbrynet_console/plugins/BlindRepeater/db_keys.py b/lbrynet/lbrynet_console/plugins/BlindRepeater/db_keys.py deleted file mode 100644 index 1b87e5ba7..000000000 --- a/lbrynet/lbrynet_console/plugins/BlindRepeater/db_keys.py +++ /dev/null @@ -1,3 +0,0 @@ -BLOB_INFO_TYPE = 'b' -SETTING_TYPE = 's' -PEER_TYPE = 'p' \ No newline at end of file diff --git a/lbrynet/lbrynet_downloader_gui/LBRYDownloader.py b/lbrynet/lbrynet_downloader_gui/LBRYDownloader.py index 3d79bdd71..a81af8f8e 100644 --- a/lbrynet/lbrynet_downloader_gui/LBRYDownloader.py +++ b/lbrynet/lbrynet_downloader_gui/LBRYDownloader.py @@ -34,6 +34,7 @@ class LBRYDownloader(object): self.dht_node_port = 4444 self.run_server = True self.first_run = False + self.current_db_revision = 1 if os.name == "nt": from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle self.download_directory = get_path(FOLDERID.Downloads, UserHandle.current) @@ -54,6 +55,7 @@ class LBRYDownloader(object): def start(self): d = self._load_configuration_file() d.addCallback(lambda _: threads.deferToThread(self._create_directory)) + d.addCallback(lambda _: self._check_db_migration()) d.addCallback(lambda _: self._get_session()) d.addCallback(lambda _: self._setup_stream_info_manager()) d.addCallback(lambda _: self._setup_stream_identifier()) @@ -72,6 +74,37 @@ class LBRYDownloader(object): def get_new_address(self): return self.session.wallet.get_new_address() + def _check_db_migration(self): + old_revision = 0 + db_revision_file = os.path.join(self.conf_dir, "db_revision") + if os.path.exists(db_revision_file): + old_revision = int(open(db_revision_file).read().strip()) + if old_revision < self.current_db_revision: + if os.name == "nt": + import subprocess + import sys + + def run_migrator(): + migrator_exe = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), + "dmigrator", "migrator.exe") + print "trying to find the migrator at", migrator_exe + si = subprocess.STARTUPINFO() + si.dwFlags = subprocess.STARTF_USESHOWWINDOW + si.wShowWindow = subprocess.SW_HIDE + print "trying to run the migrator" + migrator_proc = subprocess.Popen([migrator_exe, self.conf_dir, str(old_revision), + str(self.current_db_revision)], startupinfo=si) + print "started the migrator" + migrator_proc.wait() + print "migrator has returned" + + return threads.deferToThread(run_migrator) + else: + from lbrynet.db_migrator import dbmigrator + return threads.deferToThread(dbmigrator.migrate_db, self.conf_dir, old_revision, + self.current_db_revision) + return defer.succeed(True) + def _load_configuration_file(self): def get_configuration(): @@ -194,6 +227,9 @@ class LBRYDownloader(object): def _create_directory(self): if not os.path.exists(self.conf_dir): os.makedirs(self.conf_dir) + db_revision = open(os.path.join(self.conf_dir, "db_revision"), mode='w') + db_revision.write(str(self.current_db_revision)) + db_revision.close() logging.debug("Created the configuration directory: %s", str(self.conf_dir)) if not os.path.exists(self.data_dir): os.makedirs(self.data_dir) diff --git a/setup.py b/setup.py index 1322aff33..7fba58ea6 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ from setuptools import setup, find_packages setup(name='lbrynet', version='0.0.4', packages=find_packages(), -
install_requires=['pycrypto', 'twisted', 'miniupnpc', 'yapsy', 'seccure', 'python-bitcoinrpc', 'leveldb', 'txJSON-RPC', 'requests'], + install_requires=['pycrypto', 'twisted', 'miniupnpc', 'yapsy', 'seccure', 'python-bitcoinrpc', 'txJSON-RPC', 'requests', 'unqlite', 'leveldb'], entry_points={ 'console_scripts': [ 'lbrynet-console = lbrynet.lbrynet_console.LBRYConsole:launch_lbry_console',