diff --git a/src/main/java/com/lbry/database/PrefixDB.java b/src/main/java/com/lbry/database/PrefixDB.java index a7edf41..4e12933 100644 --- a/src/main/java/com/lbry/database/PrefixDB.java +++ b/src/main/java/com/lbry/database/PrefixDB.java @@ -1,5 +1,6 @@ package com.lbry.database; +import com.lbry.database.keys.UndoKey; import com.lbry.database.revert.RevertibleDelete; import com.lbry.database.revert.RevertibleOperation; import com.lbry.database.revert.RevertibleOperationStack; @@ -7,16 +8,10 @@ import com.lbry.database.revert.RevertiblePut; import com.lbry.database.rows.*; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.*; -import org.rocksdb.ColumnFamilyDescriptor; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.DBOptions; -import org.rocksdb.Options; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; +import org.rocksdb.*; /** * Class for a revertible RocksDB database: A RocksDB database where each set of applied changes can be undone. @@ -81,6 +76,14 @@ public class PrefixDB{ } public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth) throws RocksDBException{ + this(path,maxOpenFiles,secondaryPath,maxUndoDepth,null); + } + + public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth,Set unsafePrefixes) throws RocksDBException{ + this(path,maxOpenFiles,secondaryPath,maxUndoDepth,unsafePrefixes,true); + } + + public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth,Set unsafePrefixes,boolean enforceIntegrity) throws RocksDBException{ List columnFamilyDescriptors = new ArrayList<>(); columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); for(Prefix prefix : Prefix.values()){ @@ -100,8 +103,6 @@ public class PrefixDB{ this.database = RocksDB.open(new DBOptions(options),path,columnFamilyDescriptors,this.columnFamilyHandles); - Set unsafePrefixes = new HashSet<>();//TODO - boolean enforceIntegrity = false;//TODO this.operationStack = new RevertibleOperationStack((byte[] key) -> { try{ return Optional.of(this.get(key)); @@ -161,28 +162,98 @@ public class PrefixDB{ /** * Write staged changes to the database without keeping undo information. Changes written cannot be undone. */ - public void unsafeCommit(){ + public void unsafeCommit() throws RocksDBException{ this.applyStash(); + WriteOptions writeOptions = new WriteOptions().setSync(true); try{ - //TODO + if(this.operationStack.length()!=0){ + return; + } + WriteBatch batch = new WriteBatch(); + for(RevertibleOperation stagedChange : this.operationStack.interate()){ + ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0])); + if(!stagedChange.isDelete()){ + batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue()); + }else{ + batch.delete(columnFamily,stagedChange.getKey()); + } + this.database.write(writeOptions,batch); + } + }finally{ + writeOptions.close(); + this.operationStack.clear(); + } + } + + public void commit(int height,byte[] blockHash) throws RocksDBException{ + this.applyStash(); + byte[] undoOperations = this.operationStack.getUndoOperations(); + List deleteUndos = new ArrayList<>(); + if(height>this.maxUndoDepth){ + byte[] upperBound = ByteBuffer.allocate(1+8).order(ByteOrder.BIG_ENDIAN).put(Prefix.UNDO.getValue()).putLong(height-this.maxUndoDepth).array(); + RocksIterator iterator = this.database.newIterator(new ReadOptions().setIterateUpperBound(new Slice(upperBound))); + iterator.seek(ByteBuffer.allocate(1+8).order(ByteOrder.BIG_ENDIAN).put(Prefix.UNDO.getValue()).array()); + while(iterator.isValid()){ + deleteUndos.add(iterator.key()); + iterator.next(); + } + } + try{ + ColumnFamilyHandle undoColumnFamily = this.getColumnFamilyByPrefix(Prefix.UNDO); + WriteOptions writeOptions = new WriteOptions().setSync(true); + try{ + WriteBatch batch = new WriteBatch(); + for(RevertibleOperation stagedChange : this.operationStack.interate()){ + ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0])); + if(!stagedChange.isDelete()){ + batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue()); + }else{ + batch.delete(columnFamily,stagedChange.getKey()); + } + + } + for(byte[] undoToDelete : deleteUndos){ + batch.delete(undoColumnFamily,undoToDelete); + } + UndoKey undoKey = new UndoKey(); + undoKey.height = height; + undoKey.block_hash = blockHash; + byte[] undoKeyBytes = this.undo.packKey(undoKey); + batch.put(undoColumnFamily,undoKeyBytes,undoOperations); + this.database.write(writeOptions,batch); + }finally{ + writeOptions.close(); + this.operationStack.clear(); + } }finally{ this.operationStack.clear(); } } - public void commit(){ - this.applyStash(); + public void rollback(int height,byte[] blockHash) throws RocksDBException{ + UndoKey undoKey = new UndoKey(); + undoKey.height = height; + undoKey.block_hash = blockHash; + byte[] undoKeyBytes = this.undo.packKey(undoKey); + ColumnFamilyHandle undoColumnFamily = this.getColumnFamilyByPrefix(Prefix.UNDO); + byte[] undoInfo = this.database.get(undoColumnFamily,undoKeyBytes); + this.operationStack.applyPackedUndoOperations(undoInfo); + this.operationStack.validateAndApplyStashedOperations(); + WriteOptions writeOptions = new WriteOptions().setSync(true); try{ - //TODO - }finally{ - this.operationStack.clear(); - } - } - - public void rollback(int height,byte[] blockHash){ - try{ - //TODO + WriteBatch batch = new WriteBatch(); + for(RevertibleOperation stagedChange : this.operationStack.interate()){ + ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0])); + if(!stagedChange.isDelete()){ + batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue()); + }else{ + batch.delete(columnFamily,stagedChange.getKey()); + } + this.database.write(writeOptions,batch); + } + // batch.delete(undoKey) }finally{ + writeOptions.close(); this.operationStack.clear(); } } diff --git a/src/main/java/com/lbry/database/revert/RevertibleOperationStack.java b/src/main/java/com/lbry/database/revert/RevertibleOperationStack.java index a1e1a16..e7bcf8e 100644 --- a/src/main/java/com/lbry/database/revert/RevertibleOperationStack.java +++ b/src/main/java/com/lbry/database/revert/RevertibleOperationStack.java @@ -6,6 +6,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; public class RevertibleOperationStack{ @@ -419,6 +420,14 @@ public class RevertibleOperationStack{ this.stashedLastOperationForKey.clear(); } + public int length(){ + return this.items.values().stream().mapToInt(x -> x.length).sum(); + } + + public Iterable interate(){ + return this.items.values().stream().flatMap(Stream::of).collect(Collectors.toList()); + } + /** * Get the serialized bytes to undo all of the changes made by the pending ops */