Compare commits

...

15 commits

Author SHA1 Message Date
Jack Robison
4ae59b15e0
v0.37.3 2019-06-05 21:01:23 -04:00
Jack Robison
3771263179
update version docstring, add torba_version 2019-06-05 20:48:50 -04:00
Jack Robison
80ab0ba46d
v0.37.3rc6 2019-06-05 20:22:50 -04:00
Jack Robison
bd31c4c395
pin aioupnp requirement 2019-06-05 20:22:11 -04:00
Jack Robison
9bcb1d7d95
require torba 0.5.4a0 2019-06-05 20:22:07 -04:00
Jack Robison
e9ce41bea6
fix torba version on travis
# Conflicts:
#	.travis.yml
2019-06-05 18:21:44 -04:00
Thomas Zarebczan
734b80b2b2
add MKV to mime types 2019-06-05 15:39:26 -04:00
Jack Robison
a90919a9ba
mbps instead of mbs, remove time from connection status 2019-06-04 19:35:34 -04:00
Jack Robison
347415dceb
add connection status 2019-06-04 19:34:09 -04:00
Jack Robison
fe55187696
0.37.2 2019-05-24 18:59:06 -04:00
Jack Robison
db8fb15cc5
0.37.2rc4 2019-05-24 18:12:19 -04:00
Jack Robison
a165ad910f
improve set_build.py, pin torba version
- adds BUILD_COMMIT to lbrynet.build_type
2019-05-24 18:11:50 -04:00
Jack Robison
b711069456
fix false negative with file completed field 2019-05-24 17:31:26 -04:00
Jack Robison
c977a672b0
start returning range request bytes at the requested offset 2019-05-24 17:31:26 -04:00
Jack Robison
ac5794d96d
add blob_lru_cache_size config setting, defaults to 32 2019-05-24 17:31:26 -04:00
24 changed files with 337 additions and 80 deletions

View file

@ -3,6 +3,11 @@ dist: xenial
language: python
python: "3.7"
env:
global:
# must also be updated in wine_build.sh
- TORBA=v0.5.4a0
jobs:
include:
@ -10,7 +15,7 @@ jobs:
name: "pylint lbrynet"
install:
- pip install astroid pylint
- pip install git+https://github.com/lbryio/torba.git#egg=torba
- pip install git+https://github.com/lbryio/torba.git@${TORBA}#egg=torba
- pip install -e .
script: pylint lbrynet
@ -18,7 +23,7 @@ jobs:
name: "Unit Tests"
install:
- pip install coverage
- pip install git+https://github.com/lbryio/torba.git#egg=torba
- pip install git+https://github.com/lbryio/torba.git@${TORBA}#egg=torba
- pip install -e .
script:
- HOME=/tmp coverage run -p --source=lbrynet -m unittest discover -vv tests.unit
@ -29,7 +34,8 @@ jobs:
- name: "Integration Tests"
install:
- pip install tox-travis coverage
- pushd .. && git clone https://github.com/lbryio/torba.git && popd
- pushd .. && git clone --single-branch --branch ${TORBA} https://github.com/lbryio/torba.git && popd
script: tox
after_success:
- coverage combine tests/
@ -38,7 +44,7 @@ jobs:
- name: "Run Examples"
install:
- pip install coverage
- pip install git+https://github.com/lbryio/torba.git#egg=torba
- pip install git+https://github.com/lbryio/torba.git@${TORBA}#egg=torba
- pip install -e .
script:
- HOME=/tmp coverage run -p --source=lbrynet scripts/generate_json_api.py
@ -79,7 +85,7 @@ jobs:
env: OS=linux
install:
- pip3 install pyinstaller
- pip3 install git+https://github.com/lbryio/torba.git
- pip3 install git+https://github.com/lbryio/torba.git@${TORBA}#egg=torba
- python3 scripts/set_build.py
- pip3 install -e .
script:
@ -121,7 +127,7 @@ jobs:
script:
- set -e
- echo "$DOCKER_PASSWORD" | docker login --username "$DOCKER_USERNAME" --password-stdin
- travis_retry docker build -t lbry/wallet-server:$TRAVIS_TAG -f scripts/Dockerfile.wallet_server .
- travis_retry docker build -t lbry/wallet-server:$TRAVIS_TAG -f scripts/Dockerfile.wallet_server --build-arg TORBA_VERSION=${TORBA} .
- docker push lbry/wallet-server:$TRAVIS_TAG
- if: tag IS blank AND branch = master AND NOT type IN (pull_request)
stage: build
@ -129,7 +135,7 @@ jobs:
script:
- set -e
- echo "$DOCKER_PASSWORD" | docker login --username "$DOCKER_USERNAME" --password-stdin
- travis_retry docker build -t lbry/wallet-server:master -f scripts/Dockerfile.wallet_server .
- travis_retry docker build -t lbry/wallet-server:master -f scripts/Dockerfile.wallet_server --build-arg TORBA_VERSION=${TORBA} .
- docker push lbry/wallet-server:master
cache:

