diff --git a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java index 9e2343c..8d92837 100644 --- a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java +++ b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java @@ -1,11 +1,11 @@ package com.lbry.globe.thread; -import com.dampcake.bencode.Bencode; -import com.dampcake.bencode.Type; import com.lbry.globe.api.API; import com.lbry.globe.object.Node; import com.lbry.globe.object.Service; +import com.lbry.globe.util.DHT; import com.lbry.globe.util.GeoIP; +import com.lbry.globe.util.UDP; import java.io.IOException; import java.net.*; @@ -17,8 +17,6 @@ import org.json.JSONObject; public class DHTNodeFinderThread implements Runnable{ - private static final Bencode BENCODE = new Bencode(); - public static final String[] BOOTSTRAP = { "dht.lbry.grin.io:4444", // Grin "dht.lbry.madiator.com:4444", // Madiator @@ -43,7 +41,7 @@ public class DHTNodeFinderThread implements Runnable{ private final Map pingableDHTs = new ConcurrentHashMap<>(); - private final Queue incoming = new ConcurrentLinkedQueue<>(); + private final Queue incoming = new ConcurrentLinkedQueue<>(); @Override public void run(){ @@ -52,15 +50,22 @@ public class DHTNodeFinderThread implements Runnable{ this.pingableDHTs.put(new InetSocketAddress(uri.getHost(),uri.getPort()),true); } - // Ping Sender + this.startSender(); + this.startReceiver(); + this.handleIncomingMessages(); + } + + private void startSender(){ new Thread(() -> { while(true){ + System.out.println("[BULK PING]"); for(InetSocketAddress socketAddress : DHTNodeFinderThread.this.pingableDHTs.keySet()){ String hostname = socketAddress.getHostName(); int port = socketAddress.getPort(); try{ for(InetAddress ip : InetAddress.getAllByName(hostname)){ - DHTNodeFinderThread.ping(ip,port); + InetSocketAddress destination = new InetSocketAddress(ip,port); + this.doPing(destination); } }catch(Exception e){ e.printStackTrace(); @@ -71,147 +76,102 @@ public class DHTNodeFinderThread implements Runnable{ } catch (InterruptedException e) { throw new RuntimeException(e); } + API.saveNodes(); } }).start(); + } - // Receiver + private void doPing(InetSocketAddress destination) throws IOException{ + DHT.ping(DHTNodeFinderThread.SOCKET,destination).thenAccept((UDP.Packet packet) -> { + byte[] receivingBytes = packet.getData(); + DHT.Message message = DHT.Message.fromBencode(receivingBytes); + System.out.println(" - [Ping Response] "+message); + + try{ + this.doFindNode(packet.getAddress()); + }catch(Exception e){ + e.printStackTrace(); + } + + //TODO Improve updating pinged nodes. + + Node existingNode = API.NODES.get(packet.getAddress().getAddress()); + if(existingNode==null){ + JSONObject geoData = GeoIP.getCachedGeoIPInformation(packet.getDatagramPacket().getAddress()); + Double[] coords = GeoIP.getCoordinateFromLocation((geoData!=null && geoData.has("loc"))?geoData.getString("loc"):null); + existingNode = new Node(packet.getDatagramPacket().getAddress(),coords[0],coords[1]); + API.NODES.put(packet.getDatagramPacket().getAddress(),existingNode); + } + Service dhtService = null; + for(Service s : existingNode.getServices()){ + if(s.getPort()==packet.getDatagramPacket().getPort() && "dht".equals(s.getType())){ + dhtService = s; + break; + } + } + + if(dhtService==null){ + existingNode.getServices().add(new Service(UUID.randomUUID(),packet.getDatagramPacket().getPort(),"dht")); + }else{ + dhtService.updateLastSeen(); + } + }).exceptionally((Throwable e) -> null); + } + + private void doFindNode(InetSocketAddress destination) throws IOException{ + DHT.findNode(DHTNodeFinderThread.SOCKET,destination).thenAccept((UDP.Packet packet) -> { + byte[] receivingBytes = packet.getData(); + DHT.Message message = DHT.Message.fromBencode(receivingBytes); + System.out.println(" - [FindNode Response] "+message); + + List> nodes = (List>) message.getPayload(); + for(List n : nodes){ + String hostname = (String) n.get(1); + int port = (int) ((long) n.get(2)); + InetSocketAddress existingSocketAddr = null; + for(InetSocketAddress addr : this.pingableDHTs.keySet()){ + if(addr.getHostName().equals(hostname) && addr.getPort()==port){ + existingSocketAddr = addr; + } + } + if(existingSocketAddr==null){ + this.pingableDHTs.put(new InetSocketAddress(hostname,port),false); + } + } + }).exceptionally((Throwable e) -> null); + } + + private void startReceiver(){ new Thread(() -> { while(true) { try { - byte[] buffer = new byte[1024]; - DatagramPacket receiverPacket = new DatagramPacket(buffer, buffer.length); - DHTNodeFinderThread.SOCKET.receive(receiverPacket); + UDP.Packet receiverPacket = UDP.receive(DHTNodeFinderThread.SOCKET); DHTNodeFinderThread.this.incoming.add(receiverPacket); + + byte[] receivingBytes = receiverPacket.getData(); + + DHT.Message message = DHT.Message.fromBencode(receivingBytes); + DHT.RPCID rpcid = new DHT.RPCID(message); + DHT.getFutureManager().finishFuture(rpcid,receiverPacket); } catch (IOException e) { e.printStackTrace(); } } }).start(); + } - while(true){ - - //TODO: MARKS AS DELETED - + private void handleIncomingMessages(){ + while(DHTNodeFinderThread.SOCKET.isBound()){ while(this.incoming.peek()!=null){ - DatagramPacket receiverPacket = this.incoming.poll(); + UDP.Packet receiverPacket = this.incoming.poll(); byte[] receivingBytes = receiverPacket.getData(); - Map receivingDictionary = DHTNodeFinderThread.decodePacket(receivingBytes); - if(receivingDictionary.get("0").equals(1L)){ - if(receivingDictionary.get("3").equals("pong")){ - try{ - DHTNodeFinderThread.findNode(receiverPacket.getAddress(),receiverPacket.getPort()); - }catch(Exception e){ - e.printStackTrace(); - } - //TODO Improve updating pinged nodes. - System.out.println("PONG: "+receiverPacket.getSocketAddress()); - - Node existingNode = API.NODES.get(receiverPacket.getAddress()); - if(existingNode==null){ - JSONObject geoData = GeoIP.getCachedGeoIPInformation(receiverPacket.getAddress()); - Double[] coords = GeoIP.getCoordinateFromLocation((geoData!=null && geoData.has("loc"))?geoData.getString("loc"):null); - existingNode = new Node(receiverPacket.getAddress(),coords[0],coords[1]); - API.NODES.put(receiverPacket.getAddress(),existingNode); - } - Service dhtService = null; - for(Service s : existingNode.getServices()){ - if(s.getPort()==receiverPacket.getPort() && "dht".equals(s.getType())){ - dhtService = s; - break; - } - } - - if(dhtService==null){ - existingNode.getServices().add(new Service(UUID.randomUUID(),receiverPacket.getPort(),"dht")); - }else{ - dhtService.updateLastSeen(); - } - }else{ - //TODO Save connections too - List> nodes = (List>) receivingDictionary.get("3"); - for(List n : nodes){ - String hostname = (String) n.get(1); - int port = (int) ((long) n.get(2)); - InetSocketAddress existingSocketAddr = null; - for(InetSocketAddress addr : this.pingableDHTs.keySet()){ - if(addr.getHostName().equals(hostname) && addr.getPort()==port){ - existingSocketAddr = addr; - } - } - if(existingSocketAddr==null){ - this.pingableDHTs.put(new InetSocketAddress(hostname,port),false); - } - } - } + DHT.Message message = DHT.Message.fromBencode(receivingBytes); + if(message.getType()==DHT.Message.TYPE_REQUEST){ + System.out.println("Incoming request"); } } - - API.saveNodes(); - //TODO: REMOVE MARKED AS DELETED - - System.out.println("----"); - try { - Thread.sleep(1_000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } } } - private static void ping(InetAddress ip,int port) throws IOException{ - byte[] rpcID = new byte[20]; - new Random().nextBytes(rpcID); - - Map ping = new HashMap<>(); - ping.put("0",0); - ping.put("1",rpcID); - ping.put("2",new byte[48]); - ping.put("3","ping"); - ping.put("4",Collections.singletonList(Collections.singletonMap("protocolVersion",1))); - byte[] pingBytes = DHTNodeFinderThread.encodePacket(ping); - - DatagramPacket sendingDiagram = new DatagramPacket(pingBytes,pingBytes.length,ip,port); - DHTNodeFinderThread.SOCKET.send(sendingDiagram); - } - - private static void findNode(InetAddress ip,int port) throws IOException{ - byte[] rpcID = new byte[20]; - new Random().nextBytes(rpcID); - - Map findNode = new HashMap<>(); - findNode.put("0",0); - findNode.put("1",rpcID); - findNode.put("2",new byte[48]); - findNode.put("3","findNode"); - findNode.put("4",Arrays.asList(new byte[48],Collections.singletonMap("protocolVersion",1))); - byte[] findNodeBytes = DHTNodeFinderThread.encodePacket(findNode); - - DatagramPacket sendingDiagram = new DatagramPacket(findNodeBytes,findNodeBytes.length,ip,port); - DHTNodeFinderThread.SOCKET.send(sendingDiagram); - } - - private static byte[] encodePacket(Map map){ - return DHTNodeFinderThread.BENCODE.encode(map); - } - - private static Map decodePacket(byte[] bytes){ - // Fix invalid B-encoding - if(bytes[0]=='d'){ - bytes[0] = 'l'; - } - List list = DHTNodeFinderThread.BENCODE.decode(bytes,Type.LIST); - for(int i=0;i map){ + return BencodeConverter.BENCODE.encode(map); + } + + public static Map decode(byte[] bytes){ + // Fix invalid B-encoding + if(bytes[0]=='d'){ + bytes[0] = 'l'; + } + List list = BencodeConverter.BENCODE.decode(bytes,Type.LIST); + for(int i=0;i V walkAndConvertByteBufferToByteArrayOrString(Object value){ + if(value instanceof ByteBuffer){ + ByteBuffer bb = (ByteBuffer) value; + byte[] ba = bb.array(); + boolean hasControlOrNonASCII = false; + for(byte b : ba){ + int bv = b & 0xFF; + if(bv<0x20 || bv>=0x7F){ + hasControlOrNonASCII = true; + break; + } + } + if(hasControlOrNonASCII){ + return (V) ba; + } + return (V) new String(ba); + } + if(value instanceof List){ + List l = (List) value; + l.replaceAll(BencodeConverter::walkAndConvertByteBufferToByteArrayOrString); + } + if(value instanceof Map){ + Map m = (Map) value; + m.replaceAll((k,v) -> BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(v)); + } + return (V) value; + } + +} diff --git a/src/main/java/com/lbry/globe/util/DHT.java b/src/main/java/com/lbry/globe/util/DHT.java new file mode 100644 index 0000000..21fee8a --- /dev/null +++ b/src/main/java/com/lbry/globe/util/DHT.java @@ -0,0 +1,177 @@ +package com.lbry.globe.util; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class DHT{ + + private static final ScheduledExecutorService ses = Executors.newScheduledThreadPool(12); + private static final TimeoutFutureManager futureManager = new TimeoutFutureManager<>(ses); + + public static TimeoutFutureManager getFutureManager(){ + return DHT.futureManager; + } + + public static CompletableFuture ping(DatagramSocket socket,InetSocketAddress destination) throws IOException { + byte[] rpcID = new byte[20]; + new Random().nextBytes(rpcID); + + DHT.Message pingMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,new byte[48],"ping",Collections.singletonList(Collections.singletonMap("protocolVersion",1))); + + return DHT.sendWithFuture(socket,destination,pingMessage); + } + + public static CompletableFuture findNode(DatagramSocket socket,InetSocketAddress destination) throws IOException{ + byte[] rpcID = new byte[20]; + new Random().nextBytes(rpcID); + + DHT.Message findNodeMessage = new DHT.Message<>(DHT.Message.TYPE_REQUEST,rpcID,new byte[48],"findNode",Arrays.asList(new byte[48],Collections.singletonMap("protocolVersion",1))); + + return DHT.sendWithFuture(socket,destination,findNodeMessage); + } + + protected static CompletableFuture sendWithFuture(DatagramSocket socket,InetSocketAddress destination, DHT.Message message) throws IOException{ + UDP.send(socket,new UDP.Packet(destination,message.toBencode())); + RPCID key = new RPCID(message); + return DHT.futureManager.createFuture(key,5,TimeUnit.SECONDS); + } + + public static class Message

{ + + public static final int TYPE_REQUEST = 0; + public static final int TYPE_RESPONSE = 1; + + private int type; + + private byte[] rpcID; + + private byte[] nodeID; + + private P payload; + + private List arguments; + + private Message(){} + + public Message(int type,byte[] rpcID,byte[] nodeID){ + this(type,rpcID,nodeID,null); + } + + public Message(int type,byte[] rpcID,byte[] nodeID,P payload){ + this(type,rpcID,nodeID,payload,null); + } + + public Message(int type,byte[] rpcID,byte[] nodeID,P payload,List arguments){ + this.type = type; + this.rpcID = rpcID; + this.nodeID = nodeID; + this.payload = payload; + this.arguments = arguments; + } + + public int getType(){ + return this.type; + } + + public byte[] getRPCID(){ + return this.rpcID; + } + + public byte[] getNodeID(){ + return this.nodeID; + } + + public P getPayload(){ + return this.payload; + } + + public List getArguments(){ + return this.arguments; + } + + public byte[] toBencode(){ + Map dictionary = new HashMap<>(); + dictionary.put("0",this.type); + dictionary.put("1",this.rpcID); + dictionary.put("2",this.nodeID); + if(this.payload!=null){ + dictionary.put("3",this.payload); + } + if(this.arguments!=null){ + dictionary.put("4",this.arguments); + } + return BencodeConverter.encode(dictionary); + } + + private DHT.Message

setFromBencode(byte[] data){ + Map dictionary = BencodeConverter.decode(data); + this.type = ((Long) dictionary.get("0")).intValue(); + this.rpcID = ((ByteBuffer) dictionary.get("1")).array(); + this.nodeID = ((ByteBuffer) dictionary.get("2")).array(); + this.payload = null; + if(dictionary.containsKey("3")){ + Object payload = dictionary.get("3"); + this.payload = BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(payload); + } + this.arguments = null; + if(dictionary.containsKey("4")){ + this.arguments = (List) dictionary.get("4"); + } + return this; + } + + @Override + public String toString() { + return "Message{" + + "type=" + type + + ", rpcID=" + Hex.encode(rpcID) + + ", nodeID=" + Hex.encode(nodeID) + + ", payload=" + payload + + ", arguments=" + arguments + + '}'; + } + + public static DHT.Message fromBencode(byte[] data){ + return new Message<>().setFromBencode(data); + } + + } + + public static class RPCID{ + + private final DHT.Message message; + + public RPCID(DHT.Message message){ + this.message = message; + } + + @Override + public boolean equals(Object obj){ + if(obj instanceof RPCID){ + RPCID other = (RPCID) obj; + return Arrays.equals(this.message.rpcID,other.message.rpcID);// && Arrays.equals(this.message.nodeID,other.nodeID); + } + return super.equals(obj); + } + + @Override + public int hashCode(){ + return -1; + } + + @Override + public String toString() { + return "RPCID{" + + "rpcID=" + Hex.encode(this.message.rpcID) + + '}'; + } + } + +} diff --git a/src/main/java/com/lbry/globe/util/Hex.java b/src/main/java/com/lbry/globe/util/Hex.java new file mode 100644 index 0000000..6de29fe --- /dev/null +++ b/src/main/java/com/lbry/globe/util/Hex.java @@ -0,0 +1,27 @@ +package com.lbry.globe.util; + +public final class Hex{ + + private static final char[] CHARS = "0123456789ABCDEF".toCharArray(); + + public static byte[] decode(String s){ + int len = s.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + + Character.digit(s.charAt(i+1), 16)); + } + return data; + } + + public static String encode(byte[] data){ + char[] hexChars = new char[data.length * 2]; + for (int j = 0; j < data.length; j++) { + int v = data[j] & 0xFF; + hexChars[j * 2] = CHARS[v >>> 4]; + hexChars[j * 2 + 1] = CHARS[v & 0x0F]; + } + return new String(hexChars); + } + +} \ No newline at end of file diff --git a/src/main/java/com/lbry/globe/util/TimeoutFutureManager.java b/src/main/java/com/lbry/globe/util/TimeoutFutureManager.java new file mode 100644 index 0000000..340d222 --- /dev/null +++ b/src/main/java/com/lbry/globe/util/TimeoutFutureManager.java @@ -0,0 +1,36 @@ +package com.lbry.globe.util; + +import java.util.concurrent.*; + +public class TimeoutFutureManager{ + + private final ScheduledExecutorService executorService; + + private final ConcurrentHashMap> futures = new ConcurrentHashMap<>(); + + public TimeoutFutureManager(ScheduledExecutorService executorService){ + this.executorService = executorService; + } + + public CompletableFuture createFuture(K key,long delay,TimeUnit unit){ + CompletableFuture future = new CompletableFuture<>(); + this.futures.put(key,future); + + executorService.schedule(() -> { + if(!future.isDone()){ + this.futures.remove(key,future); + future.completeExceptionally(new TimeoutException()); + } + },delay,unit); + + return future; + } + + public void finishFuture(K key,V value){ + if(this.futures.containsKey(key)){ + this.futures.get(key).complete(value); + this.futures.remove(key); + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/lbry/globe/util/UDP.java b/src/main/java/com/lbry/globe/util/UDP.java new file mode 100644 index 0000000..59fecf8 --- /dev/null +++ b/src/main/java/com/lbry/globe/util/UDP.java @@ -0,0 +1,50 @@ +package com.lbry.globe.util; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; + +public class UDP{ + + private static final int BUFFER_SIZE = 4096; + + public static void send(DatagramSocket socket,Packet packet) throws IOException{ + socket.send(packet.getDatagramPacket()); + } + + public static Packet receive(DatagramSocket socket) throws IOException{ + Packet packet = new Packet(new DatagramPacket(new byte[UDP.BUFFER_SIZE],UDP.BUFFER_SIZE)); + socket.receive(packet.getDatagramPacket()); + return packet; + } + + public static class Packet{ + + private final DatagramPacket packet; + + protected Packet(DatagramPacket packet){ + this.packet = packet; + } + + public Packet(InetSocketAddress address,byte[] data){ + this.packet = new DatagramPacket(data,data.length,address); + } + + public InetSocketAddress getAddress(){ + return (InetSocketAddress) this.packet.getSocketAddress(); + } + + public byte[] getData(){ + byte[] data = new byte[this.packet.getLength()]; + System.arraycopy(this.packet.getData(),0,data,0,data.length); + return data; + } + + public DatagramPacket getDatagramPacket(){ + return this.packet; + } + + } + +} \ No newline at end of file