From eaa2f7e296b85c08ab4f6397ed2f472c35190d48 Mon Sep 17 00:00:00 2001 From: Ben van Hartingsveldt Date: Sat, 12 Jul 2025 19:12:21 +0200 Subject: [PATCH] Add new endpoint --- .../com/lbry/globe/handler/HTTPHandler.java | 106 ++++++++++++++++++ .../globe/thread/DHTNodeFinderThread.java | 29 ++--- src/main/java/com/lbry/globe/util/DHT.java | 55 +++++++-- .../lbry/globe/util/TimeoutFutureManager.java | 13 +++ 4 files changed, 170 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/lbry/globe/handler/HTTPHandler.java b/src/main/java/com/lbry/globe/handler/HTTPHandler.java index b8ad824..98c904c 100644 --- a/src/main/java/com/lbry/globe/handler/HTTPHandler.java +++ b/src/main/java/com/lbry/globe/handler/HTTPHandler.java @@ -3,6 +3,10 @@ package com.lbry.globe.handler; import com.lbry.globe.Main; import com.lbry.globe.api.API; +import com.lbry.globe.util.DHT; +import com.lbry.globe.util.Hex; +import com.lbry.globe.util.TimeoutFutureManager; +import com.lbry.globe.util.UDP; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; @@ -13,9 +17,11 @@ import io.netty.util.AttributeKey; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.security.MessageDigest; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.logging.Level; import java.util.logging.Logger; @@ -85,6 +91,106 @@ public class HTTPHandler extends ChannelInboundHandlerAdapter{ ctx.write(response); return; } + if("/api/command".equals(uri.getPath())){ + JSONObject json = new JSONObject(); + + String[] queryParts = uri.getQuery()!=null?uri.getQuery().split(";"):new String[]{""}; + if("ping".equals(queryParts[0]) || "findNode".equals(queryParts[0]) || "findValue".equals(queryParts[0])){ + //STORE IS NOT SUPPORTED + json.put("query",queryParts); + + Map peers = DHT.getPeers(); + CompletableFuture[] futures = new CompletableFuture[peers.size()]; + int i=0; + for(Map.Entry entry : peers.entrySet()){ + try{ + if("ping".equals(queryParts[0])){ + futures[i] = DHT.ping(DHT.getSocket(),entry.getKey()); + } + if("findNode".equals(queryParts[0])){ + futures[i] = DHT.findNode(DHT.getSocket(),entry.getKey(),queryParts.length>=2?Hex.decode(queryParts[1]):new byte[48]); + } + if("findValue".equals(queryParts[0])){ + futures[i] = DHT.findValue(DHT.getSocket(),entry.getKey(),queryParts.length>=2?Hex.decode(queryParts[1]):new byte[48]); + } + }catch(IOException ignored){} + i++; + } + + CompletableFuture> total = TimeoutFutureManager.getBulk(futures); + + JSONObject jsonData = new JSONObject(); + json.put("data",jsonData); + try{ + List responses = total.get(); + for(UDP.Packet resp : responses){ + if(resp!=null){ + DHT.Message message = DHT.Message.fromBencode(resp.getData()); + if("ping".equals(queryParts[0])){ + String pong = (String) message.getPayload(); + jsonData.put(resp.getAddress().getAddress().getHostAddress()+":"+resp.getAddress().getPort(),pong); + } + if("findNode".equals(queryParts[0])){ + JSONArray payload = new JSONArray(); + List> nodes = (List>) message.getPayload(); + for(List node : nodes){ + JSONObject p = new JSONObject(); + p.put("nodeID", Hex.encode((byte[]) node.get(0))); + p.put("hostname",node.get(1)); + p.put("port",node.get(2)); + payload.put(p); + } + jsonData.put(resp.getAddress().getAddress().getHostAddress()+":"+resp.getAddress().getPort(),payload); + } + if("findValue".equals(queryParts[0])){ + Map map = (Map) message.getPayload(); + JSONObject payload = new JSONObject(); + payload.put("p",map.get("p")); + payload.put("protocolVersion",map.get("protocolVersion")); + JSONArray contacts = new JSONArray(); + List> nodes = (List>) map.get("contacts"); + for(List node : nodes){ + JSONObject p = new JSONObject(); + p.put("nodeID", Hex.encode((byte[]) node.get(0))); + p.put("hostname",node.get(1)); + p.put("port",node.get(2)); + contacts.put(p); + } + payload.put("contacts",contacts); + //payload.put("token",Hex.encode((byte[]) map.get("token"))); + jsonData.put(resp.getAddress().getAddress().getHostAddress()+":"+resp.getAddress().getPort(),payload); + } + } + } + }catch(Exception e){ + e.printStackTrace(); + } + + + +// for(Map.Entry dest : DHT.getPeers().entrySet()){ +// if(!dest.getValue()){ +// try{ +// +// UDP.Packet packet = DHT.ping(DHT.getSocket(),dest.getKey()).get(1, TimeUnit.SECONDS); +// DHT.Message message = DHT.Message.fromBencode(packet.getData()); +// json.put(dest.getKey().toString(),message.getPayload()); +// }catch(Exception e){ +// json.put(dest.getKey().toString(),e.toString()); +// } +// } +// } + }else{ + json.put("error","Expecting one of 'ping','findNode' or 'findValue'."); + } + + ByteBuf responseContent = Unpooled.copiedBuffer(json.toString().getBytes()); + FullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(),HttpResponseStatus.OK,responseContent); + response.headers().add("Content-Length",responseContent.capacity()); + response.headers().add("Content-Type","application/json"); + ctx.write(response); + return; + } byte[] fileData = null; try{ fileData = HTTPHandler.readResource(HTTPHandler.getResource(uri.getPath().substring(1))); diff --git a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java index 8d92837..24ea16a 100644 --- a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java +++ b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java @@ -10,7 +10,6 @@ import com.lbry.globe.util.UDP; import java.io.IOException; import java.net.*; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import org.json.JSONObject; @@ -29,25 +28,13 @@ public class DHTNodeFinderThread implements Runnable{ "s2.lbry.network:4444", }; - private static final DatagramSocket SOCKET; - - static{ - try{ - SOCKET = new DatagramSocket(); - }catch(SocketException e){ - throw new RuntimeException(e); - } - } - - private final Map pingableDHTs = new ConcurrentHashMap<>(); - private final Queue incoming = new ConcurrentLinkedQueue<>(); @Override public void run(){ for(String bootstrap : DHTNodeFinderThread.BOOTSTRAP){ URI uri = URI.create("udp://"+bootstrap); - this.pingableDHTs.put(new InetSocketAddress(uri.getHost(),uri.getPort()),true); + DHT.getPeers().put(new InetSocketAddress(uri.getHost(),uri.getPort()),true); } this.startSender(); @@ -59,7 +46,7 @@ public class DHTNodeFinderThread implements Runnable{ new Thread(() -> { while(true){ System.out.println("[BULK PING]"); - for(InetSocketAddress socketAddress : DHTNodeFinderThread.this.pingableDHTs.keySet()){ + for(InetSocketAddress socketAddress : DHT.getPeers().keySet()){ String hostname = socketAddress.getHostName(); int port = socketAddress.getPort(); try{ @@ -82,7 +69,7 @@ public class DHTNodeFinderThread implements Runnable{ } private void doPing(InetSocketAddress destination) throws IOException{ - DHT.ping(DHTNodeFinderThread.SOCKET,destination).thenAccept((UDP.Packet packet) -> { + DHT.ping(DHT.getSocket(),destination).thenAccept((UDP.Packet packet) -> { byte[] receivingBytes = packet.getData(); DHT.Message message = DHT.Message.fromBencode(receivingBytes); System.out.println(" - [Ping Response] "+message); @@ -119,7 +106,7 @@ public class DHTNodeFinderThread implements Runnable{ } private void doFindNode(InetSocketAddress destination) throws IOException{ - DHT.findNode(DHTNodeFinderThread.SOCKET,destination).thenAccept((UDP.Packet packet) -> { + DHT.findNode(DHT.getSocket(),destination,new byte[48]).thenAccept((UDP.Packet packet) -> { byte[] receivingBytes = packet.getData(); DHT.Message message = DHT.Message.fromBencode(receivingBytes); System.out.println(" - [FindNode Response] "+message); @@ -129,13 +116,13 @@ public class DHTNodeFinderThread implements Runnable{ String hostname = (String) n.get(1); int port = (int) ((long) n.get(2)); InetSocketAddress existingSocketAddr = null; - for(InetSocketAddress addr : this.pingableDHTs.keySet()){ + for(InetSocketAddress addr : DHT.getPeers().keySet()){ if(addr.getHostName().equals(hostname) && addr.getPort()==port){ existingSocketAddr = addr; } } if(existingSocketAddr==null){ - this.pingableDHTs.put(new InetSocketAddress(hostname,port),false); + DHT.getPeers().put(new InetSocketAddress(hostname,port),false); } } }).exceptionally((Throwable e) -> null); @@ -145,7 +132,7 @@ public class DHTNodeFinderThread implements Runnable{ new Thread(() -> { while(true) { try { - UDP.Packet receiverPacket = UDP.receive(DHTNodeFinderThread.SOCKET); + UDP.Packet receiverPacket = UDP.receive(DHT.getSocket()); DHTNodeFinderThread.this.incoming.add(receiverPacket); byte[] receivingBytes = receiverPacket.getData(); @@ -161,7 +148,7 @@ public class DHTNodeFinderThread implements Runnable{ } private void handleIncomingMessages(){ - while(DHTNodeFinderThread.SOCKET.isBound()){ + while(DHT.getSocket().isBound()){ while(this.incoming.peek()!=null){ UDP.Packet receiverPacket = this.incoming.poll(); byte[] receivingBytes = receiverPacket.getData(); diff --git a/src/main/java/com/lbry/globe/util/DHT.java b/src/main/java/com/lbry/globe/util/DHT.java index 8d29a2a..ff56529 100644 --- a/src/main/java/com/lbry/globe/util/DHT.java +++ b/src/main/java/com/lbry/globe/util/DHT.java @@ -3,36 +3,63 @@ package com.lbry.globe.util; import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; +import java.net.SocketException; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; public class DHT{ - private static final ScheduledExecutorService ses = Executors.newScheduledThreadPool(12); - private static final TimeoutFutureManager futureManager = new TimeoutFutureManager<>(ses); + public static byte[] NODE_ID = new byte[48]; + + private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private static final TimeoutFutureManager futureManager = new TimeoutFutureManager<>(executor); + private static final Map peers = new ConcurrentHashMap<>(); + private static final DatagramSocket socket; + + static{ + try{ + socket = new DatagramSocket(); + }catch(SocketException e){ + throw new RuntimeException(e); + } + } public static TimeoutFutureManager getFutureManager(){ return DHT.futureManager; } + public static DatagramSocket getSocket(){ + return DHT.socket; + } + + public static Map getPeers(){ + return DHT.peers; + } + public static CompletableFuture ping(DatagramSocket socket,InetSocketAddress destination) throws IOException { byte[] rpcID = new byte[20]; new Random().nextBytes(rpcID); - DHT.Message pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,new byte[48],"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1))); + DHT.Message pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1))); return DHT.sendWithFuture(socket,destination,pingMessage); } - public static CompletableFuture findNode(DatagramSocket socket,InetSocketAddress destination) throws IOException{ + public static CompletableFuture findNode(DatagramSocket socket,InetSocketAddress destination,byte[] key) throws IOException{ byte[] rpcID = new byte[20]; new Random().nextBytes(rpcID); - DHT.Message findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,new byte[48],"findNode",Arrays.asList(new byte[48],Collections.singletonMap("protocolVersion",1))); + DHT.Message findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"findNode",Arrays.asList(key,Collections.singletonMap("protocolVersion",1))); + + return DHT.sendWithFuture(socket,destination,findNodeMessage); + } + + public static CompletableFuture findValue(DatagramSocket socket,InetSocketAddress destination,byte[] key) throws IOException{ + byte[] rpcID = new byte[20]; + new Random().nextBytes(rpcID); + + DHT.Message findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,DHT.NODE_ID,"findValue",Arrays.asList(key,Collections.singletonMap("protocolVersion",1))); return DHT.sendWithFuture(socket,destination,findNodeMessage); } @@ -40,7 +67,7 @@ public class DHT{ protected static CompletableFuture sendWithFuture(DatagramSocket socket,InetSocketAddress destination, DHT.Message message) throws IOException{ UDP.send(socket,new UDP.Packet(destination,message.toBencode())); RPCID key = new RPCID(message); - return DHT.futureManager.createFuture(key,5,TimeUnit.SECONDS); + return DHT.futureManager.createFuture(key,1,TimeUnit.SECONDS); } public static class Message

{ @@ -96,7 +123,7 @@ public class DHT{ return this.arguments; } - public byte[] toBencode(){ + public Map toMap(){ Map dictionary = new HashMap<>(); dictionary.put("0",this.type); dictionary.put("1",this.rpcID); @@ -107,7 +134,11 @@ public class DHT{ if(this.arguments!=null){ dictionary.put("4",this.arguments); } - return BencodeConverter.encode(dictionary); + return dictionary; + } + + public byte[] toBencode(){ + return BencodeConverter.encode(this.toMap()); } private DHT.Message

setFromBencode(byte[] data){ diff --git a/src/main/java/com/lbry/globe/util/TimeoutFutureManager.java b/src/main/java/com/lbry/globe/util/TimeoutFutureManager.java index 340d222..ec59ebc 100644 --- a/src/main/java/com/lbry/globe/util/TimeoutFutureManager.java +++ b/src/main/java/com/lbry/globe/util/TimeoutFutureManager.java @@ -1,6 +1,9 @@ package com.lbry.globe.util; +import java.util.List; import java.util.concurrent.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class TimeoutFutureManager{ @@ -33,4 +36,14 @@ public class TimeoutFutureManager{ } } + public static CompletableFuture> getBulk(CompletableFuture[] futures){ + return CompletableFuture.allOf(futures).exceptionally((t) -> null).thenApply((v) -> Stream.of(futures).map(future -> { + try{ + return future.join(); + }catch(Exception e){ + return null; + } + }).collect(Collectors.toList())); + } + } \ No newline at end of file