diff --git a/lbrynet/conf.py b/lbrynet/conf.py index f8a78f605..88a655abd 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -29,6 +29,10 @@ HEADERS_FILE_SHA256_CHECKSUM = ( 366295, 'b0c8197153a33ccbc52fb81a279588b6015b68b7726f73f6a2b81f7e25bfe4b9' ) +if sys.platform == "win32": + PIPE_NAME = r'\\.\pipe\lbrypipe' +else: + PIPE_NAME = './lbrypipe' class Setting(typing.Generic[T]): diff --git a/lbrynet/extras/cli.py b/lbrynet/extras/cli.py index 6a67b2515..8745b13c5 100644 --- a/lbrynet/extras/cli.py +++ b/lbrynet/extras/cli.py @@ -309,4 +309,7 @@ def main(argv=None): if __name__ == "__main__": + if sys.platform == 'win32': + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + sys.exit(main()) diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 836053132..85300ebc7 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -16,7 +16,7 @@ from functools import wraps from torba.client.baseaccount import SingleKey, HierarchicalDeterministic from lbrynet import __version__, utils -from lbrynet.conf import Config, Setting, SLACK_WEBHOOK +from lbrynet.conf import Config, Setting, SLACK_WEBHOOK, PIPE_NAME from lbrynet.blob.blob_file import is_valid_blobhash from lbrynet.blob_exchange.downloader import download_blob from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted @@ -40,6 +40,7 @@ from lbrynet.schema.validator import validate_claim_id from lbrynet.schema.address import decode_address if typing.TYPE_CHECKING: + from asyncio import transports from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.dht.node import Node from lbrynet.extras.daemon.Components import UPnPComponent @@ -130,6 +131,29 @@ DHT_HAS_CONTACTS = "dht_has_contacts" WALLET_IS_UNLOCKED = "wallet_is_unlocked" +class NamedPipeServer(asyncio.Protocol): + def __init__(self, request_handler: 'asyncio.coroutine'): + self.transport = None + self.request_handler = request_handler + + async def handle_pipe_request(self, data): + log.info("received data from client %s", str(data)) + json_response = await self.request_handler(data) + self.transport.write(json_response.encode()) + + def connection_made(self, transport: 'transports.BaseTransport'): + self.transport = transport + + def connection_lost(self, exc: 'Optional[Exception]'): + pass + + def data_received(self, data: bytes): + asyncio.create_task(self.handle_pipe_request(data)) + + def eof_received(self): + pass + + class DHTHasContacts(RequiredCondition): name = DHT_HAS_CONTACTS component = DHT_COMPONENT @@ -389,6 +413,13 @@ class Daemon(metaclass=JSONRPCServerType): await self.analytics_manager.send_server_startup() await self.runner.setup() + try: + loop = asyncio.get_event_loop() + await loop.start_serving_pipe(lambda : NamedPipeServer(self.handle_pipe_request), PIPE_NAME) + log.info('lbrynet API listening on pipe %s', PIPE_NAME) + except Exception as e: + log.error(str(e)) + try: site = web.TCPSite(self.runner, self.conf.api_host, self.conf.api_port) await site.start() @@ -430,6 +461,15 @@ class Daemon(metaclass=JSONRPCServerType): if self.analytics_manager.is_started: self.analytics_manager.stop() + async def handle_pipe_request(self, data): + data = json.loads(data) + ledger = None + if 'wallet' in self.component_manager.get_components_status(): + # self.ledger only available if wallet component is not skipped + ledger = self.ledger + result = await self._process_rpc_call(data) + return jsonrpc_dumps_pretty(result, ledger=ledger) + async def handle_old_jsonrpc(self, request): data = await request.json() result = await self._process_rpc_call(data) diff --git a/scripts/test_script_for_pipes.py b/scripts/test_script_for_pipes.py new file mode 100644 index 000000000..9dc4eb9a2 --- /dev/null +++ b/scripts/test_script_for_pipes.py @@ -0,0 +1,47 @@ +import asyncio +import json +import typing + +if typing.TYPE_CHECKING: + from typing import Optional + from asyncio import transports + + +path = r'\\.\pipe\lbrypipe' + +class WindowsPipeProtocol(asyncio.Protocol): + def __init__(self): + self.transport = None + self.closed = asyncio.Event() + + def connection_made(self, transport: 'transports.BaseTransport'): + self.transport = transport + message = {'method': 'account_balance', 'params': {}} + message = json.dumps(message) + self.transport.write(message.encode()) + + def connection_lost(self, exc: 'Optional[Exception]'): + self.closed.set() + + def data_received(self, data: bytes): + print(data.decode()) + self.transport.close() + self.closed.set() + + def eof_received(self): + pass + + +def windows_pipe_factory(): + return WindowsPipeProtocol + +async def main(): + loop = asyncio.get_event_loop() + transport, protocol = await loop.create_pipe_connection(windows_pipe_factory(), path) + await protocol.closed.wait() + + +if __name__ == "__main__": + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + loop = asyncio.get_event_loop() + loop.run_until_complete(main())