View file

@ -1,4 +1,4 @@
__name__ = "lbrynet"
__version__ = "0.37.1"
__version__ = "0.37.3"
version = tuple(__version__.split('.'))

View file

@ -2,8 +2,10 @@ import os
import typing
import asyncio
import logging
from lbrynet.utils import LRUCache
from lbrynet.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.connection_manager import ConnectionManager
if typing.TYPE_CHECKING:
from lbrynet.conf import Config
@ -30,6 +32,9 @@ class BlobManager:
else self._node_data_store.completed_blobs
self.blobs: typing.Dict[str, AbstractBlob] = {}
self.config = config
self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCache(
self.config.blob_lru_cache_size)
self.connection_manager = ConnectionManager(loop)
def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
if self.config.save_blobs:
@ -81,9 +86,11 @@ class BlobManager:
to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
if to_add:
self.completed_blob_hashes.update(to_add)
self.connection_manager.start()
return True
def stop(self):
self.connection_manager.stop()
while self.blobs:
_, blob = self.blobs.popitem()
blob.close()

View file

@ -8,18 +8,20 @@ from lbrynet.utils import cache_concurrent
if typing.TYPE_CHECKING:
from lbrynet.blob.blob_file import AbstractBlob
from lbrynet.blob.writer import HashBlobWriter
from lbrynet.connection_manager import ConnectionManager
log = logging.getLogger(__name__)
class BlobExchangeClientProtocol(asyncio.Protocol):
def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10):
def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10,
connection_manager: typing.Optional['ConnectionManager'] = None):
self.loop = loop
self.peer_port: typing.Optional[int] = None
self.peer_address: typing.Optional[str] = None
self.peer_timeout = peer_timeout
self.transport: typing.Optional[asyncio.Transport] = None
self.peer_timeout = peer_timeout
self.connection_manager = connection_manager
self.writer: typing.Optional['HashBlobWriter'] = None
self.blob: typing.Optional['AbstractBlob'] = None
@ -31,6 +33,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.closed = asyncio.Event(loop=self.loop)
def data_received(self, data: bytes):
if self.connection_manager:
if not self.peer_address:
addr_info = self.transport.get_extra_info('peername')
self.peer_address, self.peer_port = addr_info
# assert self.peer_address is not None
self.connection_manager.received_data(f"{self.peer_address}:{self.peer_port}", len(data))
#log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
# self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
if not self.transport or self.transport.is_closing():
@ -94,10 +102,15 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
"""
request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
blob_hash = self.blob.blob_hash
if not self.peer_address:
addr_info = self.transport.get_extra_info('peername')
self.peer_address, self.peer_port = addr_info
try:
msg = request.serialize()
log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode())
self.transport.write(msg)
if self.connection_manager:
self.connection_manager.sent_data(f"{self.peer_address}:{self.peer_port}", len(msg))
response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop)
availability_response = response.get_availability_response()
price_response = response.get_price_response()
@ -186,11 +199,16 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.writer = None
def connection_made(self, transport: asyncio.Transport):
addr = transport.get_extra_info('peername')
self.peer_address, self.peer_port = addr[0], addr[1]
self.transport = transport
self.peer_address, self.peer_port = self.transport.get_extra_info('peername')
if self.connection_manager:
self.connection_manager.connection_made(f"{self.peer_address}:{self.peer_port}")
log.debug("connection made to %s:%i", self.peer_address, self.peer_port)
def connection_lost(self, reason):
if self.connection_manager:
self.connection_manager.outgoing_connection_lost(f"{self.peer_address}:{self.peer_port}")
log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(reason),
str(type(reason)))
self.close()
@ -199,16 +217,19 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
@cache_concurrent
async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int,
peer_connect_timeout: float, blob_download_timeout: float,
connected_transport: asyncio.Transport = None, connection_id: int = 0)\
connected_transport: asyncio.Transport = None, connection_id: int = 0,
connection_manager: typing.Optional['ConnectionManager'] = None)\
-> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
"""
Returns [<downloaded blob>, <keep connection>]
"""
protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
protocol = BlobExchangeClientProtocol(
loop, blob_download_timeout, connection_manager
)
if connected_transport and not connected_transport.is_closing():
connected_transport.set_protocol(protocol)
protocol.connection_made(connected_transport)
protocol.transport = connected_transport
log.debug("reusing connection for %s:%d", address, tcp_port)
else:
connected_transport = None

View file

@ -41,7 +41,9 @@ class BlobDownloader:
start = self.loop.time()
bytes_received, transport = await request_blob(
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id
self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id,
connection_manager=self.blob_manager.connection_manager
)
if not transport and peer not in self.ignored:
self.ignored[peer] = self.loop.time()

View file

@ -22,15 +22,23 @@ class BlobServerProtocol(asyncio.Protocol):
self.buf = b''
self.transport = None
self.lbrycrd_address = lbrycrd_address
self.peer_address_and_port: typing.Optional[str] = None
def connection_made(self, transport):
self.transport = transport
self.peer_address_and_port = "%s:%i" % self.transport.get_extra_info('peername')
self.blob_manager.connection_manager.connection_received(self.peer_address_and_port)
def connection_lost(self, exc: typing.Optional[Exception]) -> None:
self.blob_manager.connection_manager.incoming_connection_lost(self.peer_address_and_port)
def send_response(self, responses: typing.List[blob_response_types]):
to_send = []
while responses:
to_send.append(responses.pop())
self.transport.write(BlobResponse(to_send).serialize())
serialized = BlobResponse(to_send).serialize()
self.transport.write(serialized)
self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, len(serialized))
async def handle_request(self, request: BlobRequest):
addr = self.transport.get_extra_info('peername')
@ -72,6 +80,7 @@ class BlobServerProtocol(asyncio.Protocol):
def data_received(self, data):
request = None
if data:
self.blob_manager.connection_manager.received_data(self.peer_address_and_port, len(data))
message, separator, remainder = data.rpartition(b'}')
if not separator:
self.buf += data
@ -97,7 +106,7 @@ class BlobServer:
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str):
self.loop = loop
self.blob_manager = blob_manager
self.server_task: asyncio.Task = None
self.server_task: typing.Optional[asyncio.Task] = None
self.started_listening = asyncio.Event(loop=self.loop)
self.lbrycrd_address = lbrycrd_address
self.server_protocol_class = BlobServerProtocol

View file

@ -1,2 +1,3 @@
# don't touch this. CI server changes this during build/deployment
BUILD = "dev"
BUILD_COMMIT = "source installation"

View file

@ -485,7 +485,10 @@ class Config(CLIConfig):
# blob announcement and download
save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)
blob_lru_cache_size = Integer(
"LRU cache size for decrypted downloaded blobs used to minimize re-downloading the same blobs when "
"replying to a range request. Set to 0 to disable.", 32
)
announce_head_and_sd_only = Toggle(
"Announce only the descriptor and first (rather than all) data blob for a stream to the DHT", True,
previous_names=['announce_head_blobs_only']

View file

@ -0,0 +1,89 @@
import asyncio
import typing
import collections
import logging
log = logging.getLogger(__name__)
CONNECTED_EVENT = "connected"
DISCONNECTED_EVENT = "disconnected"
TRANSFERRED_EVENT = "transferred"
class ConnectionManager:
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop = loop
self.incoming_connected: typing.Set[str] = set()
self.incoming: typing.DefaultDict[str, int] = collections.defaultdict(int)
self.outgoing_connected: typing.Set[str] = set()
self.outgoing: typing.DefaultDict[str, int] = collections.defaultdict(int)
self._status = {}
self._task: typing.Optional[asyncio.Task] = None
@property
def status(self):
return self._status
def sent_data(self, host_and_port: str, size: int):
self.outgoing[host_and_port] += size
def received_data(self, host_and_port: str, size: int):
self.incoming[host_and_port] += size
def connection_made(self, host_and_port: str):
self.outgoing_connected.add(host_and_port)
def connection_received(self, host_and_port: str):
# self.incoming_connected.add(host_and_port)
pass
def outgoing_connection_lost(self, host_and_port: str):
if host_and_port in self.outgoing_connected:
self.outgoing_connected.remove(host_and_port)
def incoming_connection_lost(self, host_and_port: str):
if host_and_port in self.incoming_connected:
self.incoming_connected.remove(host_and_port)
async def _update(self):
self._status = {
'incoming_bps': {},
'outgoing_bps': {},
'total_incoming_mbps': 0.0,
'total_outgoing_mbps': 0.0,
}
while True:
last = self.loop.time()
await asyncio.sleep(1, loop=self.loop)
self._status['incoming_bps'].clear()
self._status['outgoing_bps'].clear()
while self.outgoing:
k, v = self.outgoing.popitem()
self._status['outgoing_bps'][k] = v
while self.incoming:
k, v = self.incoming.popitem()
self._status['incoming_bps'][k] = v
now = self.loop.time()
self._status['total_outgoing_mbps'] = int(sum(list(self._status['outgoing_bps'].values()))
/ (now - last)) / 1000000.0
self._status['total_incoming_mbps'] = int(sum(list(self._status['incoming_bps'].values()))
/ (now - last)) / 1000000.0
self._status['time'] = now
def stop(self):
if self._task:
self._task.cancel()
self._task = None
self.outgoing.clear()
self.outgoing_connected.clear()
self.incoming.clear()
self.incoming_connected.clear()
self._status.clear()
def start(self):
self.stop()
self._task = self.loop.create_task(self._update())

View file

@ -278,7 +278,7 @@ class BlobComponent(Component):
def __init__(self, component_manager):
super().__init__(component_manager)
self.blob_manager: BlobManager = None
self.blob_manager: typing.Optional[BlobManager] = None
@property
def component(self) -> typing.Optional[BlobManager]:
@ -294,7 +294,7 @@ class BlobComponent(Component):
blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
if not os.path.isdir(blob_dir):
os.mkdir(blob_dir)
self.blob_manager = BlobManager(asyncio.get_event_loop(), blob_dir, storage, self.conf, data_store)
self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)
return await self.blob_manager.setup()
async def stop(self):
@ -304,7 +304,10 @@ class BlobComponent(Component):
count = 0
if self.blob_manager:
count = len(self.blob_manager.completed_blob_hashes)
return {'finished_blobs': count}
return {
'finished_blobs': count,
'connections': self.blob_manager.connection_manager.status
}
class DHTComponent(Component):
@ -405,7 +408,7 @@ class StreamManagerComponent(Component):
def __init__(self, component_manager):
super().__init__(component_manager)
self.stream_manager: StreamManager = None
self.stream_manager: typing.Optional[StreamManager] = None
@property
def component(self) -> typing.Optional[StreamManager]:
@ -415,7 +418,7 @@ class StreamManagerComponent(Component):
if not self.stream_manager:
return
return {
'managed_files': len(self.stream_manager.streams)
'managed_files': len(self.stream_manager.streams),
}
async def start(self):
@ -444,7 +447,7 @@ class PeerProtocolServerComponent(Component):
def __init__(self, component_manager):
super().__init__(component_manager)
self.blob_server: BlobServer = None
self.blob_server: typing.Optional[BlobServer] = None
@property
def component(self) -> typing.Optional[BlobServer]:

View file

@ -485,6 +485,7 @@ class Daemon(metaclass=JSONRPCServerType):
async def handle_stream_get_request(self, request: web.Request):
if not self.conf.streaming_get:
log.warning("streaming_get is disabled, rejecting request")
raise web.HTTPForbidden()
name_and_claim_id = request.path.split("/get/")[1]
if "/" not in name_and_claim_id:
@ -500,6 +501,20 @@ class Daemon(metaclass=JSONRPCServerType):
raise web.HTTPFound(f"/stream/{stream.sd_hash}")
async def handle_stream_range_request(self, request: web.Request):
try:
return await self._handle_stream_range_request(request)
except web.HTTPException as err:
log.warning("http code during /stream range request: %s", err)
raise err
except asyncio.CancelledError:
log.debug("/stream range request cancelled")
except Exception:
log.exception("error handling /stream range request")
raise
finally:
log.debug("finished handling /stream range request")
async def _handle_stream_range_request(self, request: web.Request):
sd_hash = request.path.split("/stream/")[1]
if not self.stream_manager.started.is_set():
await self.stream_manager.started.wait()
@ -725,6 +740,16 @@ class Daemon(metaclass=JSONRPCServerType):
},
'blob_manager': {
'finished_blobs': (int) number of finished blobs in the blob manager,
'connections': {
'incoming_bps': {
<source ip and tcp port>: (int) bytes per second received,
},
'outgoing_bps': {
<destination ip and tcp port>: (int) bytes per second sent,
},
'total_outgoing_mbps': (float) megabytes per second sent,
'total_incoming_mbps': (float) megabytes per second received
}
},
'hash_announcer': {
'announce_queue_size': (int) number of blobs currently queued to be announced
@ -776,16 +801,14 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(dict) Dictionary of lbry version information
{
'build': (str) build type (e.g. "dev", "rc", "release"),
'ip': (str) remote ip, if available,
'lbrynet_version': (str) lbrynet_version,
'lbryum_version': (str) lbryum_version,
'lbryschema_version': (str) lbryschema_version,
'os_release': (str) os release string
'os_system': (str) os name
'platform': (str) platform string
'processor': (str) processor type,
'python_version': (str) python version,
'platform': (str) platform string,
'os_release': (str) os release string,
'os_system': (str) os name,
'lbrynet_version': (str) lbrynet version,
'torba_version': (str) torba version,
'build': (str) "dev" | "qa" | "rc" | "release",
}
"""
platform_info = system_info.get_platform()

