mirror of
https://github.com/LBRYFoundation/LBRY-Vault.git
synced 2025-09-01 09:45:18 +00:00
separate interface classes
This commit is contained in:
parent
ed8f9666df
commit
44072a4f48
2 changed files with 68 additions and 78 deletions
139
lib/interface.py
139
lib/interface.py
|
@ -47,56 +47,49 @@ def cert_verify_hostname(s):
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Interface(threading.Thread):
|
def Interface(server, config = None):
|
||||||
|
host, port, protocol = server.split(':')
|
||||||
|
port = int(port)
|
||||||
|
if protocol in 'st':
|
||||||
|
return TcpInterface(server, config)
|
||||||
|
elif protocol in 'hg':
|
||||||
|
return HttpInterface(server, config)
|
||||||
|
else:
|
||||||
|
raise Exception('Unknown protocol: %s'%protocol)
|
||||||
|
|
||||||
|
class TcpInterface(threading.Thread):
|
||||||
|
|
||||||
def __init__(self, server, config = None):
|
def __init__(self, server, config = None):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.daemon = True
|
self.daemon = True
|
||||||
self.config = config if config is not None else SimpleConfig()
|
self.config = config if config is not None else SimpleConfig()
|
||||||
self.connect_event = threading.Event()
|
|
||||||
self.lock = threading.Lock()
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
self.rtime = 0
|
|
||||||
self.bytes_received = 0
|
|
||||||
self.is_connected = False
|
self.is_connected = False
|
||||||
self.poll_interval = 1
|
|
||||||
|
|
||||||
self.debug = False # dump network messages. can be changed at runtime using the console
|
self.debug = False # dump network messages. can be changed at runtime using the console
|
||||||
self.message_id = 0
|
self.message_id = 0
|
||||||
self.unanswered_requests = {}
|
self.unanswered_requests = {}
|
||||||
|
|
||||||
# parse server
|
# parse server
|
||||||
self.server = server
|
self.server = server
|
||||||
try:
|
self.host, self.port, self.protocol = self.server.split(':')
|
||||||
host, port, protocol = self.server.split(':')
|
self.port = int(self.port)
|
||||||
port = int(port)
|
self.use_ssl = (self.protocol == 's')
|
||||||
except Exception:
|
|
||||||
self.server = None
|
|
||||||
return
|
|
||||||
|
|
||||||
if protocol not in 'ghst':
|
|
||||||
raise Exception('Unknown protocol: %s'%protocol)
|
|
||||||
|
|
||||||
self.host = host
|
|
||||||
self.port = port
|
|
||||||
self.protocol = protocol
|
|
||||||
self.use_ssl = ( protocol in 'sg' )
|
|
||||||
self.proxy = self.parse_proxy_options(self.config.get('proxy'))
|
self.proxy = self.parse_proxy_options(self.config.get('proxy'))
|
||||||
if self.proxy:
|
if self.proxy:
|
||||||
self.proxy_mode = proxy_modes.index(self.proxy["mode"]) + 1
|
self.proxy_mode = proxy_modes.index(self.proxy["mode"]) + 1
|
||||||
|
|
||||||
|
|
||||||
def process_response(self, c):
|
def process_response(self, response):
|
||||||
if self.debug:
|
if self.debug:
|
||||||
print_error( "<--",c )
|
print_error("<--", response)
|
||||||
|
|
||||||
msg_id = c.get('id')
|
msg_id = response.get('id')
|
||||||
error = c.get('error')
|
error = response.get('error')
|
||||||
result = c.get('result')
|
result = response.get('result')
|
||||||
|
|
||||||
if error:
|
if error:
|
||||||
print_error("received error:", c)
|
print_error("received error:", response)
|
||||||
#queue.put((self,{'method':method, 'params':params, 'error':error, 'id':_id}))
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if msg_id is not None:
|
if msg_id is not None:
|
||||||
|
@ -104,31 +97,28 @@ class Interface(threading.Thread):
|
||||||
method, params, _id, queue = self.unanswered_requests.pop(msg_id)
|
method, params, _id, queue = self.unanswered_requests.pop(msg_id)
|
||||||
if queue is None:
|
if queue is None:
|
||||||
queue = self.response_queue
|
queue = self.response_queue
|
||||||
|
|
||||||
if method == 'server.version':
|
|
||||||
self.server_version = result
|
|
||||||
return
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
queue = self.response_queue
|
|
||||||
# notification
|
# notification
|
||||||
method = c.get('method')
|
method = response.get('method')
|
||||||
params = c.get('params')
|
params = response.get('params')
|
||||||
_id = None
|
_id = None
|
||||||
|
queue = self.response_queue
|
||||||
|
# restore parameters
|
||||||
if method == 'blockchain.numblocks.subscribe':
|
if method == 'blockchain.numblocks.subscribe':
|
||||||
result = params[0]
|
result = params[0]
|
||||||
params = []
|
params = []
|
||||||
|
|
||||||
elif method == 'blockchain.headers.subscribe':
|
elif method == 'blockchain.headers.subscribe':
|
||||||
result = params[0]
|
result = params[0]
|
||||||
params = []
|
params = []
|
||||||
|
|
||||||
elif method == 'blockchain.address.subscribe':
|
elif method == 'blockchain.address.subscribe':
|
||||||
addr = params[0]
|
addr = params[0]
|
||||||
result = params[1]
|
result = params[1]
|
||||||
params = [addr]
|
params = [addr]
|
||||||
|
|
||||||
|
if method == 'server.version':
|
||||||
|
self.server_version = result
|
||||||
|
return
|
||||||
|
|
||||||
queue.put((self, {'method':method, 'params':params, 'result':result, 'id':_id}))
|
queue.put((self, {'method':method, 'params':params, 'result':result, 'id':_id}))
|
||||||
|
|
||||||
|
|
||||||
|
@ -230,7 +220,7 @@ class Interface(threading.Thread):
|
||||||
x.parse(cert)
|
x.parse(cert)
|
||||||
x.slow_parse()
|
x.slow_parse()
|
||||||
except:
|
except:
|
||||||
traceback.print_exc(file=sys.stdout)
|
traceback.print_exc(file=sys.stderr)
|
||||||
print_error("wrong certificate", self.host)
|
print_error("wrong certificate", self.host)
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
|
@ -243,7 +233,7 @@ class Interface(threading.Thread):
|
||||||
return
|
return
|
||||||
except Exception:
|
except Exception:
|
||||||
print_error("wrap_socket failed", self.host)
|
print_error("wrap_socket failed", self.host)
|
||||||
traceback.print_exc(file=sys.stdout)
|
traceback.print_exc(file=sys.stderr)
|
||||||
return
|
return
|
||||||
|
|
||||||
if is_new:
|
if is_new:
|
||||||
|
@ -257,24 +247,6 @@ class Interface(threading.Thread):
|
||||||
self.pipe = util.SocketPipe(s)
|
self.pipe = util.SocketPipe(s)
|
||||||
|
|
||||||
|
|
||||||
def run_tcp(self):
|
|
||||||
t = time.time()
|
|
||||||
while self.is_connected:
|
|
||||||
# ping the server with server.version
|
|
||||||
if time.time() - t > 60:
|
|
||||||
self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
|
|
||||||
t = time.time()
|
|
||||||
try:
|
|
||||||
response = self.pipe.get()
|
|
||||||
except util.timeout:
|
|
||||||
continue
|
|
||||||
if response is None:
|
|
||||||
break
|
|
||||||
self.process_response(response)
|
|
||||||
|
|
||||||
self.is_connected = False
|
|
||||||
print_error("exit interface", self.server)
|
|
||||||
|
|
||||||
def send_request(self, request, queue=None):
|
def send_request(self, request, queue=None):
|
||||||
_id = request.get('id')
|
_id = request.get('id')
|
||||||
method = request.get('method')
|
method = request.get('method')
|
||||||
|
@ -286,13 +258,6 @@ class Interface(threading.Thread):
|
||||||
if self.debug:
|
if self.debug:
|
||||||
print_error("-->", request)
|
print_error("-->", request)
|
||||||
|
|
||||||
def start_interface(self):
|
|
||||||
if self.protocol in 'st':
|
|
||||||
self.start_tcp()
|
|
||||||
elif self.protocol in 'gh':
|
|
||||||
self.start_http()
|
|
||||||
self.connect_event.set()
|
|
||||||
|
|
||||||
def parse_proxy_options(self, s):
|
def parse_proxy_options(self, s):
|
||||||
if type(s) == type({}): return s # fixme: type should be fixed
|
if type(s) == type({}): return s # fixme: type should be fixed
|
||||||
if type(s) != type(""): return None
|
if type(s) != type(""): return None
|
||||||
|
@ -321,20 +286,32 @@ class Interface(threading.Thread):
|
||||||
def is_up_to_date(self):
|
def is_up_to_date(self):
|
||||||
return self.unanswered_requests == {}
|
return self.unanswered_requests == {}
|
||||||
|
|
||||||
def start(self, response_queue, wait = False):
|
def start(self, response_queue):
|
||||||
if not self.server:
|
|
||||||
return
|
|
||||||
self.response_queue = response_queue
|
self.response_queue = response_queue
|
||||||
threading.Thread.start(self)
|
threading.Thread.start(self)
|
||||||
if wait:
|
|
||||||
self.connect_event.wait()
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.start_interface()
|
self.start_tcp()
|
||||||
if self.is_connected:
|
|
||||||
self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
|
|
||||||
self.change_status()
|
self.change_status()
|
||||||
self.run_tcp() if self.protocol in 'st' else self.run_http()
|
if not self.is_connected:
|
||||||
|
return
|
||||||
|
|
||||||
|
t = 0
|
||||||
|
while self.is_connected:
|
||||||
|
# ping the server with server.version
|
||||||
|
if time.time() - t > 60:
|
||||||
|
self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
|
||||||
|
t = time.time()
|
||||||
|
try:
|
||||||
|
response = self.pipe.get()
|
||||||
|
except util.timeout:
|
||||||
|
continue
|
||||||
|
if response is None:
|
||||||
|
self.is_connected = False
|
||||||
|
break
|
||||||
|
self.process_response(response)
|
||||||
|
|
||||||
|
print_error("exit interface", self.server)
|
||||||
self.change_status()
|
self.change_status()
|
||||||
|
|
||||||
def change_status(self):
|
def change_status(self):
|
||||||
|
@ -343,7 +320,15 @@ class Interface(threading.Thread):
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class HTTP_Interface(Interface):
|
class HttpInterface(TcpInterface):
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.start_http()
|
||||||
|
if self.is_connected:
|
||||||
|
self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
|
||||||
|
self.change_status()
|
||||||
|
self.run_http()
|
||||||
|
self.change_status()
|
||||||
|
|
||||||
def send_request(self, request, queue=None):
|
def send_request(self, request, queue=None):
|
||||||
import urllib2, json, time, cookielib
|
import urllib2, json, time, cookielib
|
||||||
|
@ -413,6 +398,10 @@ class HTTP_Interface(Interface):
|
||||||
self.send([], None)
|
self.send([], None)
|
||||||
|
|
||||||
def start_http(self):
|
def start_http(self):
|
||||||
|
self.rtime = 0
|
||||||
|
self.bytes_received = 0
|
||||||
|
self.poll_interval = 1
|
||||||
|
|
||||||
self.session_id = None
|
self.session_id = None
|
||||||
self.is_connected = True
|
self.is_connected = True
|
||||||
self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port )
|
self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port )
|
||||||
|
|
|
@ -266,9 +266,10 @@ class SocketPipe:
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
traceback.print_exc(file=sys.stdout)
|
print_error("pipe: socket error", err)
|
||||||
data = ''
|
data = ''
|
||||||
except:
|
except:
|
||||||
|
traceback.print_exc(file=sys.stderr)
|
||||||
data = ''
|
data = ''
|
||||||
|
|
||||||
if not data:
|
if not data:
|
||||||
|
|
Loading…
Add table
Reference in a new issue