From c64e0c0e64a0355f0813b96a6a2a4a6701d6684d Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 5 May 2015 18:40:22 +0900 Subject: [PATCH] Send requests only from the interface thread. Currently requests are sent from the requestor's thread. The lock is not properly held where necessary so this is not thread-safe. For example it can race with the thread stopping and closing the socket the requestor is trying to use to send with. Resolve such races by having send_request() simply queue the requests, which are asynchronously sent from the interface thread itself. --- lib/interface.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/interface.py b/lib/interface.py index ffdb3d68c..986ab3fc8 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -60,6 +60,7 @@ class TcpInterface(threading.Thread): self.debug = False # dump network messages. can be changed at runtime using the console self.message_id = 0 self.response_queue = response_queue + self.request_queue = Queue.Queue() self.unanswered_requests = {} # are we waiting for a pong? self.is_ping = False @@ -249,22 +250,26 @@ class TcpInterface(threading.Thread): return s + def send_request(self, request, response_queue = None): + '''Queue a request. Blocking only if called from other threads.''' + self.request_queue.put((request, response_queue), threading.current_thread() != self) - def send_request(self, request, queue=None): - _id = request.get('id') - method = request.get('method') - params = request.get('params') - with self.lock: + def send_requests(self): + '''Sends all queued requests''' + while self.is_connected() and not self.request_queue.empty(): + request, response_queue = self.request_queue.get() + method = request.get('method') + params = request.get('params') + r = {'id': self.message_id, 'method': method, 'params': params} try: - r = {'id':self.message_id, 'method':method, 'params':params} self.pipe.send(r) - if self.debug: - self.print_error("-->", r) except socket.error, e: - self.print_error("socked error:", e) + self.print_error("socket error:", e) self.connected = False return - self.unanswered_requests[self.message_id] = method, params, _id, queue + if self.debug: + self.print_error("-->", r) + self.unanswered_requests[self.message_id] = method, params, request.get('id'), response_queue self.message_id += 1 def is_connected(self): @@ -304,6 +309,7 @@ class TcpInterface(threading.Thread): request_time = False while self.connected: self.maybe_ping() + self.send_requests() if not self.connected: break try: