Finalize revertible

This commit is contained in:
Ben van Hartingsveldt 2024-09-12 14:11:10 +02:00
parent a49de200ec
commit d729d86948
No known key found for this signature in database
GPG key ID: 261AA214130CE7AB
7 changed files with 564 additions and 11 deletions

View file

@ -62,4 +62,17 @@ public enum Prefix{
return this.value; return this.value;
} }
public static Prefix getByValue(char value){
return Prefix.getByValue((byte) value);
}
public static Prefix getByValue(byte value){
for(Prefix p : Prefix.values()){
if(p.value==value){
return p;
}
}
return null;
}
} }

View file

@ -7,10 +7,7 @@ import com.lbry.database.revert.RevertiblePut;
import com.lbry.database.rows.*; import com.lbry.database.rows.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
@ -103,7 +100,20 @@ public class PrefixDB{
this.database = RocksDB.open(new DBOptions(options),path,columnFamilyDescriptors,this.columnFamilyHandles); this.database = RocksDB.open(new DBOptions(options),path,columnFamilyDescriptors,this.columnFamilyHandles);
this.operationStack = new RevertibleOperationStack(); Set<Byte> unsafePrefixes = new HashSet<>();//TODO
boolean enforceIntegrity = false;//TODO
this.operationStack = new RevertibleOperationStack((byte[] key) -> {
try{
return Optional.of(this.get(key));
}catch(RocksDBException e){}
return Optional.empty();
},(List<byte[]> keys) -> {
List<Optional<byte[]>> optionalKeys = new ArrayList<>();
for(byte[] key : keys){
optionalKeys.add(Optional.of(key));
}
return optionalKeys;
},unsafePrefixes,enforceIntegrity);
this.maxUndoDepth = maxUndoDepth; this.maxUndoDepth = maxUndoDepth;

View file

@ -0,0 +1,9 @@
package com.lbry.database.revert;
public class OperationStackIntegrityException extends RuntimeException{
public OperationStackIntegrityException(String message){
super(message);
}
}

View file

@ -1,5 +1,11 @@
package com.lbry.database.revert; package com.lbry.database.revert;
import com.lbry.database.Prefix;
import com.lbry.database.rows.PrefixRow;
import com.lbry.database.util.Tuple2;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays; import java.util.Arrays;
public abstract class RevertibleOperation{ public abstract class RevertibleOperation{
@ -30,9 +36,32 @@ public abstract class RevertibleOperation{
throw new RuntimeException("Not implemented"); throw new RuntimeException("Not implemented");
} }
//TODO PACK public byte[] pack(){
//TODO UNPACK return ByteBuffer.allocate(1+4+4+this.key.length+this.value.length).order(ByteOrder.BIG_ENDIAN)
.put((byte) (this.isPut?0x01:0x00))
.putInt(this.key.length)
.putInt(this.value.length)
.put(this.key)
.put(this.value)
.array();
}
public static Tuple2<RevertibleOperation,byte[]> unpack(byte[] packed){
ByteBuffer bb = ByteBuffer.wrap(packed).order(ByteOrder.BIG_ENDIAN);
boolean isPut = (bb.get() & 0xFF)!=0x00;
int keyLength = bb.getInt();
int valueLength = bb.getInt();
byte[] keyBytes = new byte[keyLength];
bb.get(keyBytes);
byte[] valueBytes = new byte[valueLength];
bb.get(valueBytes);
byte[] remainingPacked = new byte[bb.remaining()];
bb.get(remainingPacked);
if(isPut){
return new Tuple2<>(new RevertiblePut(keyBytes,valueBytes),remainingPacked);
}
return new Tuple2<>(new RevertibleDelete(keyBytes,valueBytes),remainingPacked);
}
@Override @Override
public boolean equals(Object obj){ public boolean equals(Object obj){
@ -43,7 +72,17 @@ public abstract class RevertibleOperation{
return false; return false;
} }
//TODO REPR @Override
//TODO STR public String toString() {
Prefix prefix = Prefix.getByValue(this.value[0]);
String prefixStr = (prefix!=null?prefix.name():"?");
String k = "?";
String v = "?";
if(prefix!=null){
k = PrefixRow.TYPES.get(prefix).unpackKey(this.key).toString();
v = PrefixRow.TYPES.get(prefix).unpackKey(this.value).toString();
}
return (this.isPut?"PUT":"DELETE")+" "+prefixStr+": "+k+" | "+v;
}
} }

