Refactor LBRYStreamProducer and add Content-Length header

Also fixes producer pause/unpause behavior and adds slight delay
between sending chunks
This commit is contained in:
Alex Liebowitz 2016-08-31 03:49:43 -04:00
parent 5537dd878f
commit aa3aff91d0

View file

@ -242,58 +242,68 @@ class EncryptedFileStreamer(object):
""" """
bufferSize = abstract.FileDescriptor.bufferSize bufferSize = abstract.FileDescriptor.bufferSize
delay = 0.25
def __init__(self, request, path, total_bytes):
# How long to wait between sending blocks (needed because some
# video players freeze up if you try to send data too fast)
stream_interval = 0.02
# How long to wait before checking again
new_data_check_interval = 0.25
def __init__(self, request, path, stream, file_manager):
def _set_content_length_header(length):
self._request.setHeader('content-length', length)
return defer.succeed(None)
self._request = request self._request = request
self._fileObject = open(path, 'rb') self._file = open(path, 'rb')
self._content_type = mimetypes.guess_type(path)[0] self._stream = stream
self._bytes_written = 0 self._file_manager = file_manager
self._stopped = False
self._total_bytes = total_bytes
self._deferred = defer.succeed(None) self._running = True
self._request.setResponseCode(200) self._request.setResponseCode(200)
self._request.setHeader('accept-ranges', 'none') self._request.setHeader('accept-ranges', 'none')
self._request.setHeader('content-type', self._content_type) self._request.setHeader('content-type', mimetypes.guess_type(path)[0])
self._request.setHeader("Content-Security-Policy", "sandbox") self._request.setHeader("Content-Security-Policy", "sandbox")
self.resumeProducing() self._deferred = stream.get_total_bytes()
self._deferred.addCallback(_set_content_length_header)
self._deferred.addCallback(lambda _: self.resumeProducing())
def pauseProducing(self): def pauseProducing(self):
self._paused = True self._running = False
log.info("Pausing producer")
return defer.succeed(None) return defer.succeed(None)
def resumeProducing(self): def resumeProducing(self):
def _check_for_new_data(): def _check_for_new_data():
data = self._fileObject.read(self.bufferSize) if not self._running:
self._request.write(data)
log.info('wrote to request')
self._bytes_written += len(data)
if self._bytes_written >= self._total_bytes:
self.stopProducing()
return defer.succeed(None)
elif self._stopped:
return defer.succeed(None) return defer.succeed(None)
data = self._file.read(self.bufferSize)
if data:
self._request.write(data)
self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.stream_interval, _check_for_new_data))
else: else:
self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.delay, _check_for_new_data)) status = self._file_manager.get_lbry_file_status(self._stream)
return defer.succeed(None) if status != ManagedLBRYFileDownloader.STATUS_FINISHED:
self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.new_data_check_interval, _check_for_new_data))
else:
self.stopProducing()
log.info("Resuming producer") self._running = True
self._deferred.addCallback(lambda _: _check_for_new_data()) self._deferred.addCallback(lambda _: _check_for_new_data())
return defer.succeed(None)
def stopProducing(self): def stopProducing(self):
log.info("Stopping producer") self._running = False
self._stopped = True self._file.close()
self._fileObject.close()
self._deferred.addErrback(lambda err: err.trap(defer.CancelledError)) self._deferred.addErrback(lambda err: err.trap(defer.CancelledError))
self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone)) self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone))
self._deferred.cancel() self._deferred.cancel()
# self._request.finish() self._request.finish()
self._request.unregisterProducer() self._request.unregisterProducer()
return defer.succeed(None) return defer.succeed(None)
@ -301,20 +311,15 @@ class EncryptedFileStreamer(object):
class HostedEncryptedFile(resource.Resource): class HostedEncryptedFile(resource.Resource):
def __init__(self, api): def __init__(self, api):
self._api = api self._api = api
self._producer = None
resource.Resource.__init__(self) resource.Resource.__init__(self)
def makeProducer(self, request, stream): def _make_stream_producer(self, request, stream):
def _save_producer(producer):
self._producer = producer
return defer.succeed(None)
path = os.path.join(self._api.download_directory, stream.file_name) path = os.path.join(self._api.download_directory, stream.file_name)
d = stream.get_total_bytes() producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager)
d.addCallback(lambda total_bytes: _save_producer(EncryptedFileStreamer(request, path, total_bytes))) d = defer.Deferred(None)
d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True)) d.addCallback(lambda _: request.registerProducer(producer, streaming=True))
##request.notifyFinish().addCallback(lambda _: self._producer.stopProducing()) request.notifyFinish().addCallback(lambda _: producer.stopProducing())
request.notifyFinish().addErrback(self._responseFailed, d) request.notifyFinish().addErrback(self._responseFailed, d)
return d return d
@ -323,7 +328,7 @@ class HostedEncryptedFile(resource.Resource):
if 'name' in request.args.keys(): if 'name' in request.args.keys():
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys(): if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
d = self._api._download_name(request.args['name'][0]) d = self._api._download_name(request.args['name'][0])
d.addCallback(lambda stream: self.makeProducer(request, stream)) d.addCallback(lambda stream: self._make_stream_producer(request, stream))
elif request.args['name'][0] in self._api.waiting_on.keys(): elif request.args['name'][0] in self._api.waiting_on.keys():
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0]) request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
request.finish() request.finish()
@ -333,12 +338,10 @@ class HostedEncryptedFile(resource.Resource):
return server.NOT_DONE_YET return server.NOT_DONE_YET
def _responseFailed(self, err, call): def _responseFailed(self, err, call):
log.error("Hosted file response failed with error: " + str(err)) call.addErrback(lambda err: err.trap(error.ConnectionDone))
call.addErrback(lambda err: err.trap(defer.CancelledError))
#call.addErrback(lambda err: err.trap(error.ConnectionDone)) call.addErrback(lambda err: log.info("Error: " + str(err)))
#call.addErrback(lambda err: err.trap(defer.CancelledError)) call.cancel()
#call.addErrback(lambda err: log.info("Error: " + str(err)))
#call.cancel()
class EncryptedFileUpload(resource.Resource): class EncryptedFileUpload(resource.Resource):
""" """