View file

@ -3,6 +3,7 @@ import os
import logging.handlers
from lbrynet import build_type, __version__ as lbrynet_version
from torba import __version__ as torba_version
log = logging.getLogger(__name__)
@ -18,6 +19,7 @@ def get_platform() -> dict:
"os_release": platform.release(),
"os_system": os_system,
"lbrynet_version": lbrynet_version,
"torba_version": torba_version,
"build": build_type.BUILD, # CI server sets this during build step
}
if p["os_system"] == "Linux":

View file

@ -140,8 +140,10 @@ types_map = {
'.cbr': ('application/vnd.comicbook-rar', 'document'),
'.cbz': ('application/vnd.comicbook+zip', 'document'),
'.lbry': ('application/x-ext-lbry', 'document'),
'.m4v': ('video/m4v', 'video'),
'.mid': ('audio/midi', 'audio'),
'.midi': ('audio/midi', 'audio'),
'.mkv': ('video/x-matroska', 'video'),
'.mobi': ('application/x-mobipocket-ebook', 'document'),
'.pct': ('image/pict', 'image'),
'.pic': ('image/pict', 'image'),
@ -149,8 +151,7 @@ types_map = {
'.prc': ('application/x-mobipocket-ebook', 'document'),
'.rtf': ('application/rtf', 'document'),
'.xul': ('text/xul', 'document'),
'.m4v': ('video/m4v', 'video'),
# microsoft is special and has its own 'standard'
# https://docs.microsoft.com/en-us/windows/desktop/wmp/file-name-extensions
'.wmv': ('video/x-ms-wmv', 'video')

View file

@ -3,7 +3,7 @@ import typing
import logging
import binascii
from lbrynet.error import DownloadSDTimeout
from lbrynet.utils import resolve_host
from lbrynet.utils import resolve_host, lru_cache_concurrent
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.blob_exchange.downloader import BlobDownloader
from lbrynet.dht.peer import KademliaPeer
@ -36,6 +36,16 @@ class StreamDownloader:
self.time_to_descriptor: typing.Optional[float] = None
self.time_to_first_bytes: typing.Optional[float] = None
async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
return await self.read_blob(blob_info, 2)
if self.blob_manager.decrypted_blob_lru_cache:
cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)(
cached_read_blob
)
self.cached_read_blob = cached_read_blob
async def add_fixed_peers(self):
def _delayed_add_fixed_peers():
self.added_fixed_peers = True

View file

@ -3,7 +3,7 @@ import asyncio
import typing
import logging
import binascii
from aiohttp.web import Request, StreamResponse
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbrynet.utils import generate_id
from lbrynet.error import DownloadSDTimeout
from lbrynet.schema.mime_types import guess_media_type
@ -42,6 +42,9 @@ class ManagedStream:
STATUS_STOPPED = "stopped"
STATUS_FINISHED = "finished"
SAVING_ID = 1
STREAMING_ID = 2
__slots__ = [
'loop',
'config',
@ -214,7 +217,8 @@ class ManagedStream:
written_bytes = None
return {
'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}",
'completed': (self.output_file_exists and self.status in ('stopped', 'finished')) or all(
'completed': (self.output_file_exists and (self.status in ('stopped', 'finished'))
or not self.saving.is_set()) or all(
self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]),
'file_name': file_name,
'download_directory': download_directory,
@ -304,14 +308,19 @@ class ManagedStream:
raise IndexError(start_blob_num)
for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
assert i + start_blob_num == blob_info.blob_num
decrypted = await self.downloader.read_blob(blob_info, connection_id)
if connection_id == self.STREAMING_ID:
decrypted = await self.downloader.cached_read_blob(blob_info)
else:
decrypted = await self.downloader.read_blob(blob_info, connection_id)
yield (blob_info, decrypted)
async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
self.sd_hash[:6])
headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
request.headers.get('range', 'bytes=0-')
)
await self.start(node)
headers, size, skip_blobs = self._prepare_range_response_headers(request.headers.get('range', 'bytes=0-'))
response = StreamResponse(
status=206,
headers=headers
@ -321,11 +330,16 @@ class ManagedStream:
self.streaming.set()
try:
wrote = 0
async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=2):
async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=self.STREAMING_ID):
if not wrote:
decrypted = decrypted[first_blob_start_offset:]
if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
log.debug("sending browser final blob (%i/%i)", blob_info.blob_num + 1,
len(self.descriptor.blobs) - 1)
await response.write_eof(decrypted)
else:
log.debug("sending browser blob (%i/%i)", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
await response.write(decrypted)
wrote += len(decrypted)
log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",
@ -354,7 +368,7 @@ class ManagedStream:
self.started_writing.clear()
try:
with open(output_path, 'wb') as file_write_handle:
async for blob_info, decrypted in self._aiter_read_stream(connection_id=1):
async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted)
self.written_bytes += len(decrypted)
@ -485,7 +499,7 @@ class ManagedStream:
return
await asyncio.sleep(1, loop=self.loop)
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
if '=' in get_range:
get_range = get_range.split('=')[1]
start, end = get_range.split('-')
@ -503,16 +517,23 @@ class ManagedStream:
log.debug("estimating stream size")
start = int(start)
if not 0 <= start < size:
raise HTTPRequestRangeNotSatisfiable()
end = int(end) if end else size - 1
if end >= size:
raise HTTPRequestRangeNotSatisfiable()
skip_blobs = start // 2097150
skip = skip_blobs * 2097151
start = skip
skip_first_blob = start - skip
start = skip_first_blob + skip
final_size = end - start + 1
headers = {
'Accept-Ranges': 'bytes',
'Content-Range': f'bytes {start}-{end}/{size}',
'Content-Length': str(final_size),
'Content-Type': self.mime_type
}
return headers, size, skip_blobs
return headers, size, skip_blobs, skip_first_blob

View file

@ -62,6 +62,7 @@ class CommandTestCase(IntegrationTestCase):
LEDGER = lbrynet.wallet
MANAGER = LbryWalletManager
VERBOSITY = logging.WARN
blob_lru_cache_size = 0
async def asyncSetUp(self):
await super().asyncSetUp()
@ -81,6 +82,7 @@ class CommandTestCase(IntegrationTestCase):
conf.lbryum_servers = [('127.0.0.1', 50001)]
conf.reflector_servers = [('127.0.0.1', 5566)]
conf.known_dht_nodes = []
conf.blob_lru_cache_size = self.blob_lru_cache_size
await self.account.ensure_address_gap()
address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0]

View file

@ -229,11 +229,12 @@ class LRUCache:
return item in self.cache
def lru_cache_concurrent(cache_size: int):
if not cache_size > 0:
def lru_cache_concurrent(cache_size: typing.Optional[int] = None,
override_lru_cache: typing.Optional[LRUCache] = None):
if not cache_size and override_lru_cache is None:
raise ValueError("invalid cache size")
concurrent_cache = {}
lru_cache = LRUCache(cache_size)
lru_cache = override_lru_cache or LRUCache(cache_size)
def wrapper(async_fn):

View file

@ -1,5 +1,7 @@
FROM debian:buster-slim
ARG TORBA_VERSION=master
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
@ -15,7 +17,7 @@ RUN python3.7 -m pip install --upgrade pip setuptools wheel
COPY . /app
WORKDIR /app
RUN python3.7 -m pip install git+https://github.com/lbryio/torba.git#egg=torba
RUN python3.7 -m pip install --user git+https://github.com/lbryio/torba.git@${TORBA_VERSION}#egg=torba
RUN python3.7 -m pip install -e .
# Orchstr8 API

