diff --git a/src/main/java/com/lbry/database/Prefix.java b/src/main/java/com/lbry/database/Prefix.java index d380bab..e4d3573 100644 --- a/src/main/java/com/lbry/database/Prefix.java +++ b/src/main/java/com/lbry/database/Prefix.java @@ -62,4 +62,17 @@ public enum Prefix{ return this.value; } + public static Prefix getByValue(char value){ + return Prefix.getByValue((byte) value); + } + + public static Prefix getByValue(byte value){ + for(Prefix p : Prefix.values()){ + if(p.value==value){ + return p; + } + } + return null; + } + } \ No newline at end of file diff --git a/src/main/java/com/lbry/database/PrefixDB.java b/src/main/java/com/lbry/database/PrefixDB.java index 8244f39..a7edf41 100644 --- a/src/main/java/com/lbry/database/PrefixDB.java +++ b/src/main/java/com/lbry/database/PrefixDB.java @@ -7,10 +7,7 @@ import com.lbry.database.revert.RevertiblePut; import com.lbry.database.rows.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; @@ -103,7 +100,20 @@ public class PrefixDB{ this.database = RocksDB.open(new DBOptions(options),path,columnFamilyDescriptors,this.columnFamilyHandles); - this.operationStack = new RevertibleOperationStack(); + Set unsafePrefixes = new HashSet<>();//TODO + boolean enforceIntegrity = false;//TODO + this.operationStack = new RevertibleOperationStack((byte[] key) -> { + try{ + return Optional.of(this.get(key)); + }catch(RocksDBException e){} + return Optional.empty(); + },(List keys) -> { + List> optionalKeys = new ArrayList<>(); + for(byte[] key : keys){ + optionalKeys.add(Optional.of(key)); + } + return optionalKeys; + },unsafePrefixes,enforceIntegrity); this.maxUndoDepth = maxUndoDepth; diff --git a/src/main/java/com/lbry/database/revert/OperationStackIntegrityException.java b/src/main/java/com/lbry/database/revert/OperationStackIntegrityException.java new file mode 100644 index 0000000..5f2b102 --- /dev/null +++ b/src/main/java/com/lbry/database/revert/OperationStackIntegrityException.java @@ -0,0 +1,9 @@ +package com.lbry.database.revert; + +public class OperationStackIntegrityException extends RuntimeException{ + + public OperationStackIntegrityException(String message){ + super(message); + } + +} \ No newline at end of file diff --git a/src/main/java/com/lbry/database/revert/RevertibleOperation.java b/src/main/java/com/lbry/database/revert/RevertibleOperation.java index 2a05259..ee5287a 100644 --- a/src/main/java/com/lbry/database/revert/RevertibleOperation.java +++ b/src/main/java/com/lbry/database/revert/RevertibleOperation.java @@ -1,5 +1,11 @@ package com.lbry.database.revert; +import com.lbry.database.Prefix; +import com.lbry.database.rows.PrefixRow; +import com.lbry.database.util.Tuple2; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Arrays; public abstract class RevertibleOperation{ @@ -30,9 +36,32 @@ public abstract class RevertibleOperation{ throw new RuntimeException("Not implemented"); } - //TODO PACK - //TODO UNPACK + public byte[] pack(){ + return ByteBuffer.allocate(1+4+4+this.key.length+this.value.length).order(ByteOrder.BIG_ENDIAN) + .put((byte) (this.isPut?0x01:0x00)) + .putInt(this.key.length) + .putInt(this.value.length) + .put(this.key) + .put(this.value) + .array(); + } + public static Tuple2 unpack(byte[] packed){ + ByteBuffer bb = ByteBuffer.wrap(packed).order(ByteOrder.BIG_ENDIAN); + boolean isPut = (bb.get() & 0xFF)!=0x00; + int keyLength = bb.getInt(); + int valueLength = bb.getInt(); + byte[] keyBytes = new byte[keyLength]; + bb.get(keyBytes); + byte[] valueBytes = new byte[valueLength]; + bb.get(valueBytes); + byte[] remainingPacked = new byte[bb.remaining()]; + bb.get(remainingPacked); + if(isPut){ + return new Tuple2<>(new RevertiblePut(keyBytes,valueBytes),remainingPacked); + } + return new Tuple2<>(new RevertibleDelete(keyBytes,valueBytes),remainingPacked); + } @Override public boolean equals(Object obj){ @@ -43,7 +72,17 @@ public abstract class RevertibleOperation{ return false; } - //TODO REPR - //TODO STR + @Override + public String toString() { + Prefix prefix = Prefix.getByValue(this.value[0]); + String prefixStr = (prefix!=null?prefix.name():"?"); + String k = "?"; + String v = "?"; + if(prefix!=null){ + k = PrefixRow.TYPES.get(prefix).unpackKey(this.key).toString(); + v = PrefixRow.TYPES.get(prefix).unpackKey(this.value).toString(); + } + return (this.isPut?"PUT":"DELETE")+" "+prefixStr+": "+k+" | "+v; + } } \ No newline at end of file diff --git a/src/main/java/com/lbry/database/revert/RevertibleOperationStack.java b/src/main/java/com/lbry/database/revert/RevertibleOperationStack.java index 48f6006..a1e1a16 100644 --- a/src/main/java/com/lbry/database/revert/RevertibleOperationStack.java +++ b/src/main/java/com/lbry/database/revert/RevertibleOperationStack.java @@ -1,17 +1,474 @@ package com.lbry.database.revert; +import com.lbry.database.util.Tuple2; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + public class RevertibleOperationStack{ + private final Function> get; + private final Function,Iterable>> multiGet; + + private final Map items; + + private final Deque stash; + private final Map stashedLastOperationForKey; + + private final Set unsafePrefixes; + + private final boolean enforceIntegrity; + + public RevertibleOperationStack(Function> get,Function,Iterable>> multiGet,Set unsafePrefixes,boolean enforceIntegrity){ + this.get = get; + this.multiGet = multiGet; + + this.items = new HashMap<>(); + + this.stash = new ArrayDeque<>(); + this.stashedLastOperationForKey = new HashMap<>(); + + this.unsafePrefixes = unsafePrefixes!=null?unsafePrefixes:new HashSet<>(); + this.enforceIntegrity = enforceIntegrity; + } + public void stashOperations(RevertibleOperation[] operations){ - //TODO + this.stash.addAll(Arrays.asList(operations)); + for(RevertibleOperation operation : operations){ + this.stashedLastOperationForKey.put(operation.key,operation); + } } public void validateAndApplyStashedOperations(){ - //TODO + if(this.stash.isEmpty()){ + return; + } + + List needAppend = new ArrayList<>(); + Set uniqueKeys = new HashSet<>(); + + while(!this.stash.isEmpty()){ + RevertibleOperation operation = this.stash.pollFirst(); + RevertibleOperation[] operationArr = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArr = e.getValue(); + } + } + if(operationArr!=null && operationArr.length>=1 && operation.invert().equals(operationArr[operationArr.length-1])){ + this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1)); + continue; + } + if(operationArr!=null && operationArr.length>=1 && operation.equals(operationArr[operationArr.length-1])){ + continue; + }else{ + needAppend.add(operation); + uniqueKeys.add(operation.getKey()); + } + } + + Map existing = new HashMap<>(); + if(this.enforceIntegrity && !uniqueKeys.isEmpty()){ + List uniqueKeysList = new ArrayList<>(uniqueKeys); + for(int idx=0;idx batch = uniqueKeysList.subList(idx,idx+10000); + Iterator> iterator = this.multiGet.apply(batch).iterator(); + for(byte[] k : batch){ + byte[] v = iterator.next().get(); + existing.put(k,v); + } + + } + } + + for(RevertibleOperation operation : needAppend){ + RevertibleOperation[] operationArr = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArr = e.getValue(); + } + } + + if(operationArr!=null && operationArr.length>=1 && operationArr[operationArr.length-1].equals(operation)){ + this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1)); + RevertibleOperation[] operationArrX = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArrX = e.getValue(); + } + } + if(operationArrX==null || operationArrX.length==0){ + this.items.remove(operation.getKey()); + } + } + if(!this.enforceIntegrity){ + RevertibleOperation[] operationArrX = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArrX = e.getValue(); + } + } + + RevertibleOperation[] newArr = new RevertibleOperation[operationArrX==null?1:operationArrX.length+1]; + newArr[newArr.length-1] = operation; + this.items.put(newArr[0].getKey(),newArr); + } + + RevertibleOperation[] operationArrX = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArrX = e.getValue(); + } + } + + byte[] storedValue = existing.get(operation.getKey()); + boolean hasStoredValue = storedValue!=null; + RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue):null; + boolean deleteStoredOperationInOperationList = false; + if(operationArr!=null){ + for(RevertibleOperation o : operationArr){ + if(o.equals(deleteStoredOperation)){ + deleteStoredOperationInOperationList = true; + break; + } + } + } + boolean willDeleteExistingRecord = deleteStoredOperation!=null && deleteStoredOperationInOperationList; + + try{ + if(operation.isDelete()){ + if(hasStoredValue && !Arrays.equals(storedValue,operation.value) && !willDeleteExistingRecord){ + // There is a value and we're not deleting it in this operation. + // Check that a delete for the stored value is in the stack. + throw new OperationStackIntegrityException("Database operation tries to delete with incorrect existing value "+operation+"\nvs\n"+new String(storedValue)); + }else if(!hasStoredValue){ + throw new OperationStackIntegrityException("Database operation tries to delete nonexistent key: "+operation); + }else if(!Arrays.equals(storedValue,operation.value)){ + throw new OperationStackIntegrityException("Database operation tries to delete with incorrect value: "+operation); + } + }else{ + if(hasStoredValue && !willDeleteExistingRecord){ + throw new OperationStackIntegrityException("Database operation tries to overwrite before deleting existing: "+operation); + } + RevertibleOperation[] operationArrY = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArrY = e.getValue(); + } + } + if(operationArrY!=null && operationArrY.length>=1 && operationArrY[operationArrY.length-1].isPut){ + throw new OperationStackIntegrityException("Database operation tries to overwrite with "+operation+" before deleting pending: "+operationArrY[operationArrY.length-1]); + } + } + }catch(OperationStackIntegrityException e){ + if(this.unsafePrefixes.contains(operation.getKey()[0])){ + System.err.println("Skipping over integrity error: "+e); + }else{ + throw e; + } + } + + RevertibleOperation[] newArr = new RevertibleOperation[operationArrX==null?1:operationArrX.length+1]; + newArr[newArr.length-1] = operation; + this.items.put(newArr[0].getKey(),newArr); + } + + this.stashedLastOperationForKey.clear(); + } + + public void appendOperation(RevertibleOperation operation){ + RevertibleOperation inverted = operation.invert(); + + RevertibleOperation[] operationArr = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArr = e.getValue(); + } + } + if(operationArr!=null && operationArr.length>=1 && inverted.equals(operationArr[operationArr.length-1])){ + this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1)); + } + Optional storedValue = this.get.apply(operation.getKey()); + boolean hasStoredValue = storedValue.isPresent(); + RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue.get()):null; + boolean deleteStoredOperationInOperationList = false; + if(operationArr!=null){ + for(RevertibleOperation o : operationArr){ + if(o.equals(deleteStoredOperation)){ + deleteStoredOperationInOperationList = true; + break; + } + } + } + boolean willDeleteExistingRecord = deleteStoredOperation!=null && deleteStoredOperationInOperationList; + + try{ + if(operation.isPut && hasStoredValue && !willDeleteExistingRecord){ + throw new OperationStackIntegrityException("Database operation tries to add on top of existing key without deleting first: "+operation); + }else if(operation.isDelete() && hasStoredValue && !Arrays.equals(storedValue.get(),operation.getValue()) && !willDeleteExistingRecord){ + // There is a value and we're not deleting it in this operation. + // Check that a delete for the stored value is in the stack. + throw new OperationStackIntegrityException("Database operation tries to delete with incorrect existing value "+operation); + }else if(operation.isDelete() && !hasStoredValue){ + throw new OperationStackIntegrityException("Database operation tries to delete nonexistent key: "+operation); + }else if(operation.isDelete() && !Arrays.equals(storedValue.get(),operation.getValue())){ + throw new OperationStackIntegrityException("Database operation tries to delete with incorrect value: "+operation); + } + }catch(OperationStackIntegrityException e){ + if(this.unsafePrefixes.contains(operation.getKey()[0])){ + System.err.println("Skipping over integrity error: "+e); + }else{ + throw e; + } + } + + RevertibleOperation[] operationArrX = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArrX = e.getValue(); + } + } + RevertibleOperation[] newArr = new RevertibleOperation[operationArrX==null?0:operationArrX.length]; + newArr[newArr.length-1] = operation; + this.items.put(newArr[0].getKey(),newArr); + } + + /** + * Apply a put or delete op, checking that it introduces no integrity errors + * @param operations + */ + public void multiPut(List operations){ + if(operations==null){ + return; + } + for(RevertibleOperation op : operations){ + if(!op.isPut){ + throw new RuntimeException("List must contain only put operations."); + } + } + Map keys = new HashMap<>(); + for(RevertibleOperation operation : operations){ + keys.put(operation.getKey(),operation); + } + if(keys.keySet().size()!=operations.size()){ + throw new RuntimeException("List must contain unique keys."); + } + + List needPut = new ArrayList<>(); + for(RevertibleOperation operation : operations){ + RevertibleOperation[] operationArr = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArr = e.getValue(); + } + } + if(operationArr!=null && operationArr.length>=1 && operation.invert().equals(operationArr[operationArr.length-1])){ + this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1)); + continue; + }else if(operationArr!=null && operationArr.length>=1 && operation.equals(operationArr[operationArr.length-1])){ + continue; // Raise an error? + }else{ + needPut.add(operation); + } + } + + Iterator> storedValues = this.multiGet.apply(needPut.stream().map(RevertibleOperation::getKey).collect(Collectors.toList())).iterator(); + for(RevertibleOperation operation : needPut){ + Optional storedValue = storedValues.next(); + + boolean hasStoredValue = storedValue.isPresent(); + RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue.get()):null; + RevertibleOperation[] operationArrX = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArrX = e.getValue(); + } + } + boolean deleteStoredOperationInOperationList = false; + if(operationArrX!=null){ + for(RevertibleOperation o : operationArrX){ + if(o.equals(deleteStoredOperation)){ + deleteStoredOperationInOperationList = true; + break; + } + } + } + boolean willDeleteExistingRecord = deleteStoredOperation!=null && deleteStoredOperationInOperationList; + + try{ + if(hasStoredValue && !willDeleteExistingRecord){ + throw new OperationStackIntegrityException("Database operation tries to overwrite before deleting existing: "+operation); + } + }catch(OperationStackIntegrityException e){ + if(this.unsafePrefixes.contains(operation.getKey()[0])){ + System.err.println("Skipping over integrity error: "+e); + }else{ + throw e; + } + } + + RevertibleOperation[] operationArr = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArr = e.getValue(); + } + } + RevertibleOperation[] newArr = new RevertibleOperation[operationArr==null?1:operationArr.length+1]; + newArr[newArr.length-1] = operation; + this.items.put(newArr[0].getKey(),newArr); + } + } + + /** + * Apply a put or delete op, checking that it introduces no integrity errors + * @param operations + */ + public void multiDelete(List operations){ + if(operations==null){ + return; + } + for(RevertibleOperation op : operations){ + if(op.isDelete()){ + throw new RuntimeException("List must contain only delete operations."); + } + } + Map keys = new HashMap<>(); + for(RevertibleOperation operation : operations){ + keys.put(operation.getKey(),operation); + } + if(keys.keySet().size()!=operations.size()){ + throw new RuntimeException("List must contain unique keys."); + } + + List needDelete = new ArrayList<>(); + for(RevertibleOperation operation : operations){ + RevertibleOperation[] operationArr = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArr = e.getValue(); + } + } + if(operationArr!=null && operationArr.length>=1 && operation.invert().equals(operationArr[operationArr.length-1])){ + this.items.put(operationArr[0].getKey(),Arrays.copyOfRange(operationArr,0,operationArr.length-1)); + continue; + }else if(operationArr!=null && operationArr.length>=1 && operation.equals(operationArr[operationArr.length-1])){ + continue; // Raise an error? + }else{ + needDelete.add(operation); + } + } + + Iterator> storedValues = this.multiGet.apply(needDelete.stream().map(RevertibleOperation::getKey).collect(Collectors.toList())).iterator(); + for(RevertibleOperation operation : needDelete){ + Optional storedValue = storedValues.next(); + + boolean hasStoredValue = storedValue.isPresent(); + RevertibleOperation deleteStoredOperation = hasStoredValue?new RevertibleDelete(operation.getKey(),storedValue.get()):null; + RevertibleOperation[] operationArrX = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArrX = e.getValue(); + } + } + boolean deleteStoredOperationInOperationList = false; + if(operationArrX!=null){ + for(RevertibleOperation o : operationArrX){ + if(o.equals(deleteStoredOperation)){ + deleteStoredOperationInOperationList = true; + break; + } + } + } + boolean willDeleteExistingRecord = deleteStoredOperation!=null && deleteStoredOperationInOperationList; + + try{ + if(operation.isDelete() && hasStoredValue && Arrays.equals(storedValue.get(),operation.getValue()) && !willDeleteExistingRecord){ + // There is a value and we're not deleting it in this operation. + // Check that a delete for the stored value is in the stack. + throw new OperationStackIntegrityException("Database operation tries to delete with incorrect existing value "+operation); + }else if(!storedValue.isPresent()){ + throw new OperationStackIntegrityException("Database operation tries to delete nonexistent key: "+operation); + }else if(operation.isDelete() && Arrays.equals(storedValue.get(),operation.getValue())){ + throw new OperationStackIntegrityException("Database operation tries to delete with incorrect value: "+operation); + } + }catch(OperationStackIntegrityException e){ + if(this.unsafePrefixes.contains(operation.getKey()[0])){ + System.err.println("Skipping over integrity error: "+e); + }else{ + throw e; + } + } + + RevertibleOperation[] operationArr = null; + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),operation.getKey())){ + operationArr = e.getValue(); + } + } + RevertibleOperation[] newArr = new RevertibleOperation[operationArr==null?1:operationArr.length+1]; + newArr[newArr.length-1] = operation; + this.items.put(newArr[0].getKey(),newArr); + } } public void clear(){ + this.items.clear(); + this.stash.clear(); + this.stashedLastOperationForKey.clear(); + } + /** + * Get the serialized bytes to undo all of the changes made by the pending ops + */ + public byte[] getUndoOperations(){ + List reversed = new ArrayList<>(); + for(Map.Entry e : this.items.entrySet()){ + List operations = Arrays.asList(e.getValue()); + Collections.reverse(operations); + reversed.addAll(operations); + } + List invertedAndPacked = new ArrayList<>(); + int size = 0; + for(RevertibleOperation operation : reversed){ + byte[] undoOperation = operation.invert().pack(); + invertedAndPacked.add(undoOperation); + size += undoOperation.length; + } + ByteBuffer bb = ByteBuffer.allocate(size); + for(byte[] packed : invertedAndPacked){ + bb.put(packed); + } + return bb.array(); + } + + /** + * Unpack and apply a sequence of undo ops from serialized undo bytes + * @param packed + */ + public void applyPackedUndoOperations(byte[] packed){ + while(packed.length>0){ + Tuple2 unpacked = RevertibleOperation.unpack(packed); + this.appendOperation(unpacked.getA()); + packed = unpacked.getB(); + } + } + + public Optional getPendingOperation(byte[] key){ + for(Map.Entry e : this.stashedLastOperationForKey.entrySet()){ + if(Arrays.equals(e.getKey(),key)){ + return Optional.of(e.getValue()); + } + } + for(Map.Entry e : this.items.entrySet()){ + if(Arrays.equals(e.getKey(),key)){ + if(e.getValue().length>=1){ + return Optional.of(e.getValue()[e.getValue().length-1]); + } + } + } + return Optional.empty(); } } \ No newline at end of file diff --git a/src/main/java/com/lbry/database/rows/PrefixRow.java b/src/main/java/com/lbry/database/rows/PrefixRow.java index a320108..ad644f0 100644 --- a/src/main/java/com/lbry/database/rows/PrefixRow.java +++ b/src/main/java/com/lbry/database/rows/PrefixRow.java @@ -5,6 +5,7 @@ import com.lbry.database.PrefixDB; import com.lbry.database.keys.KeyInterface; import com.lbry.database.values.ValueInterface; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -17,10 +18,13 @@ import org.rocksdb.RocksIterator; public abstract class PrefixRow{ + public static final Map> TYPES = new HashMap<>(); + private final PrefixDB database; public PrefixRow(PrefixDB database){ this.database = database; + PrefixRow.TYPES.put(this.prefix(),this); } public RocksIterator iterate() throws RocksDBException{ diff --git a/src/main/java/com/lbry/database/util/Tuple2.java b/src/main/java/com/lbry/database/util/Tuple2.java new file mode 100644 index 0000000..9cd47a2 --- /dev/null +++ b/src/main/java/com/lbry/database/util/Tuple2.java @@ -0,0 +1,21 @@ +package com.lbry.database.util; + +public class Tuple2{ + + private final A a; + private final B b; + + public Tuple2(A a,B b){ + this.a = a; + this.b = b; + } + + public A getA() { + return this.a; + } + + public B getB() { + return this.b; + } + +} \ No newline at end of file