mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-08-23 17:47:31 +00:00
cleanup daemon threads
This commit is contained in:
parent
44072a4f48
commit
f7280e4637
3 changed files with 62 additions and 52 deletions
|
@ -59,6 +59,7 @@ def get_daemon(config, start_daemon=True):
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class ClientThread(threading.Thread):
|
class ClientThread(threading.Thread):
|
||||||
|
|
||||||
def __init__(self, server, s):
|
def __init__(self, server, s):
|
||||||
|
@ -104,14 +105,15 @@ class ClientThread(threading.Thread):
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class NetworkServer:
|
class NetworkServer(threading.Thread):
|
||||||
|
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.daemon = True
|
||||||
self.config = config
|
self.config = config
|
||||||
self.network = Network(config)
|
self.network = Network(config)
|
||||||
# network sends responses on that queue
|
# network sends responses on that queue
|
||||||
self.network_queue = Queue.Queue()
|
self.network_queue = Queue.Queue()
|
||||||
self.network.start(self.network_queue)
|
|
||||||
|
|
||||||
self.running = False
|
self.running = False
|
||||||
# daemon terminates after period of inactivity
|
# daemon terminates after period of inactivity
|
||||||
|
@ -127,10 +129,13 @@ class NetworkServer:
|
||||||
return self.running
|
return self.running
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.network.stop()
|
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.running = True
|
||||||
|
threading.Thread.start(self)
|
||||||
|
|
||||||
def add_client(self, client):
|
def add_client(self, client):
|
||||||
for key in ['status','banner','updated','servers','interfaces']:
|
for key in ['status','banner','updated','servers','interfaces']:
|
||||||
value = self.network.get_status_value(key)
|
value = self.network.get_status_value(key)
|
||||||
|
@ -138,17 +143,13 @@ class NetworkServer:
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.clients.append(client)
|
self.clients.append(client)
|
||||||
|
|
||||||
|
|
||||||
def remove_client(self, client):
|
def remove_client(self, client):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.clients.remove(client)
|
self.clients.remove(client)
|
||||||
print_error("client quit:", len(self.clients))
|
print_error("client quit:", len(self.clients))
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.network.start(self.network_queue)
|
||||||
def main_loop(self):
|
|
||||||
self.running = True
|
|
||||||
threading.Thread(target=self.listen_thread).start()
|
|
||||||
while self.is_running():
|
while self.is_running():
|
||||||
try:
|
try:
|
||||||
response = self.network_queue.get(timeout=0.1)
|
response = self.network_queue.get(timeout=0.1)
|
||||||
|
@ -157,32 +158,35 @@ class NetworkServer:
|
||||||
for client in self.clients:
|
for client in self.clients:
|
||||||
client.daemon_pipe.get_queue.put(response)
|
client.daemon_pipe.get_queue.put(response)
|
||||||
|
|
||||||
print_error("Daemon exiting")
|
self.network.stop()
|
||||||
|
print_error("server exiting")
|
||||||
|
|
||||||
def listen_thread(self):
|
|
||||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
def daemon_loop(server):
|
||||||
self.daemon_port = self.config.get('daemon_port', DAEMON_PORT)
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
self.socket.bind(('', self.daemon_port))
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
self.socket.listen(5)
|
daemon_port = server.config.get('daemon_port', DAEMON_PORT)
|
||||||
self.socket.settimeout(1)
|
s.bind(('', daemon_port))
|
||||||
|
s.listen(5)
|
||||||
|
s.settimeout(1)
|
||||||
|
t = time.time()
|
||||||
|
while server.running:
|
||||||
|
try:
|
||||||
|
connection, address = s.accept()
|
||||||
|
except socket.timeout:
|
||||||
|
if not server.clients:
|
||||||
|
if time.time() - t > server.timeout:
|
||||||
|
print_error("Daemon timeout")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
t = time.time()
|
||||||
|
continue
|
||||||
t = time.time()
|
t = time.time()
|
||||||
while self.running:
|
client = ClientThread(server, connection)
|
||||||
try:
|
client.start()
|
||||||
connection, address = self.socket.accept()
|
server.stop()
|
||||||
except socket.timeout:
|
print_error("Daemon exiting")
|
||||||
if not self.clients:
|
|
||||||
if time.time() - t > self.timeout:
|
|
||||||
print_error("Daemon timeout")
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
t = time.time()
|
|
||||||
continue
|
|
||||||
t = time.time()
|
|
||||||
client = ClientThread(self, connection)
|
|
||||||
client.start()
|
|
||||||
self.stop()
|
|
||||||
print_error("listen thread exiting")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -190,8 +194,10 @@ if __name__ == '__main__':
|
||||||
config = simple_config.SimpleConfig()
|
config = simple_config.SimpleConfig()
|
||||||
util.set_verbosity(True)
|
util.set_verbosity(True)
|
||||||
server = NetworkServer(config)
|
server = NetworkServer(config)
|
||||||
|
server.start()
|
||||||
try:
|
try:
|
||||||
server.main_loop()
|
daemon_loop(server)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print "Ctrl C - Stopping server"
|
print "Ctrl C - Stopping server"
|
||||||
|
server.stop()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
|
@ -2,10 +2,10 @@
|
||||||
|
|
||||||
# A simple script that connects to a server and displays block headers
|
# A simple script that connects to a server and displays block headers
|
||||||
|
|
||||||
import time, electrum
|
import time
|
||||||
|
import electrum
|
||||||
|
|
||||||
# start network
|
# start network
|
||||||
|
|
||||||
c = electrum.SimpleConfig()
|
c = electrum.SimpleConfig()
|
||||||
s = electrum.daemon.get_daemon(c,True)
|
s = electrum.daemon.get_daemon(c,True)
|
||||||
network = electrum.NetworkProxy(s,c)
|
network = electrum.NetworkProxy(s,c)
|
||||||
|
@ -20,14 +20,10 @@ if not network.is_connected():
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# 2. send the subscription
|
# 2. send the subscription
|
||||||
callback = lambda result: electrum.print_json(result.get('result'))
|
callback = lambda response: electrum.print_json(response.get('result'))
|
||||||
network.send([('blockchain.headers.subscribe',[])], callback)
|
network.send([('blockchain.headers.subscribe',[])], callback)
|
||||||
|
|
||||||
# 3. wait for results
|
# 3. wait for results
|
||||||
while network.is_connected():
|
while network.is_connected():
|
||||||
try:
|
time.sleep(1)
|
||||||
time.sleep(1)
|
|
||||||
except:
|
|
||||||
break
|
|
||||||
|
|
||||||
network.stop()
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
import sys, time, electrum
|
import sys
|
||||||
|
import time
|
||||||
|
import electrum
|
||||||
|
|
||||||
try:
|
try:
|
||||||
addr = sys.argv[1]
|
addr = sys.argv[1]
|
||||||
|
@ -8,19 +10,25 @@ except Exception:
|
||||||
print "usage: watch_address <bitcoin_address>"
|
print "usage: watch_address <bitcoin_address>"
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
# start network
|
||||||
|
c = electrum.SimpleConfig()
|
||||||
|
s = electrum.daemon.get_daemon(c,True)
|
||||||
|
network = electrum.NetworkProxy(s,c)
|
||||||
|
network.start()
|
||||||
|
|
||||||
# 1. start the interface and wait for connection
|
# wait until connected
|
||||||
interface = electrum.Interface('electrum.no-ip.org:50002:s')
|
while network.is_connecting():
|
||||||
interface.start(wait = True)
|
time.sleep(0.1)
|
||||||
if not interface.is_connected:
|
|
||||||
print "not connected"
|
if not network.is_connected():
|
||||||
exit()
|
print_msg("daemon is not connected")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
# 2. send the subscription
|
# 2. send the subscription
|
||||||
callback = lambda _,result: electrum.print_json(result.get('result'))
|
callback = lambda response: electrum.print_json(response.get('result'))
|
||||||
interface.send([('blockchain.address.subscribe',[addr])], callback)
|
network.send([('blockchain.address.subscribe',[addr])], callback)
|
||||||
|
|
||||||
# 3. wait for results
|
# 3. wait for results
|
||||||
while True:
|
while network.is_connected():
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue