From 36782e087820b61c70c701490a6eb9392dcf5dba Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 4 Nov 2017 20:25:19 -0400 Subject: [PATCH] use shared deferredSemaphore for daemon methods decorated with 'queued' fixes race condition between publish and channel_new --- lbrynet/daemon/auth/server.py | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index c3171f0fe..c396d13a0 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -191,12 +191,12 @@ class AuthJSONRPCServer(AuthorizedBase): allowed_during_startup = [] def __init__(self, use_authentication=None): - self._call_lock = {} self._use_authentication = ( use_authentication if use_authentication is not None else conf.settings['use_auth_http'] ) self.announced_startup = False self.sessions = {} + self._queued_lock = defer.DeferredSemaphore(1) def setup(self): return NotImplementedError() @@ -363,24 +363,7 @@ class AuthJSONRPCServer(AuthorizedBase): return server.NOT_DONE_YET if is_queued: - d_lock = self._call_lock.get(function_name, False) - if not d_lock: - d = defer.maybeDeferred(function, self, **args_dict) - self._call_lock[function_name] = finished_deferred - - def _del_lock(*args): - if function_name in self._call_lock: - del self._call_lock[function_name] - if args: - return args - - finished_deferred.addCallback(_del_lock) - - else: - log.info("queued %s", function_name) - d = d_lock - d.addBoth(lambda _: log.info("running %s from queue", function_name)) - d.addCallback(lambda _: defer.maybeDeferred(function, self, **args_dict)) + d = self._queued_lock.run(function, self, **args_dict) else: d = defer.maybeDeferred(function, self, **args_dict)