From aa3aff91d041be80abff7d66077ba3056bc83488 Mon Sep 17 00:00:00 2001 From: Alex Liebowitz Date: Wed, 31 Aug 2016 03:49:43 -0400 Subject: [PATCH] Refactor LBRYStreamProducer and add Content-Length header Also fixes producer pause/unpause behavior and adds slight delay between sending chunks --- lbrynet/lbrynet_daemon/DaemonServer.py | 95 +++++++++++++------------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/lbrynet/lbrynet_daemon/DaemonServer.py b/lbrynet/lbrynet_daemon/DaemonServer.py index 7ede73313..230f152ee 100644 --- a/lbrynet/lbrynet_daemon/DaemonServer.py +++ b/lbrynet/lbrynet_daemon/DaemonServer.py @@ -242,58 +242,68 @@ class EncryptedFileStreamer(object): """ bufferSize = abstract.FileDescriptor.bufferSize - delay = 0.25 - def __init__(self, request, path, total_bytes): + + # How long to wait between sending blocks (needed because some + # video players freeze up if you try to send data too fast) + stream_interval = 0.02 + + # How long to wait before checking again + new_data_check_interval = 0.25 + + + def __init__(self, request, path, stream, file_manager): + def _set_content_length_header(length): + self._request.setHeader('content-length', length) + return defer.succeed(None) + self._request = request - self._fileObject = open(path, 'rb') - self._content_type = mimetypes.guess_type(path)[0] - self._bytes_written = 0 - self._stopped = False - self._total_bytes = total_bytes + self._file = open(path, 'rb') + self._stream = stream + self._file_manager = file_manager - self._deferred = defer.succeed(None) + self._running = True self._request.setResponseCode(200) self._request.setHeader('accept-ranges', 'none') - self._request.setHeader('content-type', self._content_type) + self._request.setHeader('content-type', mimetypes.guess_type(path)[0]) self._request.setHeader("Content-Security-Policy", "sandbox") - self.resumeProducing() + self._deferred = stream.get_total_bytes() + self._deferred.addCallback(_set_content_length_header) + self._deferred.addCallback(lambda _: self.resumeProducing()) def pauseProducing(self): - self._paused = True - log.info("Pausing producer") + self._running = False return defer.succeed(None) def resumeProducing(self): def _check_for_new_data(): - data = self._fileObject.read(self.bufferSize) - - self._request.write(data) - log.info('wrote to request') - self._bytes_written += len(data) - - if self._bytes_written >= self._total_bytes: - self.stopProducing() - return defer.succeed(None) - elif self._stopped: + if not self._running: return defer.succeed(None) + + data = self._file.read(self.bufferSize) + if data: + self._request.write(data) + self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.stream_interval, _check_for_new_data)) else: - self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.delay, _check_for_new_data)) - return defer.succeed(None) + status = self._file_manager.get_lbry_file_status(self._stream) + if status != ManagedLBRYFileDownloader.STATUS_FINISHED: + self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.new_data_check_interval, _check_for_new_data)) + else: + self.stopProducing() - log.info("Resuming producer") + self._running = True self._deferred.addCallback(lambda _: _check_for_new_data()) + return defer.succeed(None) def stopProducing(self): - log.info("Stopping producer") - self._stopped = True - self._fileObject.close() + self._running = False + self._file.close() self._deferred.addErrback(lambda err: err.trap(defer.CancelledError)) self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone)) self._deferred.cancel() - # self._request.finish() + self._request.finish() self._request.unregisterProducer() return defer.succeed(None) @@ -301,20 +311,15 @@ class EncryptedFileStreamer(object): class HostedEncryptedFile(resource.Resource): def __init__(self, api): self._api = api - self._producer = None resource.Resource.__init__(self) - def makeProducer(self, request, stream): - def _save_producer(producer): - self._producer = producer - return defer.succeed(None) - + def _make_stream_producer(self, request, stream): path = os.path.join(self._api.download_directory, stream.file_name) - d = stream.get_total_bytes() - d.addCallback(lambda total_bytes: _save_producer(EncryptedFileStreamer(request, path, total_bytes))) - d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True)) - ##request.notifyFinish().addCallback(lambda _: self._producer.stopProducing()) + producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager) + d = defer.Deferred(None) + d.addCallback(lambda _: request.registerProducer(producer, streaming=True)) + request.notifyFinish().addCallback(lambda _: producer.stopProducing()) request.notifyFinish().addErrback(self._responseFailed, d) return d @@ -323,7 +328,7 @@ class HostedEncryptedFile(resource.Resource): if 'name' in request.args.keys(): if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys(): d = self._api._download_name(request.args['name'][0]) - d.addCallback(lambda stream: self.makeProducer(request, stream)) + d.addCallback(lambda stream: self._make_stream_producer(request, stream)) elif request.args['name'][0] in self._api.waiting_on.keys(): request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0]) request.finish() @@ -333,12 +338,10 @@ class HostedEncryptedFile(resource.Resource): return server.NOT_DONE_YET def _responseFailed(self, err, call): - log.error("Hosted file response failed with error: " + str(err)) - - #call.addErrback(lambda err: err.trap(error.ConnectionDone)) - #call.addErrback(lambda err: err.trap(defer.CancelledError)) - #call.addErrback(lambda err: log.info("Error: " + str(err))) - #call.cancel() + call.addErrback(lambda err: err.trap(error.ConnectionDone)) + call.addErrback(lambda err: err.trap(defer.CancelledError)) + call.addErrback(lambda err: log.info("Error: " + str(err))) + call.cancel() class EncryptedFileUpload(resource.Resource): """