diff --git a/tests/lbrynet/lbrynet/reflector/__init__.py b/tests/lbrynet/lbrynet/reflector/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/lbrynet/lbrynet/reflector/client/__init__.py b/tests/lbrynet/lbrynet/reflector/client/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/lbrynet/lbrynet/reflector/client/client.py b/tests/lbrynet/lbrynet/reflector/client/client.py deleted file mode 100644 index ce2102f95..000000000 --- a/tests/lbrynet/lbrynet/reflector/client/client.py +++ /dev/null @@ -1,243 +0,0 @@ -""" -The reflector protocol (all dicts encoded in json): - -Client Handshake (sent once per connection, at the start of the connection): - -{ - 'version': 0, -} - - -Server Handshake (sent once per connection, after receiving the client handshake): - -{ - 'version': 0, -} - - -Client Info Request: - -{ - 'blob_hash': "", - 'blob_size': -} - - -Server Info Response (sent in response to Client Info Request): - -{ - 'send_blob': True|False -} - -If response is 'YES', client may send a Client Blob Request or a Client Info Request. -If response is 'NO', client may only send a Client Info Request - - -Client Blob Request: - -{} # Yes, this is an empty dictionary, in case something needs to go here in the future - # this blob data must match the info sent in the most recent Client Info Request - - -Server Blob Response (sent in response to Client Blob Request): -{ - 'received_blob': True -} - -Client may now send another Client Info Request - -""" -import json -import logging -from twisted.protocols.basic import FileSender -from twisted.internet.protocol import Protocol, ClientFactory -from twisted.internet import defer, error - - -log = logging.getLogger(__name__) - - -class IncompleteResponseError(Exception): - pass - - -class LBRYFileReflectorClient(Protocol): - - # Protocol stuff - - def connectionMade(self): - self.blob_manager = self.factory.blob_manager - self.response_buff = '' - self.outgoing_buff = '' - self.blob_hashes_to_send = [] - self.next_blob_to_send = None - self.blob_read_handle = None - self.received_handshake_response = False - self.protocol_version = None - self.file_sender = None - self.producer = None - self.streaming = False - d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) - d.addCallback(lambda _: self.send_handshake()) - d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) - - def dataReceived(self, data): - self.response_buff += data - try: - msg = self.parse_response(self.response_buff) - except IncompleteResponseError: - pass - else: - self.response_buff = '' - d = self.handle_response(msg) - d.addCallback(lambda _: self.send_next_request()) - d.addErrback(self.response_failure_handler) - - def connectionLost(self, reason): - if reason.check(error.ConnectionDone): - self.factory.finished_deferred.callback(True) - else: - self.factory.finished_deferred.callback(reason) - - # IConsumer stuff - - def registerProducer(self, producer, streaming): - self.producer = producer - self.streaming = streaming - if self.streaming is False: - from twisted.internet import reactor - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - self.producer = None - - def write(self, data): - self.transport.write(data) - if self.producer is not None and self.streaming is False: - from twisted.internet import reactor - reactor.callLater(0, self.producer.resumeProducing) - - def get_blobs_to_send(self, stream_info_manager, stream_hash): - d = stream_info_manager.get_blobs_for_stream(stream_hash) - - def set_blobs(blob_hashes): - for blob_hash, position, iv, length in blob_hashes: - if blob_hash is not None: - self.blob_hashes_to_send.append(blob_hash) - - d.addCallback(set_blobs) - - d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash)) - - def set_sd_blobs(sd_blob_hashes): - for sd_blob_hash in sd_blob_hashes: - self.blob_hashes_to_send.append(sd_blob_hash) - - d.addCallback(set_sd_blobs) - return d - - def send_handshake(self): - self.write(json.dumps({'version': 0})) - - def parse_response(self, buff): - try: - return json.loads(buff) - except ValueError: - raise IncompleteResponseError() - - def response_failure_handler(self, err): - log.warning("An error occurred handling the response: %s", err.getTraceback()) - - def handle_response(self, response_dict): - if self.received_handshake_response is False: - return self.handle_handshake_response(response_dict) - else: - return self.handle_normal_response(response_dict) - - def set_not_uploading(self): - if self.next_blob_to_send is not None: - self.next_blob_to_send.close_read_handle(self.read_handle) - self.read_handle = None - self.next_blob_to_send = None - self.file_sender = None - return defer.succeed(None) - - def start_transfer(self): - self.write(json.dumps({})) - assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" - d = self.file_sender.beginFileTransfer(self.read_handle, self) - return d - - def handle_handshake_response(self, response_dict): - if 'version' not in response_dict: - raise ValueError("Need protocol version number!") - self.protocol_version = int(response_dict['version']) - if self.protocol_version != 0: - raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) - self.received_handshake_response = True - return defer.succeed(True) - - def handle_normal_response(self, response_dict): - if self.file_sender is None: # Expecting Server Info Response - if 'send_blob' not in response_dict: - raise ValueError("I don't know whether to send the blob or not!") - if response_dict['send_blob'] is True: - self.file_sender = FileSender() - return defer.succeed(True) - else: - return self.set_not_uploading() - else: # Expecting Server Blob Response - if 'received_blob' not in response_dict: - raise ValueError("I don't know if the blob made it to the intended destination!") - else: - return self.set_not_uploading() - - def open_blob_for_reading(self, blob): - if blob.is_validated(): - read_handle = blob.open_for_reading() - if read_handle is not None: - self.next_blob_to_send = blob - self.read_handle = read_handle - return None - raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) - - def send_blob_info(self): - assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" - self.write(json.dumps({ - 'blob_hash': self.next_blob_to_send.blob_hash, - 'blob_size': self.next_blob_to_send.length - })) - - def send_next_request(self): - if self.file_sender is not None: - # send the blob - return self.start_transfer() - elif self.blob_hashes_to_send: - # open the next blob to send - blob_hash = self.blob_hashes_to_send[0] - self.blob_hashes_to_send = self.blob_hashes_to_send[1:] - d = self.blob_manager.get_blob(blob_hash, True) - d.addCallback(self.open_blob_for_reading) - # send the server the next blob hash + length - d.addCallback(lambda _: self.send_blob_info()) - return d - else: - # close connection - self.transport.loseConnection() - - -class LBRYFileReflectorClientFactory(ClientFactory): - protocol = LBRYFileReflectorClient - - def __init__(self, blob_manager, stream_info_manager, stream_hash): - self.blob_manager = blob_manager - self.stream_info_manager = stream_info_manager - self.stream_hash = stream_hash - self.p = None - self.finished_deferred = defer.Deferred() - - def buildProtocol(self, addr): - p = self.protocol() - p.factory = self - self.p = p - return p \ No newline at end of file diff --git a/tests/lbrynet/lbrynet/reflector/server/__init__.py b/tests/lbrynet/lbrynet/reflector/server/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/lbrynet/lbrynet/reflector/server/server.py b/tests/lbrynet/lbrynet/reflector/server/server.py deleted file mode 100644 index a8f36ae22..000000000 --- a/tests/lbrynet/lbrynet/reflector/server/server.py +++ /dev/null @@ -1,132 +0,0 @@ -import logging -from twisted.python import failure -from twisted.internet import error, defer -from twisted.internet.protocol import Protocol, ServerFactory -import json - -from lbrynet.core.utils import is_valid_blobhash - - -log = logging.getLogger(__name__) - - -class ReflectorServer(Protocol): - - def connectionMade(self): - peer_info = self.transport.getPeer() - self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) - self.blob_manager = self.factory.blob_manager - self.received_handshake = False - self.peer_version = None - self.receiving_blob = False - self.incoming_blob = None - self.blob_write = None - self.blob_finished_d = None - self.cancel_write = None - self.request_buff = "" - - def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): - pass - - def dataReceived(self, data): - if self.receiving_blob is False: - self.request_buff += data - msg, extra_data = self._get_valid_response(self.request_buff) - if msg is not None: - self.request_buff = '' - d = self.handle_request(msg) - d.addCallbacks(self.send_response, self.handle_error) - if self.receiving_blob is True and len(extra_data) != 0: - self.blob_write(extra_data) - else: - self.blob_write(data) - - def _get_valid_response(self, response_msg): - extra_data = None - response = None - curr_pos = 0 - while 1: - next_close_paren = response_msg.find('}', curr_pos) - if next_close_paren != -1: - curr_pos = next_close_paren + 1 - try: - response = json.loads(response_msg[:curr_pos]) - except ValueError: - pass - else: - extra_data = response_msg[curr_pos:] - break - else: - break - return response, extra_data - - def handle_request(self, request_dict): - if self.received_handshake is False: - return self.handle_handshake(request_dict) - else: - return self.handle_normal_request(request_dict) - - def handle_handshake(self, request_dict): - if 'version' not in request_dict: - raise ValueError("Client should send version") - self.peer_version = int(request_dict['version']) - if self.peer_version != 0: - raise ValueError("I don't know that version!") - self.received_handshake = True - return defer.succeed({'version': 0}) - - def determine_blob_needed(self, blob): - if blob.is_validated(): - return {'send_blob': False} - else: - self.incoming_blob = blob - self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) - return {'send_blob': True} - - def close_blob(self): - self.blob_finished_d = None - self.blob_write = None - self.cancel_write = None - self.incoming_blob = None - self.receiving_blob = False - - def handle_normal_request(self, request_dict): - if self.blob_write is None: - # we haven't opened a blob yet, meaning we must be waiting for the - # next message containing a blob hash and a length. this message - # should be it. if it's one we want, open the blob for writing, and - # return a nice response dict (in a Deferred) saying go ahead - if not 'blob_hash' in request_dict or not 'blob_size' in request_dict: - raise ValueError("Expected a blob hash and a blob size") - if not is_valid_blobhash(request_dict['blob_hash']): - raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash'])) - d = self.blob_manager.get_blob( - request_dict['blob_hash'], - True, - int(request_dict['blob_size']) - ) - d.addCallback(self.determine_blob_needed) - else: - # we have a blob open already, so this message should have nothing - # important in it. to the deferred that fires when the blob is done, - # add a callback which returns a nice response dict saying to keep - # sending, and then return that deferred - self.receiving_blob = True - d = self.blob_finished_d - d.addCallback(lambda _: self.close_blob()) - d.addCallback(lambda _: {'received_blob': True}) - return d - - def send_response(self, response_dict): - self.transport.write(json.dumps(response_dict)) - - def handle_error(self, err): - pass - - -class ReflectorServerFactory(ServerFactory): - protocol = ReflectorServer - - def __init__(self, peer_manager, blob_manager): - self.peer_manager = peer_manager - self.blob_manager = blob_manager \ No newline at end of file