Implement commit and rollback

This commit is contained in:
Ben van Hartingsveldt 2024-09-12 21:24:21 +02:00
parent d729d86948
commit 7d2ca00a88
No known key found for this signature in database
GPG key ID: 261AA214130CE7AB
2 changed files with 103 additions and 23 deletions

View file

@ -1,5 +1,6 @@
package com.lbry.database;
import com.lbry.database.keys.UndoKey;
import com.lbry.database.revert.RevertibleDelete;
import com.lbry.database.revert.RevertibleOperation;
import com.lbry.database.revert.RevertibleOperationStack;
@ -7,16 +8,10 @@ import com.lbry.database.revert.RevertiblePut;
import com.lbry.database.rows.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.*;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.*;
/**
* Class for a revertible RocksDB database: A RocksDB database where each set of applied changes can be undone.
@ -81,6 +76,14 @@ public class PrefixDB{
}
public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth) throws RocksDBException{
this(path,maxOpenFiles,secondaryPath,maxUndoDepth,null);
}
public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth,Set<Byte> unsafePrefixes) throws RocksDBException{
this(path,maxOpenFiles,secondaryPath,maxUndoDepth,unsafePrefixes,true);
}
public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth,Set<Byte> unsafePrefixes,boolean enforceIntegrity) throws RocksDBException{
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
for(Prefix prefix : Prefix.values()){
@ -100,8 +103,6 @@ public class PrefixDB{
this.database = RocksDB.open(new DBOptions(options),path,columnFamilyDescriptors,this.columnFamilyHandles);
Set<Byte> unsafePrefixes = new HashSet<>();//TODO
boolean enforceIntegrity = false;//TODO
this.operationStack = new RevertibleOperationStack((byte[] key) -> {
try{
return Optional.of(this.get(key));
@ -161,28 +162,98 @@ public class PrefixDB{
/**
* Write staged changes to the database without keeping undo information. Changes written cannot be undone.
*/
public void unsafeCommit(){
public void unsafeCommit() throws RocksDBException{
this.applyStash();
WriteOptions writeOptions = new WriteOptions().setSync(true);
try{
//TODO
if(this.operationStack.length()!=0){
return;
}
WriteBatch batch = new WriteBatch();
for(RevertibleOperation stagedChange : this.operationStack.interate()){
ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0]));
if(!stagedChange.isDelete()){
batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue());
}else{
batch.delete(columnFamily,stagedChange.getKey());
}
this.database.write(writeOptions,batch);
}
}finally{
writeOptions.close();
this.operationStack.clear();
}
}
public void commit(int height,byte[] blockHash) throws RocksDBException{
this.applyStash();
byte[] undoOperations = this.operationStack.getUndoOperations();
List<byte[]> deleteUndos = new ArrayList<>();
if(height>this.maxUndoDepth){
byte[] upperBound = ByteBuffer.allocate(1+8).order(ByteOrder.BIG_ENDIAN).put(Prefix.UNDO.getValue()).putLong(height-this.maxUndoDepth).array();
RocksIterator iterator = this.database.newIterator(new ReadOptions().setIterateUpperBound(new Slice(upperBound)));
iterator.seek(ByteBuffer.allocate(1+8).order(ByteOrder.BIG_ENDIAN).put(Prefix.UNDO.getValue()).array());
while(iterator.isValid()){
deleteUndos.add(iterator.key());
iterator.next();
}
}
try{
ColumnFamilyHandle undoColumnFamily = this.getColumnFamilyByPrefix(Prefix.UNDO);
WriteOptions writeOptions = new WriteOptions().setSync(true);
try{
WriteBatch batch = new WriteBatch();
for(RevertibleOperation stagedChange : this.operationStack.interate()){
ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0]));
if(!stagedChange.isDelete()){
batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue());
}else{
batch.delete(columnFamily,stagedChange.getKey());
}
}
for(byte[] undoToDelete : deleteUndos){
batch.delete(undoColumnFamily,undoToDelete);
}
UndoKey undoKey = new UndoKey();
undoKey.height = height;
undoKey.block_hash = blockHash;
byte[] undoKeyBytes = this.undo.packKey(undoKey);
batch.put(undoColumnFamily,undoKeyBytes,undoOperations);
this.database.write(writeOptions,batch);
}finally{
writeOptions.close();
this.operationStack.clear();
}
}finally{
this.operationStack.clear();
}
}
public void commit(){
this.applyStash();
public void rollback(int height,byte[] blockHash) throws RocksDBException{
UndoKey undoKey = new UndoKey();
undoKey.height = height;
undoKey.block_hash = blockHash;
byte[] undoKeyBytes = this.undo.packKey(undoKey);
ColumnFamilyHandle undoColumnFamily = this.getColumnFamilyByPrefix(Prefix.UNDO);
byte[] undoInfo = this.database.get(undoColumnFamily,undoKeyBytes);
this.operationStack.applyPackedUndoOperations(undoInfo);
this.operationStack.validateAndApplyStashedOperations();
WriteOptions writeOptions = new WriteOptions().setSync(true);
try{
//TODO
}finally{
this.operationStack.clear();
}
}
public void rollback(int height,byte[] blockHash){
try{
//TODO
WriteBatch batch = new WriteBatch();
for(RevertibleOperation stagedChange : this.operationStack.interate()){
ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0]));
if(!stagedChange.isDelete()){
batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue());
}else{
batch.delete(columnFamily,stagedChange.getKey());
}
this.database.write(writeOptions,batch);
}
// batch.delete(undoKey)
}finally{
writeOptions.close();
this.operationStack.clear();
}
}

View file

@ -6,6 +6,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class RevertibleOperationStack{
@ -419,6 +420,14 @@ public class RevertibleOperationStack{
this.stashedLastOperationForKey.clear();
}
public int length(){
return this.items.values().stream().mapToInt(x -> x.length).sum();
}
public Iterable<RevertibleOperation> interate(){
return this.items.values().stream().flatMap(Stream::of).collect(Collectors.toList());
}
/**
* Get the serialized bytes to undo all of the changes made by the pending ops
*/