From 6931d8e58642c5d7f45ab1dcf3f8de012b66e20c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 10 May 2017 01:23:42 -0400 Subject: [PATCH] decorator for queued api commands --- lbrynet/lbrynet_daemon/Daemon.py | 2 ++ lbrynet/lbrynet_daemon/auth/server.py | 32 ++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 2e5a2bef4..f866ea163 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -1681,6 +1681,7 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(cost) @AuthJSONRPCServer.auth_required + @AuthJSONRPCServer.queued @defer.inlineCallbacks def jsonrpc_channel_new(self, channel_name, amount): """ @@ -1735,6 +1736,7 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(response) @AuthJSONRPCServer.auth_required + @AuthJSONRPCServer.queued @defer.inlineCallbacks def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None, description=None, author=None, language=None, license=None, diff --git a/lbrynet/lbrynet_daemon/auth/server.py b/lbrynet/lbrynet_daemon/auth/server.py index 684b1224c..5f46af3a3 100644 --- a/lbrynet/lbrynet_daemon/auth/server.py +++ b/lbrynet/lbrynet_daemon/auth/server.py @@ -114,6 +114,8 @@ class AuthorizedBase(object): def __init__(self): self.authorized_functions = [] self.callable_methods = {} + self._call_lock = {} + self._queued_methods = [] for methodname in dir(self): if methodname.startswith("jsonrpc_"): @@ -121,12 +123,19 @@ class AuthorizedBase(object): self.callable_methods.update({methodname.split("jsonrpc_")[1]: method}) if hasattr(method, '_auth_required'): self.authorized_functions.append(methodname.split("jsonrpc_")[1]) + if hasattr(method, '_queued'): + self._queued_methods.append(methodname.split("jsonrpc_")[1]) @staticmethod def auth_required(f): f._auth_required = True return f + @staticmethod + def queued(f): + f._queued = True + return f + class AuthJSONRPCServer(AuthorizedBase): """Authorized JSONRPC server used as the base class for the LBRY API @@ -254,6 +263,7 @@ class AuthJSONRPCServer(AuthorizedBase): id_ = None try: function_name = parsed.get('method') + is_queued = function_name in self._queued_methods args = parsed.get('params', {}) id_ = parsed.get('id', None) token = parsed.pop('hmac', None) @@ -324,7 +334,27 @@ class AuthJSONRPCServer(AuthorizedBase): ) return server.NOT_DONE_YET - d = defer.maybeDeferred(function, **args_dict) + if is_queued: + d_lock = self._call_lock.get(function_name, False) + if not d_lock: + d = defer.maybeDeferred(function, **args_dict) + self._call_lock[function_name] = finished_deferred + + def _del_lock(*args): + if function_name in self._call_lock: + del self._call_lock[function_name] + if args: + return args + + finished_deferred.addCallback(_del_lock) + + else: + log.info("queued %s", function_name) + d = d_lock + d.addBoth(lambda _: log.info("running %s from queue", function_name)) + d.addCallback(lambda _: defer.maybeDeferred(function, **args_dict)) + else: + d = defer.maybeDeferred(function, **args_dict) # finished_deferred will callback when the request is finished # and errback if something went wrong. If the errback is