Mirror of https://github.com/LBRYFoundation/lbry-sdk.git, synced 2025-10-03 00:30:34 +00:00

Compare commits (15 commits)
Commits:

4ae59b15e0
3771263179
80ab0ba46d
bd31c4c395
9bcb1d7d95
e9ce41bea6
734b80b2b2
a90919a9ba
347415dceb
fe55187696
db8fb15cc5
a165ad910f
b711069456
c977a672b0
ac5794d96d
24 changed files with 337 additions and 80 deletions
.travis.yml (20 changes)
@@ -3,6 +3,11 @@ dist: xenial
 language: python
 python: "3.7"
 
+env:
+  global:
+    # must also be updated in wine_build.sh
+    - TORBA=v0.5.4a0
+
 jobs:
   include:

@@ -10,7 +15,7 @@ jobs:
       name: "pylint lbrynet"
       install:
         - pip install astroid pylint
-        - pip install git+https://github.com/lbryio/torba.git#egg=torba
+        - pip install git+https://github.com/lbryio/torba.git@${TORBA}#egg=torba
         - pip install -e .
       script: pylint lbrynet

@@ -18,7 +23,7 @@ jobs:
       name: "Unit Tests"
       install:
         - pip install coverage
-        - pip install git+https://github.com/lbryio/torba.git#egg=torba
+        - pip install git+https://github.com/lbryio/torba.git@${TORBA}#egg=torba
        - pip install -e .
       script:
         - HOME=/tmp coverage run -p --source=lbrynet -m unittest discover -vv tests.unit

@@ -29,7 +34,8 @@ jobs:
     - name: "Integration Tests"
       install:
         - pip install tox-travis coverage
-        - pushd .. && git clone https://github.com/lbryio/torba.git && popd
+        - pushd .. && git clone --single-branch --branch ${TORBA} https://github.com/lbryio/torba.git && popd
+
       script: tox
       after_success:
         - coverage combine tests/

@@ -38,7 +44,7 @@ jobs:
     - name: "Run Examples"
       install:
         - pip install coverage
-        - pip install git+https://github.com/lbryio/torba.git#egg=torba
+        - pip install git+https://github.com/lbryio/torba.git@${TORBA}#egg=torba
         - pip install -e .
       script:
         - HOME=/tmp coverage run -p --source=lbrynet scripts/generate_json_api.py

@@ -79,7 +85,7 @@ jobs:
       env: OS=linux
       install:
         - pip3 install pyinstaller
-        - pip3 install git+https://github.com/lbryio/torba.git
+        - pip3 install git+https://github.com/lbryio/torba.git@${TORBA}#egg=torba
         - python3 scripts/set_build.py
         - pip3 install -e .
       script:

@@ -121,7 +127,7 @@ jobs:
       script:
         - set -e
         - echo "$DOCKER_PASSWORD" | docker login --username "$DOCKER_USERNAME" --password-stdin
-        - travis_retry docker build -t lbry/wallet-server:$TRAVIS_TAG -f scripts/Dockerfile.wallet_server .
+        - travis_retry docker build -t lbry/wallet-server:$TRAVIS_TAG -f scripts/Dockerfile.wallet_server --build-arg TORBA_VERSION=${TORBA} .
         - docker push lbry/wallet-server:$TRAVIS_TAG
     - if: tag IS blank AND branch = master AND NOT type IN (pull_request)
       stage: build

@@ -129,7 +135,7 @@ jobs:
       script:
         - set -e
         - echo "$DOCKER_PASSWORD" | docker login --username "$DOCKER_USERNAME" --password-stdin
-        - travis_retry docker build -t lbry/wallet-server:master -f scripts/Dockerfile.wallet_server .
+        - travis_retry docker build -t lbry/wallet-server:master -f scripts/Dockerfile.wallet_server --build-arg TORBA_VERSION=${TORBA} .
         - docker push lbry/wallet-server:master
 
 cache:
lbrynet/__init__.py

@@ -1,4 +1,4 @@
 __name__ = "lbrynet"
-__version__ = "0.37.1"
+__version__ = "0.37.3"
 version = tuple(__version__.split('.'))
lbrynet/blob/blob_manager.py

@@ -2,8 +2,10 @@ import os
 import typing
 import asyncio
 import logging
+from lbrynet.utils import LRUCache
 from lbrynet.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
 from lbrynet.stream.descriptor import StreamDescriptor
+from lbrynet.connection_manager import ConnectionManager
 
 if typing.TYPE_CHECKING:
     from lbrynet.conf import Config

@@ -30,6 +32,9 @@ class BlobManager:
             else self._node_data_store.completed_blobs
         self.blobs: typing.Dict[str, AbstractBlob] = {}
         self.config = config
+        self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCache(
+            self.config.blob_lru_cache_size)
+        self.connection_manager = ConnectionManager(loop)
 
     def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
         if self.config.save_blobs:

@@ -81,9 +86,11 @@ class BlobManager:
         to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
         if to_add:
             self.completed_blob_hashes.update(to_add)
+        self.connection_manager.start()
         return True
 
     def stop(self):
+        self.connection_manager.stop()
         while self.blobs:
             _, blob = self.blobs.popitem()
             blob.close()
lbrynet/blob_exchange/client.py

@@ -8,18 +8,20 @@ from lbrynet.utils import cache_concurrent
 if typing.TYPE_CHECKING:
     from lbrynet.blob.blob_file import AbstractBlob
     from lbrynet.blob.writer import HashBlobWriter
+    from lbrynet.connection_manager import ConnectionManager
 
 log = logging.getLogger(__name__)
 
 
 class BlobExchangeClientProtocol(asyncio.Protocol):
-    def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10):
+    def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10,
+                 connection_manager: typing.Optional['ConnectionManager'] = None):
         self.loop = loop
         self.peer_port: typing.Optional[int] = None
         self.peer_address: typing.Optional[str] = None
-        self.peer_timeout = peer_timeout
         self.transport: typing.Optional[asyncio.Transport] = None
+        self.peer_timeout = peer_timeout
+        self.connection_manager = connection_manager
         self.writer: typing.Optional['HashBlobWriter'] = None
         self.blob: typing.Optional['AbstractBlob'] = None

@@ -31,6 +33,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         self.closed = asyncio.Event(loop=self.loop)
 
     def data_received(self, data: bytes):
+        if self.connection_manager:
+            if not self.peer_address:
+                addr_info = self.transport.get_extra_info('peername')
+                self.peer_address, self.peer_port = addr_info
+            # assert self.peer_address is not None
+            self.connection_manager.received_data(f"{self.peer_address}:{self.peer_port}", len(data))
         #log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
         #          self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
         if not self.transport or self.transport.is_closing():

@@ -94,10 +102,15 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         """
         request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
+        blob_hash = self.blob.blob_hash
+        if not self.peer_address:
+            addr_info = self.transport.get_extra_info('peername')
+            self.peer_address, self.peer_port = addr_info
         try:
             msg = request.serialize()
             log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode())
             self.transport.write(msg)
+            if self.connection_manager:
+                self.connection_manager.sent_data(f"{self.peer_address}:{self.peer_port}", len(msg))
             response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop)
             availability_response = response.get_availability_response()
             price_response = response.get_price_response()

@@ -186,11 +199,16 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         self.writer = None
 
     def connection_made(self, transport: asyncio.Transport):
-        addr = transport.get_extra_info('peername')
-        self.peer_address, self.peer_port = addr[0], addr[1]
         self.transport = transport
+        self.peer_address, self.peer_port = self.transport.get_extra_info('peername')
+        if self.connection_manager:
+            self.connection_manager.connection_made(f"{self.peer_address}:{self.peer_port}")
         log.debug("connection made to %s:%i", self.peer_address, self.peer_port)
 
     def connection_lost(self, reason):
+        if self.connection_manager:
+            self.connection_manager.outgoing_connection_lost(f"{self.peer_address}:{self.peer_port}")
         log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(reason),
                   str(type(reason)))
         self.close()

@@ -199,16 +217,19 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
 @cache_concurrent
 async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int,
                        peer_connect_timeout: float, blob_download_timeout: float,
-                       connected_transport: asyncio.Transport = None, connection_id: int = 0)\
+                       connected_transport: asyncio.Transport = None, connection_id: int = 0,
+                       connection_manager: typing.Optional['ConnectionManager'] = None)\
         -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
     """
     Returns [<downloaded blob>, <keep connection>]
     """
 
-    protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
+    protocol = BlobExchangeClientProtocol(
+        loop, blob_download_timeout, connection_manager
+    )
     if connected_transport and not connected_transport.is_closing():
         connected_transport.set_protocol(protocol)
-        protocol.connection_made(connected_transport)
+        protocol.transport = connected_transport
         log.debug("reusing connection for %s:%d", address, tcp_port)
     else:
         connected_transport = None
lbrynet/blob_exchange/downloader.py

@@ -41,7 +41,9 @@ class BlobDownloader:
             start = self.loop.time()
             bytes_received, transport = await request_blob(
                 self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
-                self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id
+                self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id,
+                connection_manager=self.blob_manager.connection_manager
             )
             if not transport and peer not in self.ignored:
                 self.ignored[peer] = self.loop.time()
lbrynet/blob_exchange/server.py

@@ -22,15 +22,23 @@ class BlobServerProtocol(asyncio.Protocol):
         self.buf = b''
         self.transport = None
         self.lbrycrd_address = lbrycrd_address
+        self.peer_address_and_port: typing.Optional[str] = None
 
     def connection_made(self, transport):
         self.transport = transport
+        self.peer_address_and_port = "%s:%i" % self.transport.get_extra_info('peername')
+        self.blob_manager.connection_manager.connection_received(self.peer_address_and_port)
+
+    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
+        self.blob_manager.connection_manager.incoming_connection_lost(self.peer_address_and_port)
 
     def send_response(self, responses: typing.List[blob_response_types]):
         to_send = []
         while responses:
             to_send.append(responses.pop())
-        self.transport.write(BlobResponse(to_send).serialize())
+        serialized = BlobResponse(to_send).serialize()
+        self.transport.write(serialized)
+        self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, len(serialized))
 
     async def handle_request(self, request: BlobRequest):
         addr = self.transport.get_extra_info('peername')

@@ -72,6 +80,7 @@ class BlobServerProtocol(asyncio.Protocol):
     def data_received(self, data):
         request = None
         if data:
+            self.blob_manager.connection_manager.received_data(self.peer_address_and_port, len(data))
             message, separator, remainder = data.rpartition(b'}')
             if not separator:
                 self.buf += data

@@ -97,7 +106,7 @@ class BlobServer:
     def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
         self.loop = loop
         self.blob_manager = blob_manager
-        self.server_task: asyncio.Task = None
+        self.server_task: typing.Optional[asyncio.Task] = None
         self.started_listening = asyncio.Event(loop=self.loop)
         self.lbrycrd_address = lbrycrd_address
         self.server_protocol_class = BlobServerProtocol
lbrynet/build_type.py

@@ -1,2 +1,3 @@
 # don't touch this. CI server changes this during build/deployment
 BUILD = "dev"
+BUILD_COMMIT = "source installation"
lbrynet/conf.py

@@ -485,7 +485,10 @@ class Config(CLIConfig):
 
     # blob announcement and download
     save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)
-
+    blob_lru_cache_size = Integer(
+        "LRU cache size for decrypted downloaded blobs used to minimize re-downloading the same blobs when "
+        "replying to a range request. Set to 0 to disable.", 32
+    )
     announce_head_and_sd_only = Toggle(
         "Announce only the descriptor and first (rather than all) data blob for a stream to the DHT", True,
         previous_names=['announce_head_blobs_only']
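The new blob_lru_cache_size setting can be overridden like any other config value; a minimal sketch (assuming Config accepts settings as keyword arguments, the way the test fixture changes later in this diff assign them as attributes):

    from lbrynet.conf import Config

    conf = Config(blob_lru_cache_size=0)  # 0 disables the decrypted-blob cache entirely
    print(conf.blob_lru_cache_size)       # default is 32 when not overridden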
lbrynet/connection_manager.py (new file, 89 lines)

@@ -0,0 +1,89 @@
+import asyncio
+import typing
+import collections
+import logging
+
+log = logging.getLogger(__name__)
+
+
+CONNECTED_EVENT = "connected"
+DISCONNECTED_EVENT = "disconnected"
+TRANSFERRED_EVENT = "transferred"
+
+
+class ConnectionManager:
+    def __init__(self, loop: asyncio.AbstractEventLoop):
+        self.loop = loop
+        self.incoming_connected: typing.Set[str] = set()
+        self.incoming: typing.DefaultDict[str, int] = collections.defaultdict(int)
+        self.outgoing_connected: typing.Set[str] = set()
+        self.outgoing: typing.DefaultDict[str, int] = collections.defaultdict(int)
+        self._status = {}
+
+        self._task: typing.Optional[asyncio.Task] = None
+
+    @property
+    def status(self):
+        return self._status
+
+    def sent_data(self, host_and_port: str, size: int):
+        self.outgoing[host_and_port] += size
+
+    def received_data(self, host_and_port: str, size: int):
+        self.incoming[host_and_port] += size
+
+    def connection_made(self, host_and_port: str):
+        self.outgoing_connected.add(host_and_port)
+
+    def connection_received(self, host_and_port: str):
+        # self.incoming_connected.add(host_and_port)
+        pass
+
+    def outgoing_connection_lost(self, host_and_port: str):
+        if host_and_port in self.outgoing_connected:
+            self.outgoing_connected.remove(host_and_port)
+
+    def incoming_connection_lost(self, host_and_port: str):
+        if host_and_port in self.incoming_connected:
+            self.incoming_connected.remove(host_and_port)
+
+    async def _update(self):
+
+        self._status = {
+            'incoming_bps': {},
+            'outgoing_bps': {},
+            'total_incoming_mbps': 0.0,
+            'total_outgoing_mbps': 0.0,
+        }
+
+        while True:
+            last = self.loop.time()
+            await asyncio.sleep(1, loop=self.loop)
+            self._status['incoming_bps'].clear()
+            self._status['outgoing_bps'].clear()
+            while self.outgoing:
+                k, v = self.outgoing.popitem()
+                self._status['outgoing_bps'][k] = v
+            while self.incoming:
+                k, v = self.incoming.popitem()
+                self._status['incoming_bps'][k] = v
+            now = self.loop.time()
+            self._status['total_outgoing_mbps'] = int(sum(list(self._status['outgoing_bps'].values()))
+                                                      / (now - last)) / 1000000.0
+            self._status['total_incoming_mbps'] = int(sum(list(self._status['incoming_bps'].values()))
+                                                      / (now - last)) / 1000000.0
+            self._status['time'] = now
+
+    def stop(self):
+        if self._task:
+            self._task.cancel()
+            self._task = None
+        self.outgoing.clear()
+        self.outgoing_connected.clear()
+        self.incoming.clear()
+        self.incoming_connected.clear()
+        self._status.clear()
+
+    def start(self):
+        self.stop()
+        self._task = self.loop.create_task(self._update())
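For reference, a minimal sketch of how the new ConnectionManager is fed and read (the peer address here is made up for illustration):

    import asyncio
    from lbrynet.connection_manager import ConnectionManager

    async def main():
        manager = ConnectionManager(asyncio.get_event_loop())
        manager.start()  # spawns the once-per-second _update() sampling task
        manager.connection_made("1.2.3.4:3333")      # outgoing connection opened
        manager.sent_data("1.2.3.4:3333", 16384)     # bytes written to that peer
        manager.received_data("1.2.3.4:3333", 2048)  # bytes read from that peer
        await asyncio.sleep(1.5)                     # let one sampling interval pass
        print(manager.status)  # {'incoming_bps': ..., 'outgoing_bps': ..., 'total_incoming_mbps': ..., ...}
        manager.stop()

    asyncio.get_event_loop().run_until_complete(main())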
lbrynet/extras/daemon/Components.py

@@ -278,7 +278,7 @@ class BlobComponent(Component):
 
     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.blob_manager: BlobManager = None
+        self.blob_manager: typing.Optional[BlobManager] = None
 
     @property
     def component(self) -> typing.Optional[BlobManager]:

@@ -294,7 +294,7 @@ class BlobComponent(Component):
         blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
         if not os.path.isdir(blob_dir):
             os.mkdir(blob_dir)
-        self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, self.conf, data_store)
+        self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)
         return await self.blob_manager.setup()
 
     async def stop(self):

@@ -304,7 +304,10 @@ class BlobComponent(Component):
         count = 0
         if self.blob_manager:
             count = len(self.blob_manager.completed_blob_hashes)
-        return {'finished_blobs': count}
+        return {
+            'finished_blobs': count,
+            'connections': self.blob_manager.connection_manager.status
+        }
 
 
 class DHTComponent(Component):

@@ -405,7 +408,7 @@ class StreamManagerComponent(Component):
 
     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.stream_manager: StreamManager = None
+        self.stream_manager: typing.Optional[StreamManager] = None
 
     @property
     def component(self) -> typing.Optional[StreamManager]:

@@ -415,7 +418,7 @@ class StreamManagerComponent(Component):
         if not self.stream_manager:
             return
         return {
-            'managed_files': len(self.stream_manager.streams)
+            'managed_files': len(self.stream_manager.streams),
         }
 
     async def start(self):

@@ -444,7 +447,7 @@ class PeerProtocolServerComponent(Component):
 
     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.blob_server: BlobServer = None
+        self.blob_server: typing.Optional[BlobServer] = None
 
     @property
     def component(self) -> typing.Optional[BlobServer]:
lbrynet/extras/daemon/Daemon.py

@@ -485,6 +485,7 @@ class Daemon(metaclass=JSONRPCServerType):
 
     async def handle_stream_get_request(self, request: web.Request):
         if not self.conf.streaming_get:
+            log.warning("streaming_get is disabled, rejecting request")
             raise web.HTTPForbidden()
         name_and_claim_id = request.path.split("/get/")[1]
         if "/" not in name_and_claim_id:

@@ -500,6 +501,20 @@ class Daemon(metaclass=JSONRPCServerType):
         raise web.HTTPFound(f"/stream/{stream.sd_hash}")
 
     async def handle_stream_range_request(self, request: web.Request):
+        try:
+            return await self._handle_stream_range_request(request)
+        except web.HTTPException as err:
+            log.warning("http code during /stream range request: %s", err)
+            raise err
+        except asyncio.CancelledError:
+            log.debug("/stream range request cancelled")
+        except Exception:
+            log.exception("error handling /stream range request")
+            raise
+        finally:
+            log.debug("finished handling /stream range request")
+
+    async def _handle_stream_range_request(self, request: web.Request):
         sd_hash = request.path.split("/stream/")[1]
         if not self.stream_manager.started.is_set():
             await self.stream_manager.started.wait()

@@ -725,6 +740,16 @@ class Daemon(metaclass=JSONRPCServerType):
             },
             'blob_manager': {
                 'finished_blobs': (int) number of finished blobs in the blob manager,
+                'connections': {
+                    'incoming_bps': {
+                        <source ip and tcp port>: (int) bytes per second received,
+                    },
+                    'outgoing_bps': {
+                        <destination ip and tcp port>: (int) bytes per second sent,
+                    },
+                    'total_outgoing_mbps': (float) megabytes per second sent,
+                    'total_incoming_mbps': (float) megabytes per second received
+                }
             },
             'hash_announcer': {
                 'announce_queue_size': (int) number of blobs currently queued to be announced

@@ -776,16 +801,14 @@ class Daemon(metaclass=JSONRPCServerType):
         Returns:
             (dict) Dictionary of lbry version information
             {
-                'build': (str) build type (e.g. "dev", "rc", "release"),
-                'ip': (str) remote ip, if available,
-                'lbrynet_version': (str) lbrynet_version,
-                'lbryum_version': (str) lbryum_version,
-                'lbryschema_version': (str) lbryschema_version,
-                'os_release': (str) os release string
-                'os_system': (str) os name
-                'platform': (str) platform string
                 'processor': (str) processor type,
+                'python_version': (str) python version,
+                'platform': (str) platform string,
+                'os_release': (str) os release string,
+                'os_system': (str) os name,
+                'lbrynet_version': (str) lbrynet version,
+                'torba_version': (str) torba version,
+                'build': (str) "dev" | "qa" | "rc" | "release",
             }
         """
         platform_info = system_info.get_platform()
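The connections section added to the status response can be inspected over the daemon's JSON-RPC API; a sketch (assuming the default API endpoint at http://localhost:5279 and the usual 'result' envelope around JSON-RPC responses):

    import asyncio
    import aiohttp

    async def print_connection_stats():
        async with aiohttp.ClientSession() as session:
            async with session.post('http://localhost:5279', json={'method': 'status'}) as resp:
                status = (await resp.json())['result']
        connections = status['blob_manager']['connections']
        print('incoming:', connections['total_incoming_mbps'], 'mbps')
        print('outgoing:', connections['total_outgoing_mbps'], 'mbps')

    asyncio.get_event_loop().run_until_complete(print_connection_stats())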
lbrynet/extras/system_info.py

@@ -3,6 +3,7 @@ import os
 import logging.handlers
 
 from lbrynet import build_type, __version__ as lbrynet_version
+from torba import __version__ as torba_version
 
 log = logging.getLogger(__name__)

@@ -18,6 +19,7 @@ def get_platform() -> dict:
         "os_release": platform.release(),
         "os_system": os_system,
         "lbrynet_version": lbrynet_version,
+        "torba_version": torba_version,
         "build": build_type.BUILD,  # CI server sets this during build step
     }
     if p["os_system"] == "Linux":
lbrynet/schema/mime_types.py

@@ -140,8 +140,10 @@ types_map = {
     '.cbr': ('application/vnd.comicbook-rar', 'document'),
     '.cbz': ('application/vnd.comicbook+zip', 'document'),
     '.lbry': ('application/x-ext-lbry', 'document'),
+    '.m4v': ('video/m4v', 'video'),
     '.mid': ('audio/midi', 'audio'),
     '.midi': ('audio/midi', 'audio'),
+    '.mkv': ('video/x-matroska', 'video'),
     '.mobi': ('application/x-mobipocket-ebook', 'document'),
     '.pct': ('image/pict', 'image'),
     '.pic': ('image/pict', 'image'),

@@ -149,8 +151,7 @@ types_map = {
     '.prc': ('application/x-mobipocket-ebook', 'document'),
     '.rtf': ('application/rtf', 'document'),
     '.xul': ('text/xul', 'document'),
-    '.m4v': ('video/m4v', 'video'),
 
     # microsoft is special and has its own 'standard'
     # https://docs.microsoft.com/en-us/windows/desktop/wmp/file-name-extensions
     '.wmv': ('video/x-ms-wmv', 'video')
lbrynet/stream/downloader.py

@@ -3,7 +3,7 @@ import typing
 import logging
 import binascii
 from lbrynet.error import DownloadSDTimeout
-from lbrynet.utils import resolve_host
+from lbrynet.utils import resolve_host, lru_cache_concurrent
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.blob_exchange.downloader import BlobDownloader
 from lbrynet.dht.peer import KademliaPeer

@@ -36,6 +36,16 @@ class StreamDownloader:
         self.time_to_descriptor: typing.Optional[float] = None
         self.time_to_first_bytes: typing.Optional[float] = None
 
+        async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
+            return await self.read_blob(blob_info, 2)
+
+        if self.blob_manager.decrypted_blob_lru_cache:
+            cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)(
+                cached_read_blob
+            )
+
+        self.cached_read_blob = cached_read_blob
+
     async def add_fixed_peers(self):
         def _delayed_add_fixed_peers():
             self.added_fixed_peers = True
lbrynet/stream/managed_stream.py

@@ -3,7 +3,7 @@ import asyncio
 import typing
 import logging
 import binascii
-from aiohttp.web import Request, StreamResponse
+from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
 from lbrynet.utils import generate_id
 from lbrynet.error import DownloadSDTimeout
 from lbrynet.schema.mime_types import guess_media_type

@@ -42,6 +42,9 @@ class ManagedStream:
     STATUS_STOPPED = "stopped"
     STATUS_FINISHED = "finished"
 
+    SAVING_ID = 1
+    STREAMING_ID = 2
+
     __slots__ = [
         'loop',
         'config',

@@ -214,7 +217,8 @@ class ManagedStream:
             written_bytes = None
         return {
             'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}",
-            'completed': (self.output_file_exists and self.status in ('stopped', 'finished')) or all(
+            'completed': (self.output_file_exists and (self.status in ('stopped', 'finished'))
+                          or not self.saving.is_set()) or all(
                 self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]),
             'file_name': file_name,
             'download_directory': download_directory,

@@ -304,14 +308,19 @@ class ManagedStream:
             raise IndexError(start_blob_num)
         for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
             assert i + start_blob_num == blob_info.blob_num
-            decrypted = await self.downloader.read_blob(blob_info, connection_id)
+            if connection_id == self.STREAMING_ID:
+                decrypted = await self.downloader.cached_read_blob(blob_info)
+            else:
+                decrypted = await self.downloader.read_blob(blob_info, connection_id)
             yield (blob_info, decrypted)
 
     async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
         log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
                  self.sd_hash[:6])
+        headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
+            request.headers.get('range', 'bytes=0-')
+        )
         await self.start(node)
-        headers, size, skip_blobs = self._prepare_range_response_headers(request.headers.get('range', 'bytes=0-'))
         response = StreamResponse(
             status=206,
             headers=headers

@@ -321,11 +330,16 @@ class ManagedStream:
         self.streaming.set()
         try:
             wrote = 0
-            async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=2):
+            async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=self.STREAMING_ID):
+                if not wrote:
+                    decrypted = decrypted[first_blob_start_offset:]
                 if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
                     decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
+                    log.debug("sending browser final blob (%i/%i)", blob_info.blob_num + 1,
+                              len(self.descriptor.blobs) - 1)
                     await response.write_eof(decrypted)
                 else:
+                    log.debug("sending browser blob (%i/%i)", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                     await response.write(decrypted)
                 wrote += len(decrypted)
                 log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",

@@ -354,7 +368,7 @@ class ManagedStream:
         self.started_writing.clear()
         try:
             with open(output_path, 'wb') as file_write_handle:
-                async for blob_info, decrypted in self._aiter_read_stream(connection_id=1):
+                async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
                     log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                     await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted)
                     self.written_bytes += len(decrypted)

@@ -485,7 +499,7 @@ class ManagedStream:
                 return
             await asyncio.sleep(1, loop=self.loop)
 
-    def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
+    def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
         if '=' in get_range:
             get_range = get_range.split('=')[1]
         start, end = get_range.split('-')

@@ -503,16 +517,23 @@ class ManagedStream:
             log.debug("estimating stream size")
 
         start = int(start)
+        if not 0 <= start < size:
+            raise HTTPRequestRangeNotSatisfiable()
+
         end = int(end) if end else size - 1
+
+        if end >= size:
+            raise HTTPRequestRangeNotSatisfiable()
+
         skip_blobs = start // 2097150
         skip = skip_blobs * 2097151
-        start = skip
+        skip_first_blob = start - skip
+        start = skip_first_blob + skip
         final_size = end - start + 1
 
         headers = {
             'Accept-Ranges': 'bytes',
             'Content-Range': f'bytes {start}-{end}/{size}',
             'Content-Length': str(final_size),
             'Content-Type': self.mime_type
         }
-        return headers, size, skip_blobs
+        return headers, size, skip_blobs, skip_first_blob
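To make the new range arithmetic concrete, a worked example (treating 2097150 and 2097151, the constants used above, as the usable and padded per-blob byte sizes; that interpretation is an assumption):

    start = 5_000_000                       # byte offset requested by the client
    skip_blobs = start // 2097150           # -> 2: whole blobs the reader can skip
    skip = skip_blobs * 2097151             # -> 4194302 bytes covered by skipped blobs
    skip_first_blob = start - skip          # -> 805698 bytes to slice off the next blob
    assert skip_first_blob + skip == start  # the response now begins exactly at 'start'

Previously start was snapped down to a blob boundary (start = skip); with skip_first_blob returned as first_blob_start_offset, stream_file() slices the first decrypted blob so the body begins at the exact requested byte.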
lbrynet/testcase.py

@@ -62,6 +62,7 @@ class CommandTestCase(IntegrationTestCase):
     LEDGER = lbrynet.wallet
     MANAGER = LbryWalletManager
     VERBOSITY = logging.WARN
+    blob_lru_cache_size = 0
 
     async def asyncSetUp(self):
         await super().asyncSetUp()

@@ -81,6 +82,7 @@ class CommandTestCase(IntegrationTestCase):
         conf.lbryum_servers = [('127.0.0.1', 50001)]
         conf.reflector_servers = [('127.0.0.1', 5566)]
         conf.known_dht_nodes = []
+        conf.blob_lru_cache_size = self.blob_lru_cache_size
 
         await self.account.ensure_address_gap()
         address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0]
lbrynet/utils.py

@@ -229,11 +229,12 @@ class LRUCache:
         return item in self.cache
 
 
-def lru_cache_concurrent(cache_size: int):
-    if not cache_size > 0:
+def lru_cache_concurrent(cache_size: typing.Optional[int] = None,
+                         override_lru_cache: typing.Optional[LRUCache] = None):
+    if not cache_size and override_lru_cache is None:
         raise ValueError("invalid cache size")
     concurrent_cache = {}
-    lru_cache = LRUCache(cache_size)
+    lru_cache = override_lru_cache or LRUCache(cache_size)
 
     def wrapper(async_fn):
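A minimal sketch of the new override_lru_cache parameter, sharing one LRUCache between call sites (assuming the wrapper keys the cache on the call arguments, as the decorator's name suggests):

    import asyncio
    from lbrynet.utils import LRUCache, lru_cache_concurrent

    shared = LRUCache(4)

    @lru_cache_concurrent(override_lru_cache=shared)
    async def double(x: int) -> int:
        await asyncio.sleep(0)  # stand-in for real async work
        return x * 2

    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(double(3)))  # computed, then stored in `shared`
    print(loop.run_until_complete(double(3)))  # served from the shared cache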
Dockerfile

@@ -1,5 +1,7 @@
 FROM debian:buster-slim
 
+ARG TORBA_VERSION=master
+
 RUN apt-get update && \
     apt-get upgrade -y && \
     apt-get install -y --no-install-recommends \

@@ -15,7 +17,7 @@ RUN python3.7 -m pip install --upgrade pip setuptools wheel
 COPY . /app
 WORKDIR /app
 
-RUN python3.7 -m pip install git+https://github.com/lbryio/torba.git#egg=torba
+RUN python3.7 -m pip install --user git+https://github.com/lbryio/torba.git@${TORBA_VERSION}#egg=torba
 RUN python3.7 -m pip install -e .
 
 # Orchstr8 API
scripts/Dockerfile.wallet_server

@@ -1,6 +1,7 @@
 FROM debian:buster-slim
 
 ARG user=lbry
+ARG TORBA_VERSION=master
 
 # create an unprivileged user
 RUN groupadd -g 999 $user && useradd -r -u 999 -g $user $user

@@ -23,7 +24,7 @@ WORKDIR /home/$user
 RUN python3.7 -m pip install --upgrade pip setuptools
 
 # get torba
-RUN python3.7 -m pip install --user git+https://github.com/lbryio/torba.git#egg=torba
+RUN python3.7 -m pip install --user git+https://github.com/lbryio/torba.git@${TORBA_VERSION}#egg=torba
 
 # get uvloop
 RUN python3.7 -m pip install --user uvloop
scripts/set_build.py

@@ -1,33 +1,35 @@
-"""Set the build version to be 'qa', 'rc', 'release'"""
+"""Set the build version to be 'dev', 'qa', 'rc', 'release'"""
 
+import os.path
+import re
 import subprocess
-import sys
-import os
-import re
+import logging
+
+log = logging.getLogger()
+log.addHandler(logging.StreamHandler())
+log.setLevel(logging.DEBUG)
+
+
+def get_build_type(travis_tag=None):
+    if not travis_tag:
+        return "qa"
+    log.debug("getting build type for tag: \"%s\"", travis_tag)
+    if re.match('v\d+\.\d+\.\d+rc\d+$', travis_tag):
+        return 'rc'
+    elif re.match('v\d+\.\d+\.\d+$', travis_tag):
+        return 'release'
+    return 'qa'
 
 
 def main():
-    build = get_build()
     root_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
-    with open(os.path.join(root_dir, 'lbrynet', 'build_type.py'), 'w') as f:
-        f.write("BUILD = '{}'\n".format(build))
-
-
-def get_build():
-    try:
-        tag = subprocess.check_output(['git', 'describe', '--exact-match', '--all']).strip()
-        if re.match('tags\/v\d+\.\d+\.\d+rc\d+$', tag.decode()):
-            print('Build: rc')
-            return 'rc'
-        elif re.match('tags\/v\d+\.\d+\.\d+$', tag.decode()):
-            print('Build: release')
-            return 'release'
-        print('Build: qa')
-        return 'qa'
-    except subprocess.CalledProcessError:
-        print("Couldn't determine build type, defaulting to qa.")
-        return 'qa'
+    build_type_path = os.path.join(root_dir, 'lbrynet', 'build_type.py')
+    log.debug("configuring build type file: %s", build_type_path)
+    travis_commit = os.environ['TRAVIS_COMMIT'][:6]
+    build_type = get_build_type(os.environ.get('TRAVIS_TAG', None))
+    log.debug("setting build type=%s, build commit=%s", build_type, travis_commit)
+    with open(build_type_path, 'w') as f:
+        f.write("BUILD = \"{}\"\nBUILD_COMMIT = \"{}\"\n".format(build_type, travis_commit))
 
 
 if __name__ == '__main__':
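The tag-matching logic is small enough to check standalone; this snippet restates get_build_type() from the script above for a quick sanity check:

    import re

    def get_build_type(travis_tag=None):
        if not travis_tag:
            return "qa"
        if re.match(r'v\d+\.\d+\.\d+rc\d+$', travis_tag):
            return 'rc'
        elif re.match(r'v\d+\.\d+\.\d+$', travis_tag):
            return 'release'
        return 'qa'

    assert get_build_type(None) == 'qa'            # untagged build
    assert get_build_type('v0.37.3rc1') == 'rc'    # release candidate tag
    assert get_build_type('v0.37.3') == 'release'  # final release tag
    assert get_build_type('v0.37.3-fix') == 'qa'   # anything else falls back to qa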
scripts/wine_build.sh

@@ -1,13 +1,12 @@
 set -x
 
+# must also be updated in travis.yml
+TORBA_VERSION=v0.5.4a0
 rm -rf /tmp/.wine-*
 
 apt-get -qq update
 apt-get -qq install -y git
 
 pip install setuptools_scm
-git clone https://github.com/lbryio/torba.git --depth 1
-cd torba && pip install -e . && cd ..
-
 cd lbry

@@ -15,6 +14,13 @@ cd lbry
 wget -Onetifaces-0.10.7-cp37-cp37m-win32.whl https://ci.appveyor.com/api/buildjobs/6hworunifsymrhp2/artifacts/dist%2Fnetifaces-0.10.7-cp37-cp37m-win32.whl
 pip install netifaces-0.10.7-cp37-cp37m-win32.whl
 
+git clone --depth=1 --single-branch --branch ${TORBA_VERSION} https://github.com/lbryio/torba.git
+cd torba
+pip install .
+cd ..
+rm -rf torba
+
+pip show torba
 pip install -e .
 pip install pywin32
setup.py (4 changes)

@@ -23,9 +23,9 @@ setup(
         'console_scripts': 'lbrynet=lbrynet.extras.cli:main'
     },
     install_requires=[
-        'torba',
+        'torba==0.5.4a0',
         'aiohttp==3.5.4',
-        'aioupnp',
+        'aioupnp==0.0.12',
         'appdirs==1.4.3',
         'certifi>=2018.11.29',
         'colorama==0.3.7',

Integration tests (RangeRequests)

@@ -2,6 +2,7 @@ import os
 import hashlib
 import aiohttp
+import aiohttp.web
 import asyncio
 
 from lbrynet.utils import aiohttp_request
 from lbrynet.blob.blob_file import MAX_BLOB_SIZE

@@ -373,3 +374,46 @@ class RangeRequests(CommandTestCase):
         await stream.finished_writing.wait()
         with open(stream.full_path, 'rb') as f:
             self.assertEqual(self.data, f.read())
+
+
+class RangeRequestsLRUCache(CommandTestCase):
+    blob_lru_cache_size = 32
+
+    async def _request_stream(self):
+        name = 'foo'
+        url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/{name}'
+
+        async with aiohttp_request('get', url) as req:
+            self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
+            content_range = req.headers.get('Content-Range')
+            content_length = int(req.headers.get('Content-Length'))
+            streamed_bytes = await req.content.read()
+        self.assertEqual(content_length, len(streamed_bytes))
+        self.assertEqual(15, content_length)
+        self.assertEqual(b'hi\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', streamed_bytes)
+        self.assertEqual('bytes 0-14/15', content_range)
+
+    async def test_range_requests_with_blob_lru_cache(self):
+        self.data = b'hi'
+        self.daemon.conf.save_blobs = False
+        self.daemon.conf.save_files = False
+        await self.stream_create('foo', '0.01', data=self.data, file_size=0)
+        await self.daemon.jsonrpc_file_list()[0].fully_reflected.wait()
+        await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
+        self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
+
+        await self.daemon.streaming_runner.setup()
+        site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
+                                   self.daemon.conf.streaming_port)
+        await site.start()
+        self.assertListEqual(self.daemon.jsonrpc_file_list(), [])
+
+        await self._request_stream()
+        self.assertEqual(1, len(self.daemon.jsonrpc_file_list()))
+        self.server.stop_server()
+
+        # running with cache size 0 gets through without errors without
+        # this since the server doesnt stop immediately
+        await asyncio.sleep(1, loop=self.loop)
+
+        await self._request_stream()
tox.ini (1 change)

@@ -5,6 +5,7 @@ envlist = py37-integration
 deps =
     coverage
+    ../torba
 extras = test
 changedir = {toxinidir}/tests
 setenv =