From 928498d695745bb6a527f33d06dbbdbb362248f0 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Fri, 23 Dec 2011 01:57:13 +0300 Subject: [PATCH] more command separation --- server/server.py | 197 +++++++++++++++++++++++++---------------------- 1 file changed, 106 insertions(+), 91 deletions(-) diff --git a/server/server.py b/server/server.py index 4793112fc..59c3d0ae3 100755 --- a/server/server.py +++ b/server/server.py @@ -326,50 +326,10 @@ def send_tx(tx): v = "error: transaction rejected by memorypool" return v - -def listen_thread(store): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind((config.get('server','host'), config.getint('server','port'))) - s.listen(1) - while not stopping: - conn, addr = s.accept() - thread.start_new_thread(client_thread, (addr, conn,)) - def random_string(N): import random, string return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(N)) -def client_thread(ipaddr,conn): - #print "client thread", ipaddr - try: - ipaddr = ipaddr[0] - msg = '' - while 1: - d = conn.recv(1024) - msg += d - if not d: - break - if '#' in msg: - msg = msg.split('#', 1)[0] - break - try: - cmd, data = ast.literal_eval(msg) - except: - print "syntax error", repr(msg), ipaddr - conn.close() - return - - out = do_command(cmd, data, ipaddr) - if out: - #print ipaddr, cmd, len(out) - try: - conn.send(out) - except: - print "error, could not send" - - finally: - conn.close() def cmd_stop(data): @@ -401,13 +361,115 @@ def get_cache(pw,addr): return 'wrong password' +def cmd_poll(session_id): + session = sessions.get(session_id) + if session is None: + print time.asctime(), "session not found", session_id, ipaddr + out = repr( (-1, {})) + else: + t1 = time.time() + addresses = session['addresses'] + session['last_time'] = time.time() + ret = {} + k = 0 + for addr in addresses: + if store.tx_cache.get( addr ) is not None: k += 1 + + # get addtess status, i.e. the last block for that address. + tx_points = store.get_history(addr) + if not tx_points: + status = None + else: + lastpoint = tx_points[-1] + status = lastpoint['blk_hash'] + # this is a temporary hack; move it up once old clients have disappeared + if status == 'mempool' and session['version'] != "old": + status = status + ':%d'% len(tx_points) + + last_status = addresses.get( addr ) + if last_status != status: + addresses[addr] = status + ret[addr] = status + if ret: + sessions[session_id]['addresses'] = addresses + out = repr( (block_number, ret ) ) + t2 = time.time() - t1 + if t2 > 10: + print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2 + + return out + + +def new_session(addresses, version, ipaddr): + session_id = random_string(10) + + print time.strftime("[%d/%m/%Y-%H:%M:%S]"), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version + + sessions[session_id] = { 'addresses':{}, 'version':version, 'ip':ipaddr } + for a in addresses: + sessions[session_id]['addresses'][a] = '' + out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) ) + sessions[session_id]['last_time'] = time.time() + return out + +def update_session(session_id,addresses): + print time.strftime("[%d/%m/%Y-%H:%M:%S]"), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses) + sessions[session_id]['addresses'] = {} + for a in addresses: + sessions[session_id]['addresses'][a] = '' + out = 'ok' + sessions[session_id]['last_time'] = time.time() + + +def listen_thread(store): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind((config.get('server','host'), config.getint('server','port'))) + s.listen(1) + while not stopping: + conn, addr = s.accept() + thread.start_new_thread(client_thread, (addr, conn,)) + + + +def client_thread(ipaddr,conn): + #print "client thread", ipaddr + try: + ipaddr = ipaddr[0] + msg = '' + while 1: + d = conn.recv(1024) + msg += d + if not d: + break + if '#' in msg: + msg = msg.split('#', 1)[0] + break + try: + cmd, data = ast.literal_eval(msg) + except: + print "syntax error", repr(msg), ipaddr + conn.close() + return + + out = do_command(cmd, data, ipaddr) + if out: + #print ipaddr, cmd, len(out) + try: + conn.send(out) + except: + print "error, could not send" + + finally: + conn.close() + + def do_command(cmd, data, ipaddr): if cmd=='b': out = "%d"%block_number elif cmd in ['session','new_session']: - session_id = random_string(10) try: if cmd == 'session': addresses = ast.literal_eval(data) @@ -418,14 +480,7 @@ def do_command(cmd, data, ipaddr): except: print "error", data return None - - print time.strftime("[%d/%m/%Y-%H:%M:%S]"), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version - - sessions[session_id] = { 'addresses':{}, 'version':version, 'ip':ipaddr } - for a in addresses: - sessions[session_id]['addresses'][a] = '' - out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) ) - sessions[session_id]['last_time'] = time.time() + out = new_session(addresses, version, ipaddr) elif cmd=='update_session': try: @@ -433,14 +488,8 @@ def do_command(cmd, data, ipaddr): except: print "error" return None + out = update_session(session_id,addresses) - print time.strftime("[%d/%m/%Y-%H:%M:%S]"), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses) - - sessions[session_id]['addresses'] = {} - for a in addresses: - sessions[session_id]['addresses'][a] = '' - out = 'ok' - sessions[session_id]['last_time'] = time.time() elif cmd == 'bccapi_login': import electrum @@ -490,41 +539,7 @@ def do_command(cmd, data, ipaddr): out = '' elif cmd=='poll': - session_id = data - session = sessions.get(session_id) - if session is None: - print time.asctime(), "session not found", session_id, ipaddr - out = repr( (-1, {})) - else: - t1 = time.time() - addresses = session['addresses'] - session['last_time'] = time.time() - ret = {} - k = 0 - for addr in addresses: - if store.tx_cache.get( addr ) is not None: k += 1 - - # get addtess status, i.e. the last block for that address. - tx_points = store.get_history(addr) - if not tx_points: - status = None - else: - lastpoint = tx_points[-1] - status = lastpoint['blk_hash'] - # this is a temporary hack; move it up once old clients have disappeared - if status == 'mempool' and session['version'] != "old": - status = status + ':%d'% len(tx_points) - - last_status = addresses.get( addr ) - if last_status != status: - addresses[addr] = status - ret[addr] = status - if ret: - sessions[session_id]['addresses'] = addresses - out = repr( (block_number, ret ) ) - t2 = time.time() - t1 - if t2 > 10: - print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2 + out = cmd_poll(data) elif cmd == 'h': # history