diff --git a/src/main/java/com/lbry/globe/Main.java b/src/main/java/com/lbry/globe/Main.java index 184f633..0531178 100644 --- a/src/main/java/com/lbry/globe/Main.java +++ b/src/main/java/com/lbry/globe/Main.java @@ -7,6 +7,7 @@ import com.lbry.globe.thread.BlockchainNodeFinderThread; import com.lbry.globe.thread.DHTNodeFinderThread; import com.lbry.globe.thread.HubNodeFinderThread; import com.lbry.globe.util.GeoIP; +import com.lbry.globe.util.NamedThreadFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; @@ -17,7 +18,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; -import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Arrays; import java.util.logging.Logger; @@ -36,7 +36,7 @@ public class Main implements Runnable{ @Override public void run(){ - EventLoopGroup group = new MultiThreadIoEventLoopGroup(new DefaultThreadFactory("Netty Event Loop"),NioIoHandler.newFactory()); + EventLoopGroup group = new MultiThreadIoEventLoopGroup(new NamedThreadFactory("Netty Event Loop"),NioIoHandler.newFactory()); this.runTCPServerHTTP(group); } @@ -66,7 +66,7 @@ public class Main implements Runnable{ Runtime.getRuntime().addShutdownHook(new Thread(GeoIP::saveCache,"Save Cache")); GeoIP.loadCache(); Main.LOGGER.info("Starting finder thread for blockchain nodes"); - new Thread(new BlockchainNodeFinderThread(),"Block Node Finder").start(); + new BlockchainNodeFinderThread().run(); Main.LOGGER.info("Starting finder thread for DHT nodes"); new DHTNodeFinderThread().run(); Main.LOGGER.info("Starting finder thread for hub nodes"); diff --git a/src/main/java/com/lbry/globe/thread/BlockchainNodeFinderThread.java b/src/main/java/com/lbry/globe/thread/BlockchainNodeFinderThread.java index bba189b..51467f7 100644 --- a/src/main/java/com/lbry/globe/thread/BlockchainNodeFinderThread.java +++ b/src/main/java/com/lbry/globe/thread/BlockchainNodeFinderThread.java @@ -13,7 +13,10 @@ import java.net.HttpURLConnection; import java.net.InetAddress; import java.net.URI; import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import com.lbry.globe.util.NamedThreadFactory; import org.json.JSONArray; import org.json.JSONObject; @@ -23,7 +26,8 @@ public class BlockchainNodeFinderThread implements Runnable{ public void run(){ String rpcURL = Environment.getVariable("BLOCKCHAIN_RPC_URL"); if(rpcURL!=null){ - while(true){ + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Blockchain Detector")).scheduleWithFixedDelay(() -> { + System.out.println("[BLOCKCHAIN] BULK PING"); try{ HttpURLConnection conn = (HttpURLConnection) new URI(rpcURL).toURL().openConnection(); conn.setDoOutput(true); @@ -45,12 +49,8 @@ public class BlockchainNodeFinderThread implements Runnable{ }catch(Exception e){ e.printStackTrace(); } - try { - Thread.sleep(10_000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + },0,10,TimeUnit.SECONDS); + } } diff --git a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java index feee06f..23085c8 100644 --- a/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java +++ b/src/main/java/com/lbry/globe/thread/DHTNodeFinderThread.java @@ -5,6 +5,7 @@ import com.lbry.globe.object.Node; import com.lbry.globe.object.Service; import com.lbry.globe.util.DHT; import com.lbry.globe.util.GeoIP; +import com.lbry.globe.util.NamedThreadFactory; import com.lbry.globe.util.UDP; import java.io.IOException; @@ -14,7 +15,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import io.netty.util.concurrent.DefaultThreadFactory; import org.json.JSONObject; public class DHTNodeFinderThread implements Runnable{ @@ -46,7 +46,7 @@ public class DHTNodeFinderThread implements Runnable{ } private void startSender(){ - Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("DHT Sender")).scheduleWithFixedDelay(() -> { + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DHT Sender")).scheduleWithFixedDelay(() -> { System.out.println("[DHT] BULK PING"); API.saveNodes(); for(InetSocketAddress socketAddress : DHT.getPeers().keySet()){ @@ -126,7 +126,7 @@ public class DHTNodeFinderThread implements Runnable{ private void startReceiver(){ new Thread(() -> { - while(true) { + while(DHT.getSocket().isBound()) { try { UDP.Packet receiverPacket = UDP.receive(DHT.getSocket()); DHTNodeFinderThread.this.incoming.add(receiverPacket); diff --git a/src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java b/src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java index 8e610b2..1c8c863 100644 --- a/src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java +++ b/src/main/java/com/lbry/globe/thread/HubNodeFinderThread.java @@ -4,23 +4,19 @@ import com.lbry.globe.api.API; import com.lbry.globe.object.Node; import com.lbry.globe.object.Service; import com.lbry.globe.util.GeoIP; +import com.lbry.globe.util.NamedThreadFactory; import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.util.*; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.ThreadPerTaskExecutor; import org.json.JSONArray; import org.json.JSONObject; -import javax.net.SocketFactory; - public class HubNodeFinderThread implements Runnable{ public static final String[] HUBS = { @@ -46,11 +42,17 @@ public class HubNodeFinderThread implements Runnable{ @Override public void run(){ - Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("Hub Sender")).scheduleWithFixedDelay(() -> { + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Hub Sender")).scheduleWithFixedDelay(() -> { System.out.println("[HUB] BULK PING"); for(String hostname : HubNodeFinderThread.HUBS){ + InetAddress[] ips = null; try{ - for(InetAddress ip : InetAddress.getAllByName(hostname)){ + ips = InetAddress.getAllByName(hostname); + }catch(Exception e){ + e.printStackTrace(); + } + if(ips!=null){ + for(InetAddress ip : ips){ if(!HubNodeFinderThread.SOCKETS.containsKey(ip)){ HubNodeFinderThread.SOCKETS.put(ip,new Socket()); } @@ -69,35 +71,41 @@ public class HubNodeFinderThread implements Runnable{ } System.out.println(" - [Hub] To: "+s.getRemoteSocketAddress()); - JSONObject obj = new JSONObject(); - obj.put("id",new Random().nextInt()); - obj.put("method","server.banner"); - obj.put("params",new JSONArray()); - s.getOutputStream().write((obj+"\n").getBytes()); - s.getOutputStream().flush(); + try{ + JSONObject obj = new JSONObject(); + obj.put("id",new Random().nextInt()); + obj.put("method","server.banner"); + obj.put("params",new JSONArray()); + s.getOutputStream().write((obj+"\n").getBytes()); + s.getOutputStream().flush(); + }catch(Exception e){ + e.printStackTrace(); + } } - for(InetAddress ip : InetAddress.getAllByName(hostname)){ + for(InetAddress ip : ips){ Socket s = HubNodeFinderThread.SOCKETS.get(ip); if(s==null || !s.isConnected() || s.isClosed()){ continue; } System.out.println(" - [Hub] From: "+s.getRemoteSocketAddress()); - InputStream in = s.getInputStream(); - StringBuilder sb = new StringBuilder(); - int b; - while((b = in.read())!='\n'){ - sb.append(new String(new byte[]{(byte) (b & 0xFF)})); - } - in.close(); - JSONObject respObj = new JSONObject(sb.toString()); - boolean successful = respObj.has("result") && !respObj.isNull("result"); - if(successful){ - LAST_SEEN.put(ip,System.currentTimeMillis()); + try{ + InputStream in = s.getInputStream(); + StringBuilder sb = new StringBuilder(); + int b; + while((b = in.read())!='\n'){ + sb.append(new String(new byte[]{(byte) (b & 0xFF)})); + } + in.close(); + JSONObject respObj = new JSONObject(sb.toString()); + boolean successful = respObj.has("result") && !respObj.isNull("result"); + if(successful){ + LAST_SEEN.put(ip,System.currentTimeMillis()); + } + }catch(Exception e){ + e.printStackTrace(); } } - }catch(Exception e){ - e.printStackTrace(); } } diff --git a/src/main/java/com/lbry/globe/util/DHT.java b/src/main/java/com/lbry/globe/util/DHT.java index f89faf4..77ae2fe 100644 --- a/src/main/java/com/lbry/globe/util/DHT.java +++ b/src/main/java/com/lbry/globe/util/DHT.java @@ -1,7 +1,5 @@ package com.lbry.globe.util; -import io.netty.util.concurrent.DefaultThreadFactory; - import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; @@ -14,7 +12,7 @@ public class DHT{ public static byte[] NODE_ID = new byte[48]; - private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("Timeout Future")); + private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Timeout Future")); private static final TimeoutFutureManager futureManager = new TimeoutFutureManager<>(executor); private static final Map peers = new ConcurrentHashMap<>(); private static final DatagramSocket socket; diff --git a/src/main/java/com/lbry/globe/util/NamedThreadFactory.java b/src/main/java/com/lbry/globe/util/NamedThreadFactory.java new file mode 100644 index 0000000..c95f97e --- /dev/null +++ b/src/main/java/com/lbry/globe/util/NamedThreadFactory.java @@ -0,0 +1,18 @@ +package com.lbry.globe.util; + +import java.util.concurrent.ThreadFactory; + +public class NamedThreadFactory implements ThreadFactory{ + + private final String name; + + public NamedThreadFactory(String name){ + this.name = name; + } + + @Override + public Thread newThread(Runnable r){ + return new Thread(r,this.name); + } + +}