View file

@ -1,6 +1,7 @@
FROM debian:buster-slim
ARG user=lbry
ARG TORBA_VERSION=master
# create an unprivileged user
RUN groupadd -g 999 $user && useradd -r -u 999 -g $user $user
@ -23,7 +24,7 @@ WORKDIR /home/$user
RUN python3.7 -m pip install --upgrade pip setuptools
# get torba
RUN python3.7 -m pip install --user git+https://github.com/lbryio/torba.git#egg=torba
RUN python3.7 -m pip install --user git+https://github.com/lbryio/torba.git@${TORBA_VERSION}#egg=torba
# get uvloop
RUN python3.7 -m pip install --user uvloop

View file

@ -1,33 +1,35 @@
"""Set the build version to be 'qa', 'rc', 'release'"""
"""Set the build version to be 'dev', 'qa', 'rc', 'release'"""
import os.path
import re
import subprocess
import sys
import os
import re
import logging
log = logging.getLogger()
log.addHandler(logging.StreamHandler())
log.setLevel(logging.DEBUG)
def get_build_type(travis_tag=None):
if not travis_tag:
return "qa"
log.debug("getting build type for tag: \"%s\"", travis_tag)
if re.match('v\d+\.\d+\.\d+rc\d+$', travis_tag):
return 'rc'
elif re.match('v\d+\.\d+\.\d+$', travis_tag):
return 'release'
return 'qa'
def main():
build = get_build()
root_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
with open(os.path.join(root_dir, 'lbrynet', 'build_type.py'), 'w') as f:
f.write("BUILD = '{}'\n".format(build))
def get_build():
try:
tag = subprocess.check_output(['git', 'describe', '--exact-match', '--all']).strip()
if re.match('tags\/v\d+\.\d+\.\d+rc\d+$', tag.decode()):
print('Build: rc')
return 'rc'
elif re.match('tags\/v\d+\.\d+\.\d+$', tag.decode()):
print('Build: release')
return 'release'
print('Build: qa')
return 'qa'
except subprocess.CalledProcessError:
print("Couldn't determine build type, defaulting to qa.")
return 'qa'
build_type_path = os.path.join(root_dir, 'lbrynet', 'build_type.py')
log.debug("configuring build type file: %s", build_type_path)
travis_commit = os.environ['TRAVIS_COMMIT'][:6]
build_type = get_build_type(os.environ.get('TRAVIS_TAG', None))
log.debug("setting build type=%s, build commit=%s", build_type, travis_commit)
with open(build_type_path, 'w') as f:
f.write("BUILD = \"{}\"\nBUILD_COMMIT = \"{}\"\n".format(build_type, travis_commit))
if __name__ == '__main__':

View file

@ -1,13 +1,12 @@
set -x
# must also be updated in travis.yml
TORBA_VERSION=v0.5.4a0
rm -rf /tmp/.wine-*
apt-get -qq update
apt-get -qq install -y git
pip install setuptools_scm
git clone https://github.com/lbryio/torba.git --depth 1
cd torba && pip install -e . && cd ..
cd lbry
@ -15,6 +14,13 @@ cd lbry
wget -Onetifaces-0.10.7-cp37-cp37m-win32.whl https://ci.appveyor.com/api/buildjobs/6hworunifsymrhp2/artifacts/dist%2Fnetifaces-0.10.7-cp37-cp37m-win32.whl
pip install netifaces-0.10.7-cp37-cp37m-win32.whl
git clone --depth=1 --single-branch --branch ${TORBA_VERSION} https://github.com/lbryio/torba.git
cd torba
pip install .
cd ..
rm -rf torba
pip show torba
pip install -e .
pip install pywin32

View file

@ -23,9 +23,9 @@ setup(
'console_scripts': 'lbrynet=lbrynet.extras.cli:main'
},
install_requires=[
'torba',
'torba==0.5.4a0',
'aiohttp==3.5.4',
'aioupnp',
'aioupnp==0.0.12',
'appdirs==1.4.3',
'certifi>=2018.11.29',
'colorama==0.3.7',

View file

@ -2,6 +2,7 @@ import os
import hashlib
import aiohttp
import aiohttp.web
import asyncio
from lbrynet.utils import aiohttp_request
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
@ -373,3 +374,46 @@ class RangeRequests(CommandTestCase):
await stream.finished_writing.wait()
with open(stream.full_path, 'rb') as f:
self.assertEqual(self.data, f.read())
class RangeRequestsLRUCache(CommandTestCase):
blob_lru_cache_size = 32
async def _request_stream(self):
name = 'foo'
url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/{name}'
async with aiohttp_request('get', url) as req:
self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
content_range = req.headers.get('Content-Range')
content_length = int(req.headers.get('Content-Length'))
streamed_bytes = await req.content.read()
self.assertEqual(content_length, len(streamed_bytes))
self.assertEqual(15, content_length)
self.assertEqual(b'hi\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', streamed_bytes)
self.assertEqual('bytes 0-14/15', content_range)
async def test_range_requests_with_blob_lru_cache(self):
self.data = b'hi'
self.daemon.conf.save_blobs = False
self.daemon.conf.save_files = False
await self.stream_create('foo', '0.01', data=self.data, file_size=0)
await self.daemon.jsonrpc_file_list()[0].fully_reflected.wait()
await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
await self.daemon.streaming_runner.setup()
site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
self.daemon.conf.streaming_port)
await site.start()
self.assertListEqual(self.daemon.jsonrpc_file_list(), [])
await self._request_stream()
self.assertEqual(1, len(self.daemon.jsonrpc_file_list()))
self.server.stop_server()
# running with cache size 0 gets through without errors without
# this since the server doesnt stop immediately
await asyncio.sleep(1, loop=self.loop)
await self._request_stream()

View file

@ -5,6 +5,7 @@ envlist = py37-integration
deps =
coverage
../torba
extras = test
changedir = {toxinidir}/tests
setenv =