From 79fc5930ea75d3b33dba81a1994fca2279c8fd10 Mon Sep 17 00:00:00 2001 From: Ben van Hartingsveldt Date: Tue, 15 Jul 2025 15:23:33 +0200 Subject: [PATCH] Improve DHT --- .../globe/thread/DHTNodeFinderThread.java | 20 +------- .../com/lbry/globe/util/BencodeConverter.java | 4 +- src/main/java/com/lbry/globe/util/DHT.java | 51 +++++++++++++------ 3 files changed, 39 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java index ba8607a..e235713 100644 --- a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java +++ b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java @@ -38,7 +38,7 @@ public class DHTNodeFinderThread implements Runnable{ } this.startSender(); - this.startReceiver(); + DHT.startReceiver(); } private void startSender(){ @@ -120,22 +120,4 @@ public class DHTNodeFinderThread implements Runnable{ }).exceptionally((Throwable e) -> null); } - private void startReceiver(){ - new Thread(() -> { - while(DHT.getSocket().isBound()) { - try { - UDP.Packet receiverPacket = UDP.receive(DHT.getSocket()); - - byte[] receivingBytes = receiverPacket.getData(); - - DHT.Message message = DHT.Message.fromBencode(receivingBytes); - DHT.RPCID rpcid = new DHT.RPCID(message); - DHT.getFutureManager().finishFuture(rpcid,receiverPacket); - } catch (IOException e) { - e.printStackTrace(); - } - } - },"DHT Receiver").start(); - } - } diff --git a/src/main/java/com/lbry/globe/util/BencodeConverter.java b/src/main/java/com/lbry/globe/util/BencodeConverter.java index 60c34e2..8ff208a 100644 --- a/src/main/java/com/lbry/globe/util/BencodeConverter.java +++ b/src/main/java/com/lbry/globe/util/BencodeConverter.java @@ -32,10 +32,10 @@ public class BencodeConverter{ } // Normal B-decoding - return BencodeConverter.BENCODE.decode(bytes,Type.DICTIONARY); + return BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(BencodeConverter.BENCODE.decode(bytes,Type.DICTIONARY)); } - public static V walkAndConvertByteBufferToByteArrayOrString(Object value){ + private static V walkAndConvertByteBufferToByteArrayOrString(Object value){ if(value instanceof ByteBuffer){ ByteBuffer bb = (ByteBuffer) value; byte[] ba = bb.array(); diff --git a/src/main/java/com/lbry/globe/util/DHT.java b/src/main/java/com/lbry/globe/util/DHT.java index 77ae2fe..56b8227 100644 --- a/src/main/java/com/lbry/globe/util/DHT.java +++ b/src/main/java/com/lbry/globe/util/DHT.java @@ -38,30 +38,27 @@ public class DHT{ } public static CompletableFuture ping(DatagramSocket socket,InetSocketAddress destination) throws IOException { - byte[] rpcID = new byte[20]; - new Random().nextBytes(rpcID); - - DHT.Message pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1))); + DHT.Message pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,DHT.RPCID.generate(),DHT.NODE_ID,"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1))); return DHT.sendWithFuture(socket,destination,pingMessage); } public static CompletableFuture findNode(DatagramSocket socket,InetSocketAddress destination,byte[] key) throws IOException{ - byte[] rpcID = new byte[20]; - new Random().nextBytes(rpcID); - - DHT.Message findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"findNode",Arrays.asList(key,Collections.singletonMap("protocolVersion",1))); + DHT.Message findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,DHT.RPCID.generate(),DHT.NODE_ID,"findNode",Arrays.asList(key,Collections.singletonMap("protocolVersion",1))); return DHT.sendWithFuture(socket,destination,findNodeMessage); } public static CompletableFuture findValue(DatagramSocket socket,InetSocketAddress destination,byte[] key) throws IOException{ - byte[] rpcID = new byte[20]; - new Random().nextBytes(rpcID); + DHT.Message findValueMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,DHT.RPCID.generate(),DHT.NODE_ID,"findValue",Arrays.asList(key,Collections.singletonMap("protocolVersion",1))); - DHT.Message findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"findValue",Arrays.asList(key,Collections.singletonMap("protocolVersion",1))); + return DHT.sendWithFuture(socket,destination,findValueMessage); + } - return DHT.sendWithFuture(socket,destination,findNodeMessage); + public static CompletableFuture store(DatagramSocket socket,InetSocketAddress destination) throws IOException{ + DHT.Message storeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,DHT.RPCID.generate(),DHT.NODE_ID,"store"); + + return DHT.sendWithFuture(socket,destination,storeMessage); } protected static CompletableFuture sendWithFuture(DatagramSocket socket,InetSocketAddress destination, DHT.Message message) throws IOException{ @@ -144,12 +141,12 @@ public class DHT{ private DHT.Message

setFromBencode(byte[] data){ Map dictionary = BencodeConverter.decode(data); this.type = ((Long) dictionary.get("0")).intValue(); - this.rpcID = ((ByteBuffer) dictionary.get("1")).array(); - this.nodeID = ((ByteBuffer) dictionary.get("2")).array(); + this.rpcID = (byte[]) dictionary.get("1"); + this.nodeID = (byte[]) dictionary.get("2"); this.payload = null; if(dictionary.containsKey("3")){ Object payload = dictionary.get("3"); - this.payload = BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(payload); + this.payload = (P) payload; } this.arguments = null; if(dictionary.containsKey("4")){ @@ -175,6 +172,24 @@ public class DHT{ } + public static void startReceiver(){ + new Thread(() -> { + while(DHT.getSocket().isBound()) { + try { + UDP.Packet receiverPacket = UDP.receive(DHT.getSocket()); + + byte[] receivingBytes = receiverPacket.getData(); + + DHT.Message message = DHT.Message.fromBencode(receivingBytes); + DHT.RPCID rpcid = new DHT.RPCID(message); + DHT.getFutureManager().finishFuture(rpcid,receiverPacket); + } catch (IOException e) { + e.printStackTrace(); + } + } + },"DHT Receiver").start(); + } + public static class RPCID{ private final byte[] id; @@ -208,6 +223,12 @@ public class DHT{ '}'; } + public static byte[] generate(){ + byte[] rpcID = new byte[20]; + new Random().nextBytes(rpcID); + return rpcID; + } + } }