stream from torrent pieces, holding the response until the piece is completed

This commit is contained in:
Victor Shyba 2022-09-05 13:11:00 -03:00
parent 6efd4dd19a
commit b3bff39eea
2 changed files with 38 additions and 7 deletions

View file

@ -5,7 +5,7 @@ import logging
import random import random
from hashlib import sha1 from hashlib import sha1
from tempfile import mkdtemp from tempfile import mkdtemp
from typing import Optional from typing import Optional, Tuple
import libtorrent import libtorrent
@ -31,9 +31,13 @@ class TorrentHandle:
self.total_wanted_done = 0 self.total_wanted_done = 0
self.name = '' self.name = ''
self.tasks = [] self.tasks = []
self.torrent_file: Optional[libtorrent.file_storage] = None self._torrent_info: libtorrent.torrent_info = handle.torrent_file()
self._base_path = None self._base_path = None
@property
def torrent_file(self) -> Optional[libtorrent.file_storage]:
return self._torrent_info.files()
@property @property
def largest_file(self) -> Optional[str]: def largest_file(self) -> Optional[str]:
if self.torrent_file is None: if self.torrent_file is None:
@ -58,6 +62,25 @@ class TorrentHandle:
while self.tasks: while self.tasks:
self.tasks.pop().cancel() self.tasks.pop().cancel()
def byte_range_to_piece_range(
self, file_index, start_offset, end_offset) -> Tuple[libtorrent.peer_request, libtorrent.peer_request]:
start_piece = self._torrent_info.map_file(file_index, start_offset, 0)
end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
return start_piece, end_piece
async def stream_range_as_completed(self, file_index, start, end):
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
start_piece_offset = final_piece.start
piece_size = self._torrent_info.piece_length()
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s",
first_piece.piece, final_piece.piece, start, end, self.name)
for piece_index in range(first_piece.piece, final_piece.piece + 1):
while not self._handle.have_piece(piece_index):
log.info("Waiting for piece %d: %s", piece_index, self.name)
await asyncio.sleep(0.2)
log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name)
yield piece_size - start_piece_offset
def _show_status(self): def _show_status(self):
# fixme: cleanup # fixme: cleanup
if not self._handle.is_valid(): if not self._handle.is_valid():
@ -69,8 +92,8 @@ class TorrentHandle:
self.name = status.name self.name = status.name
if not self.metadata_completed.is_set(): if not self.metadata_completed.is_set():
self.metadata_completed.set() self.metadata_completed.set()
self._torrent_info = self._handle.torrent_file()
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
self.torrent_file = self._handle.torrent_file().files()
self._base_path = status.save_path self._base_path = status.save_path
first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index)
if not self.started.is_set(): if not self.started.is_set():
@ -220,6 +243,10 @@ class TorrentSession:
def is_completed(self, btih): def is_completed(self, btih):
return self._handles[btih].finished.is_set() return self._handles[btih].finished.is_set()
def stream_largest_file(self, btih, start, end):
handle = self._handles[btih]
return handle.stream_range_as_completed(handle.largest_file_index, start, end)
def get_magnet_uri(btih): def get_magnet_uri(btih):
return f"magnet:?xt=urn:btih:{btih}" return f"magnet:?xt=urn:btih:{btih}"

View file

@ -94,7 +94,7 @@ class TorrentSource(ManagedDownloadSource):
async def stream_file(self, request): async def stream_file(self, request):
log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id, log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id,
self.identifier[:6]) self.identifier[:6])
headers, size, start, end = self._prepare_range_response_headers( headers, start, end = self._prepare_range_response_headers(
request.headers.get('range', 'bytes=0-') request.headers.get('range', 'bytes=0-')
) )
await self.start() await self.start()
@ -105,9 +105,13 @@ class TorrentSource(ManagedDownloadSource):
await response.prepare(request) await response.prepare(request)
with open(self.full_path, 'rb') as infile: with open(self.full_path, 'rb') as infile:
infile.seek(start) infile.seek(start)
await response.write_eof(infile.read(size)) async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end):
if start + read_size < end:
await response.write(infile.read(read_size))
else:
await response.write_eof(infile.read(end - infile.tell()))
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]: def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
if '=' in get_range: if '=' in get_range:
get_range = get_range.split('=')[1] get_range = get_range.split('=')[1]
start, end = get_range.split('-') start, end = get_range.split('-')
@ -126,7 +130,7 @@ class TorrentSource(ManagedDownloadSource):
'Content-Length': str(final_size), 'Content-Length': str(final_size),
'Content-Type': self.mime_type 'Content-Type': self.mime_type
} }
return headers, final_size, start, end return headers, start, end
class TorrentManager(SourceManager): class TorrentManager(SourceManager):