Improve DHT
This commit is contained in:
parent
6fd1a3f426
commit
79fc5930ea
3 changed files with 39 additions and 36 deletions
|
@ -38,7 +38,7 @@ public class DHTNodeFinderThread implements Runnable{
|
|||
}
|
||||
|
||||
this.startSender();
|
||||
this.startReceiver();
|
||||
DHT.startReceiver();
|
||||
}
|
||||
|
||||
private void startSender(){
|
||||
|
@ -120,22 +120,4 @@ public class DHTNodeFinderThread implements Runnable{
|
|||
}).exceptionally((Throwable e) -> null);
|
||||
}
|
||||
|
||||
private void startReceiver(){
|
||||
new Thread(() -> {
|
||||
while(DHT.getSocket().isBound()) {
|
||||
try {
|
||||
UDP.Packet receiverPacket = UDP.receive(DHT.getSocket());
|
||||
|
||||
byte[] receivingBytes = receiverPacket.getData();
|
||||
|
||||
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
|
||||
DHT.RPCID rpcid = new DHT.RPCID(message);
|
||||
DHT.getFutureManager().finishFuture(rpcid,receiverPacket);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
},"DHT Receiver").start();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,10 +32,10 @@ public class BencodeConverter{
|
|||
}
|
||||
|
||||
// Normal B-decoding
|
||||
return BencodeConverter.BENCODE.decode(bytes,Type.DICTIONARY);
|
||||
return BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(BencodeConverter.BENCODE.decode(bytes,Type.DICTIONARY));
|
||||
}
|
||||
|
||||
public static <V> V walkAndConvertByteBufferToByteArrayOrString(Object value){
|
||||
private static <V> V walkAndConvertByteBufferToByteArrayOrString(Object value){
|
||||
if(value instanceof ByteBuffer){
|
||||
ByteBuffer bb = (ByteBuffer) value;
|
||||
byte[] ba = bb.array();
|
||||
|
|
|
@ -38,30 +38,27 @@ public class DHT{
|
|||
}
|
||||
|
||||
public static CompletableFuture<UDP.Packet> ping(DatagramSocket socket,InetSocketAddress destination) throws IOException {
|
||||
byte[] rpcID = new byte[20];
|
||||
new Random().nextBytes(rpcID);
|
||||
|
||||
DHT.Message<String> pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1)));
|
||||
DHT.Message<String> pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,DHT.RPCID.generate(),DHT.NODE_ID,"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1)));
|
||||
|
||||
return DHT.sendWithFuture(socket,destination,pingMessage);
|
||||
}
|
||||
|
||||
public static CompletableFuture<UDP.Packet> findNode(DatagramSocket socket,InetSocketAddress destination,byte[] key) throws IOException{
|
||||
byte[] rpcID = new byte[20];
|
||||
new Random().nextBytes(rpcID);
|
||||
|
||||
DHT.Message<String> findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"findNode",Arrays.asList(key,Collections.singletonMap("protocolVersion",1)));
|
||||
DHT.Message<String> findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,DHT.RPCID.generate(),DHT.NODE_ID,"findNode",Arrays.asList(key,Collections.singletonMap("protocolVersion",1)));
|
||||
|
||||
return DHT.sendWithFuture(socket,destination,findNodeMessage);
|
||||
}
|
||||
|
||||
public static CompletableFuture<UDP.Packet> findValue(DatagramSocket socket,InetSocketAddress destination,byte[] key) throws IOException{
|
||||
byte[] rpcID = new byte[20];
|
||||
new Random().nextBytes(rpcID);
|
||||
DHT.Message<String> findValueMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,DHT.RPCID.generate(),DHT.NODE_ID,"findValue",Arrays.asList(key,Collections.singletonMap("protocolVersion",1)));
|
||||
|
||||
DHT.Message<String> findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"findValue",Arrays.asList(key,Collections.singletonMap("protocolVersion",1)));
|
||||
return DHT.sendWithFuture(socket,destination,findValueMessage);
|
||||
}
|
||||
|
||||
return DHT.sendWithFuture(socket,destination,findNodeMessage);
|
||||
public static CompletableFuture<UDP.Packet> store(DatagramSocket socket,InetSocketAddress destination) throws IOException{
|
||||
DHT.Message<String> storeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,DHT.RPCID.generate(),DHT.NODE_ID,"store");
|
||||
|
||||
return DHT.sendWithFuture(socket,destination,storeMessage);
|
||||
}
|
||||
|
||||
protected static CompletableFuture<UDP.Packet> sendWithFuture(DatagramSocket socket,InetSocketAddress destination, DHT.Message<?> message) throws IOException{
|
||||
|
@ -144,12 +141,12 @@ public class DHT{
|
|||
private DHT.Message<P> setFromBencode(byte[] data){
|
||||
Map<String,?> dictionary = BencodeConverter.decode(data);
|
||||
this.type = ((Long) dictionary.get("0")).intValue();
|
||||
this.rpcID = ((ByteBuffer) dictionary.get("1")).array();
|
||||
this.nodeID = ((ByteBuffer) dictionary.get("2")).array();
|
||||
this.rpcID = (byte[]) dictionary.get("1");
|
||||
this.nodeID = (byte[]) dictionary.get("2");
|
||||
this.payload = null;
|
||||
if(dictionary.containsKey("3")){
|
||||
Object payload = dictionary.get("3");
|
||||
this.payload = BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(payload);
|
||||
this.payload = (P) payload;
|
||||
}
|
||||
this.arguments = null;
|
||||
if(dictionary.containsKey("4")){
|
||||
|
@ -175,6 +172,24 @@ public class DHT{
|
|||
|
||||
}
|
||||
|
||||
public static void startReceiver(){
|
||||
new Thread(() -> {
|
||||
while(DHT.getSocket().isBound()) {
|
||||
try {
|
||||
UDP.Packet receiverPacket = UDP.receive(DHT.getSocket());
|
||||
|
||||
byte[] receivingBytes = receiverPacket.getData();
|
||||
|
||||
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
|
||||
DHT.RPCID rpcid = new DHT.RPCID(message);
|
||||
DHT.getFutureManager().finishFuture(rpcid,receiverPacket);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
},"DHT Receiver").start();
|
||||
}
|
||||
|
||||
public static class RPCID{
|
||||
|
||||
private final byte[] id;
|
||||
|
@ -208,6 +223,12 @@ public class DHT{
|
|||
'}';
|
||||
}
|
||||
|
||||
public static byte[] generate(){
|
||||
byte[] rpcID = new byte[20];
|
||||
new Random().nextBytes(rpcID);
|
||||
return rpcID;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue