From 364866d9051a324334c74cd0e2b79dedfc88d7a2 Mon Sep 17 00:00:00 2001 From: Ben van Hartingsveldt Date: Sun, 13 Jul 2025 15:56:02 +0200 Subject: [PATCH] Improve pinging hubs --- src/main/java/com/lbry/globe/Main.java | 13 +-- .../globe/thread/DHTNodeFinderThread.java | 7 +- .../globe/thread/HubNodeFinderThread.java | 87 ++++++++++++------- src/main/java/com/lbry/globe/util/DHT.java | 4 +- 4 files changed, 70 insertions(+), 41 deletions(-) diff --git a/src/main/java/com/lbry/globe/Main.java b/src/main/java/com/lbry/globe/Main.java index 48af4f1..184f633 100644 --- a/src/main/java/com/lbry/globe/Main.java +++ b/src/main/java/com/lbry/globe/Main.java @@ -17,6 +17,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Arrays; import java.util.logging.Logger; @@ -35,7 +36,7 @@ public class Main implements Runnable{ @Override public void run(){ - EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + EventLoopGroup group = new MultiThreadIoEventLoopGroup(new DefaultThreadFactory("Netty Event Loop"),NioIoHandler.newFactory()); this.runTCPServerHTTP(group); } @@ -59,17 +60,17 @@ public class Main implements Runnable{ public static void main(String... args){ Main.LOGGER.info("Loading nodes cache"); - Runtime.getRuntime().addShutdownHook(new Thread(API::saveNodes)); + Runtime.getRuntime().addShutdownHook(new Thread(API::saveNodes,"Save Nodes")); API.loadNodes(); Main.LOGGER.info("Loading GeoIP cache"); - Runtime.getRuntime().addShutdownHook(new Thread(GeoIP::saveCache)); + Runtime.getRuntime().addShutdownHook(new Thread(GeoIP::saveCache,"Save Cache")); GeoIP.loadCache(); Main.LOGGER.info("Starting finder thread for blockchain nodes"); - new Thread(new BlockchainNodeFinderThread()).start(); + new Thread(new BlockchainNodeFinderThread(),"Block Node Finder").start(); Main.LOGGER.info("Starting finder thread for DHT nodes"); - new Thread(new DHTNodeFinderThread()).start(); + new DHTNodeFinderThread().run(); Main.LOGGER.info("Starting finder thread for hub nodes"); - new Thread(new HubNodeFinderThread()).start(); + new HubNodeFinderThread().run(); Main.LOGGER.info("Starting server"); new Main(args).run(); } diff --git a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java index f02c664..a338462 100644 --- a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java +++ b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java @@ -14,6 +14,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import io.netty.util.concurrent.DefaultThreadFactory; import org.json.JSONObject; public class DHTNodeFinderThread implements Runnable{ @@ -45,8 +46,8 @@ public class DHTNodeFinderThread implements Runnable{ } private void startSender(){ - Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() -> { - System.out.println("[BULK PING]"); + Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("DHT Sender")).scheduleWithFixedDelay(() -> { + System.out.println("[DHT] BULK PING"); API.saveNodes(); for(InetSocketAddress socketAddress : DHT.getPeers().keySet()){ String hostname = socketAddress.getHostName(); @@ -139,7 +140,7 @@ public class DHTNodeFinderThread implements Runnable{ e.printStackTrace(); } } - }).start(); + },"DHT Receiver").start(); } private void handleIncomingMessages(){ diff --git a/src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java b/src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java index 5ccd156..8e610b2 100644 --- a/src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java +++ b/src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java @@ -7,12 +7,20 @@ import com.lbry.globe.util.GeoIP; import java.io.InputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.Socket; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.ThreadPerTaskExecutor; import org.json.JSONArray; import org.json.JSONObject; +import javax.net.SocketFactory; + public class HubNodeFinderThread implements Runnable{ public static final String[] HUBS = { @@ -34,36 +42,59 @@ public class HubNodeFinderThread implements Runnable{ private static final Map LAST_SEEN = new TreeMap<>(Comparator.comparing(InetAddress::getHostAddress)); + private static final Map SOCKETS = new TreeMap<>(Comparator.comparing(InetAddress::getHostAddress)); + @Override - public void run() { - while(true){ + public void run(){ + Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("Hub Sender")).scheduleWithFixedDelay(() -> { + System.out.println("[HUB] BULK PING"); for(String hostname : HubNodeFinderThread.HUBS){ try{ for(InetAddress ip : InetAddress.getAllByName(hostname)){ - new Thread(() -> { - try{ - Socket s = new Socket(ip,50001); - JSONObject obj = new JSONObject(); - obj.put("id",new Random().nextInt()); - obj.put("method","server.banner"); - obj.put("params",new JSONArray()); - s.getOutputStream().write((obj+"\n").getBytes()); - s.getOutputStream().flush(); - InputStream in = s.getInputStream(); - StringBuilder sb = new StringBuilder(); - int b; - while((b = in.read())!='\n'){ - sb.append(new String(new byte[]{(byte) (b & 0xFF)})); + if(!HubNodeFinderThread.SOCKETS.containsKey(ip)){ + HubNodeFinderThread.SOCKETS.put(ip,new Socket()); + } + try{ + if(!HubNodeFinderThread.SOCKETS.get(ip).isConnected() || HubNodeFinderThread.SOCKETS.get(ip).isClosed()){ + if(HubNodeFinderThread.SOCKETS.get(ip).isClosed()){ + HubNodeFinderThread.SOCKETS.put(ip,new Socket()); } - in.close(); - JSONObject respObj = new JSONObject(sb.toString()); - boolean successful = respObj.has("result") && !respObj.isNull("result"); - if(successful){ - LAST_SEEN.put(ip,System.currentTimeMillis()); - } - }catch(Exception e){ + HubNodeFinderThread.SOCKETS.get(ip).connect(new InetSocketAddress(ip,50001),1000); } - }).start(); + }catch(Exception ignored){} + + Socket s = HubNodeFinderThread.SOCKETS.get(ip); + if(s==null || !s.isConnected() || s.isClosed()){ + continue; + } + System.out.println(" - [Hub] To: "+s.getRemoteSocketAddress()); + + JSONObject obj = new JSONObject(); + obj.put("id",new Random().nextInt()); + obj.put("method","server.banner"); + obj.put("params",new JSONArray()); + s.getOutputStream().write((obj+"\n").getBytes()); + s.getOutputStream().flush(); + } + for(InetAddress ip : InetAddress.getAllByName(hostname)){ + Socket s = HubNodeFinderThread.SOCKETS.get(ip); + if(s==null || !s.isConnected() || s.isClosed()){ + continue; + } + System.out.println(" - [Hub] From: "+s.getRemoteSocketAddress()); + + InputStream in = s.getInputStream(); + StringBuilder sb = new StringBuilder(); + int b; + while((b = in.read())!='\n'){ + sb.append(new String(new byte[]{(byte) (b & 0xFF)})); + } + in.close(); + JSONObject respObj = new JSONObject(sb.toString()); + boolean successful = respObj.has("result") && !respObj.isNull("result"); + if(successful){ + LAST_SEEN.put(ip,System.currentTimeMillis()); + } } }catch(Exception e){ e.printStackTrace(); @@ -123,13 +154,7 @@ public class HubNodeFinderThread implements Runnable{ } API.saveNodes(); - - try { - Thread.sleep(10_000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + },0,10,TimeUnit.SECONDS); } } diff --git a/src/main/java/com/lbry/globe/util/DHT.java b/src/main/java/com/lbry/globe/util/DHT.java index ff56529..f89faf4 100644 --- a/src/main/java/com/lbry/globe/util/DHT.java +++ b/src/main/java/com/lbry/globe/util/DHT.java @@ -1,5 +1,7 @@ package com.lbry.globe.util; +import io.netty.util.concurrent.DefaultThreadFactory; + import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; @@ -12,7 +14,7 @@ public class DHT{ public static byte[] NODE_ID = new byte[48]; - private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("Timeout Future")); private static final TimeoutFutureManager futureManager = new TimeoutFutureManager<>(executor); private static final Map peers = new ConcurrentHashMap<>(); private static final DatagramSocket socket;