View file

@ -1,17 +1,474 @@
package com.lbry.database.revert; package com.lbry.database.revert;
import com.lbry.database.util.Tuple2;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class RevertibleOperationStack{ public class RevertibleOperationStack{
private final Function<byte[],Optional<byte[]>> get;
private final Function<List<byte[]>,Iterable<Optional<byte[]>>> multiGet;
private final Map<byte[],RevertibleOperation[]> items;
private final Deque<RevertibleOperation> stash;
private final Map<byte[],RevertibleOperation> stashedLastOperationForKey;
private final Set<Byte> unsafePrefixes;
private final boolean enforceIntegrity;
public RevertibleOperationStack(Function<byte[],Optional<byte[]>> get,Function<List<byte[]>,Iterable<Optional<byte[]>>> multiGet,Set<Byte> unsafePrefixes,boolean enforceIntegrity){
this.get = get;
this.multiGet = multiGet;
this.items = new HashMap<>();
this.stash = new ArrayDeque<>();
this.stashedLastOperationForKey = new HashMap<>();
this.unsafePrefixes = unsafePrefixes!=null?unsafePrefixes:new HashSet<>();
this.enforceIntegrity = enforceIntegrity;
}
public void stashOperations(RevertibleOperation[] operations){ public void stashOperations(RevertibleOperation[] operations){
//TODO this.stash.addAll(Arrays.asList(operations));
for(RevertibleOperation operation : operations){
this.stashedLastOperationForKey.put(operation.key,operation);
}
} }
public void validateAndApplyStashedOperations(){ public void validateAndApplyStashedOperations(){
//TODO if(this.stash.isEmpty()){
return;
}
List<RevertibleOperation> needAppend = new ArrayList<>();
Set<byte[]> uniqueKeys = new HashSet<>();
while(!this.stash.isEmpty()){
RevertibleOperation operation = this.stash.pollFirst();
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}
if(operationArr!=null && operationArr.length>=1 && operation.invert().equals(operationArr[operationArr.length-1])){
this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1));
continue;
}
if(operationArr!=null && operationArr.length>=1 && operation.equals(operationArr[operationArr.length-1])){
continue;
}else{
needAppend.add(operation);
uniqueKeys.add(operation.getKey());
}
}
Map<byte[],byte[]> existing = new HashMap<>();
if(this.enforceIntegrity && !uniqueKeys.isEmpty()){
List<byte[]> uniqueKeysList = new ArrayList<>(uniqueKeys);
for(int idx=0;idx<uniqueKeys.size();idx+=10000){
List<byte[]> batch = uniqueKeysList.subList(idx,idx+10000);
Iterator<Optional<byte[]>> iterator = this.multiGet.apply(batch).iterator();
for(byte[] k : batch){
byte[] v = iterator.next().get();
existing.put(k,v);
}
}
}
for(RevertibleOperation operation : needAppend){
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}
if(operationArr!=null && operationArr.length>=1 && operationArr[operationArr.length-1].equals(operation)){
this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1));
RevertibleOperation[] operationArrX = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrX = e.getValue();
}
}
if(operationArrX==null || operationArrX.length==0){
this.items.remove(operation.getKey());
}
}
if(!this.enforceIntegrity){
RevertibleOperation[] operationArrX = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrX = e.getValue();
}
}
RevertibleOperation[] newArr = new RevertibleOperation[operationArrX==null?1:operationArrX.length+1];
newArr[newArr.length-1] = operation;
this.items.put(newArr[0].getKey(),newArr);
}
RevertibleOperation[] operationArrX = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrX = e.getValue();
}
}
byte[] storedValue = existing.get(operation.getKey());
boolean hasStoredValue = storedValue!=null;
RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue):null;
boolean deleteStoredOperationInOperationList = false;
if(operationArr!=null){
for(RevertibleOperation o : operationArr){
if(o.equals(deleteStoredOperation)){
deleteStoredOperationInOperationList = true;
break;
}
}
}
boolean willDeleteExistingRecord = deleteStoredOperation!=null && deleteStoredOperationInOperationList;
try{
if(operation.isDelete()){
if(hasStoredValue && !Arrays.equals(storedValue,operation.value) && !willDeleteExistingRecord){
// There is a value and we're not deleting it in this operation.
// Check that a delete for the stored value is in the stack.
throw new OperationStackIntegrityException("Database operation tries to delete with incorrect existing value "+operation+"\nvs\n"+new String(storedValue));
}else if(!hasStoredValue){
throw new OperationStackIntegrityException("Database operation tries to delete nonexistent key: "+operation);
}else if(!Arrays.equals(storedValue,operation.value)){
throw new OperationStackIntegrityException("Database operation tries to delete with incorrect value: "+operation);
}
}else{
if(hasStoredValue && !willDeleteExistingRecord){
throw new OperationStackIntegrityException("Database operation tries to overwrite before deleting existing: "+operation);
}
RevertibleOperation[] operationArrY = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrY = e.getValue();
}
}
if(operationArrY!=null && operationArrY.length>=1 && operationArrY[operationArrY.length-1].isPut){
throw new OperationStackIntegrityException("Database operation tries to overwrite with "+operation+" before deleting pending: "+operationArrY[operationArrY.length-1]);
}
}
}catch(OperationStackIntegrityException e){
if(this.unsafePrefixes.contains(operation.getKey()[0])){
System.err.println("Skipping over integrity error: "+e);
}else{
throw e;
}
}
RevertibleOperation[] newArr = new RevertibleOperation[operationArrX==null?1:operationArrX.length+1];
newArr[newArr.length-1] = operation;
this.items.put(newArr[0].getKey(),newArr);
}
this.stashedLastOperationForKey.clear();
}
public void appendOperation(RevertibleOperation operation){
RevertibleOperation inverted = operation.invert();
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}
if(operationArr!=null && operationArr.length>=1 && inverted.equals(operationArr[operationArr.length-1])){
this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1));
}
Optional<byte[]> storedValue = this.get.apply(operation.getKey());
boolean hasStoredValue = storedValue.isPresent();
RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue.get()):null;
boolean deleteStoredOperationInOperationList = false;
if(operationArr!=null){
for(RevertibleOperation o : operationArr){
if(o.equals(deleteStoredOperation)){
deleteStoredOperationInOperationList = true;
break;
}
}
}
boolean willDeleteExistingRecord = deleteStoredOperation!=null && deleteStoredOperationInOperationList;
try{
if(operation.isPut && hasStoredValue && !willDeleteExistingRecord){
throw new OperationStackIntegrityException("Database operation tries to add on top of existing key without deleting first: "+operation);
}else if(operation.isDelete() && hasStoredValue && !Arrays.equals(storedValue.get(),operation.getValue()) && !willDeleteExistingRecord){
// There is a value and we're not deleting it in this operation.
// Check that a delete for the stored value is in the stack.
throw new OperationStackIntegrityException("Database operation tries to delete with incorrect existing value "+operation);
}else if(operation.isDelete() && !hasStoredValue){
throw new OperationStackIntegrityException("Database operation tries to delete nonexistent key: "+operation);
}else if(operation.isDelete() && !Arrays.equals(storedValue.get(),operation.getValue())){
throw new OperationStackIntegrityException("Database operation tries to delete with incorrect value: "+operation);
}
}catch(OperationStackIntegrityException e){
if(this.unsafePrefixes.contains(operation.getKey()[0])){
System.err.println("Skipping over integrity error: "+e);
}else{
throw e;
}
}
RevertibleOperation[] operationArrX = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrX = e.getValue();
}
}
RevertibleOperation[] newArr = new RevertibleOperation[operationArrX==null?0:operationArrX.length];
newArr[newArr.length-1] = operation;
this.items.put(newArr[0].getKey(),newArr);
}
/**
* Apply a put or delete op, checking that it introduces no integrity errors
* @param operations
*/
public void multiPut(List<RevertiblePut> operations){
if(operations==null){
return;
}
for(RevertibleOperation op : operations){
if(!op.isPut){
throw new RuntimeException("List must contain only put operations.");
}
}
Map<byte[],RevertibleOperation> keys = new HashMap<>();
for(RevertibleOperation operation : operations){
keys.put(operation.getKey(),operation);
}
if(keys.keySet().size()!=operations.size()){
throw new RuntimeException("List must contain unique keys.");
}
List<RevertibleOperation> needPut = new ArrayList<>();
for(RevertibleOperation operation : operations){
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}
if(operationArr!=null && operationArr.length>=1 && operation.invert().equals(operationArr[operationArr.length-1])){
this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1));
continue;
}else if(operationArr!=null && operationArr.length>=1 && operation.equals(operationArr[operationArr.length-1])){
continue; // Raise an error?
}else{
needPut.add(operation);
}
}
Iterator<Optional<byte[]>> storedValues = this.multiGet.apply(needPut.stream().map(RevertibleOperation::getKey).collect(Collectors.toList())).iterator();
for(RevertibleOperation operation : needPut){
Optional<byte[]> storedValue = storedValues.next();
boolean hasStoredValue = storedValue.isPresent();
RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue.get()):null;
RevertibleOperation[] operationArrX = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrX = e.getValue();
}
}
boolean deleteStoredOperationInOperationList = false;
if(operationArrX!=null){
for(RevertibleOperation o : operationArrX){
if(o.equals(deleteStoredOperation)){
deleteStoredOperationInOperationList = true;
break;
}
}
}
boolean willDeleteExistingRecord = deleteStoredOperation!=null && deleteStoredOperationInOperationList;
try{
if(hasStoredValue && !willDeleteExistingRecord){
throw new OperationStackIntegrityException("Database operation tries to overwrite before deleting existing: "+operation);
}
}catch(OperationStackIntegrityException e){
if(this.unsafePrefixes.contains(operation.getKey()[0])){
System.err.println("Skipping over integrity error: "+e);
}else{
throw e;
}
}
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}
RevertibleOperation[] newArr = new RevertibleOperation[operationArr==null?1:operationArr.length+1];
newArr[newArr.length-1] = operation;
this.items.put(newArr[0].getKey(),newArr);
}
}
/**
* Apply a put or delete op, checking that it introduces no integrity errors
* @param operations
*/
public void multiDelete(List<RevertibleDelete> operations){
if(operations==null){
return;
}
for(RevertibleOperation op : operations){
if(op.isDelete()){
throw new RuntimeException("List must contain only delete operations.");
}
}
Map<byte[],RevertibleOperation> keys = new HashMap<>();
for(RevertibleOperation operation : operations){
keys.put(operation.getKey(),operation);
}
if(keys.keySet().size()!=operations.size()){
throw new RuntimeException("List must contain unique keys.");
}
List<RevertibleOperation> needDelete = new ArrayList<>();
for(RevertibleOperation operation : operations){
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}
if(operationArr!=null && operationArr.length>=1 && operation.invert().equals(operationArr[operationArr.length-1])){
this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1));
continue;
}else if(operationArr!=null && operationArr.length>=1 && operation.equals(operationArr[operationArr.length-1])){
continue; // Raise an error?
}else{
needDelete.add(operation);
}
}
Iterator<Optional<byte[]>> storedValues = this.multiGet.apply(needDelete.stream().map(RevertibleOperation::getKey).collect(Collectors.toList())).iterator();
for(RevertibleOperation operation : needDelete){
Optional<byte[]> storedValue = storedValues.next();
boolean hasStoredValue = storedValue.isPresent();
RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue.get()):null;
RevertibleOperation[] operationArrX = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArrX = e.getValue();
}
}
boolean deleteStoredOperationInOperationList = false;
if(operationArrX!=null){
for(RevertibleOperation o : operationArrX){
if(o.equals(deleteStoredOperation)){
deleteStoredOperationInOperationList = true;
break;
}
}
}
boolean willDeleteExistingRecord = deleteStoredOperation!=null && deleteStoredOperationInOperationList;
try{
if(operation.isDelete() && hasStoredValue && Arrays.equals(storedValue.get(),operation.getValue()) && !willDeleteExistingRecord){
// There is a value and we're not deleting it in this operation.
// Check that a delete for the stored value is in the stack.
throw new OperationStackIntegrityException("Database operation tries to delete with incorrect existing value "+operation);
}else if(!storedValue.isPresent()){
throw new OperationStackIntegrityException("Database operation tries to delete nonexistent key: "+operation);
}else if(operation.isDelete() && Arrays.equals(storedValue.get(),operation.getValue())){
throw new OperationStackIntegrityException("Database operation tries to delete with incorrect value: "+operation);
}
}catch(OperationStackIntegrityException e){
if(this.unsafePrefixes.contains(operation.getKey()[0])){
System.err.println("Skipping over integrity error: "+e);
}else{
throw e;
}
}
RevertibleOperation[] operationArr = null;
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),operation.getKey())){
operationArr = e.getValue();
}
}
RevertibleOperation[] newArr = new RevertibleOperation[operationArr==null?1:operationArr.length+1];
newArr[newArr.length-1] = operation;
this.items.put(newArr[0].getKey(),newArr);
}
} }
public void clear(){ public void clear(){
this.items.clear();
this.stash.clear();
this.stashedLastOperationForKey.clear();
}
/**
* Get the serialized bytes to undo all of the changes made by the pending ops
*/
public byte[] getUndoOperations(){
List<RevertibleOperation> reversed = new ArrayList<>();
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
List<RevertibleOperation> operations = Arrays.asList(e.getValue());
Collections.reverse(operations);
reversed.addAll(operations);
}
List<byte[]> invertedAndPacked = new ArrayList<>();
int size = 0;
for(RevertibleOperation operation : reversed){
byte[] undoOperation = operation.invert().pack();
invertedAndPacked.add(undoOperation);
size += undoOperation.length;
}
ByteBuffer bb = ByteBuffer.allocate(size);
for(byte[] packed : invertedAndPacked){
bb.put(packed);
}
return bb.array();
}
/**
* Unpack and apply a sequence of undo ops from serialized undo bytes
* @param packed
*/
public void applyPackedUndoOperations(byte[] packed){
while(packed.length>0){
Tuple2<RevertibleOperation,byte[]> unpacked = RevertibleOperation.unpack(packed);
this.appendOperation(unpacked.getA());
packed = unpacked.getB();
}
}
public Optional<RevertibleOperation> getPendingOperation(byte[] key){
for(Map.Entry<byte[],RevertibleOperation> e : this.stashedLastOperationForKey.entrySet()){
if(Arrays.equals(e.getKey(),key)){
return Optional.of(e.getValue());
}
}
for(Map.Entry<byte[],RevertibleOperation[]> e : this.items.entrySet()){
if(Arrays.equals(e.getKey(),key)){
if(e.getValue().length>=1){
return Optional.of(e.getValue()[e.getValue().length-1]);
}
}
}
return Optional.empty();
} }
} }

View file

@ -5,6 +5,7 @@ import com.lbry.database.PrefixDB;
import com.lbry.database.keys.KeyInterface; import com.lbry.database.keys.KeyInterface;
import com.lbry.database.values.ValueInterface; import com.lbry.database.values.ValueInterface;
import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -17,10 +18,13 @@ import org.rocksdb.RocksIterator;
public abstract class PrefixRow<K extends KeyInterface,V extends ValueInterface>{ public abstract class PrefixRow<K extends KeyInterface,V extends ValueInterface>{
public static final Map<Prefix,PrefixRow<?,?>> TYPES = new HashMap<>();
private final PrefixDB database; private final PrefixDB database;
public PrefixRow(PrefixDB database){ public PrefixRow(PrefixDB database){
this.database = database; this.database = database;
PrefixRow.TYPES.put(this.prefix(),this);
} }
public RocksIterator iterate() throws RocksDBException{ public RocksIterator iterate() throws RocksDBException{

View file

@ -0,0 +1,21 @@
package com.lbry.database.util;
public class Tuple2<A,B>{
private final A a;
private final B b;
public Tuple2(A a,B b){
this.a = a;
this.b = b;
}
public A getA() {
return this.a;
}
public B getB() {
return this.b;
}
}