import logging from datetime import datetime from threading import Thread from time import sleep import pytz from websocket import create_connection from core.constants import HTTP_HEADERS from spotproviders.spot_provider import SpotProvider # Spot provider using websockets. class WebsocketSpotProvider(SpotProvider): def __init__(self, provider_config, url): super().__init__(provider_config) self.url = url self.ws = None self.thread = None self.stopped = False self.last_event_id = None def start(self): logging.info("Set up websocket connection to " + self.name + " spot API.") self.stopped = False self.thread = Thread(target=self.run) self.thread.daemon = True self.thread.start() def stop(self): self.stopped = True if self.ws: self.ws.close() if self.thread: self.thread.join() def _on_open(self): self.status = "Waiting for Data" def _on_error(self): self.status = "Connecting" def run(self): while not self.stopped: try: logging.debug("Connecting to " + self.name + " spot API...") self.status = "Connecting" self.ws = create_connection(self.url, header=HTTP_HEADERS) self.status = "Connected" data = self.ws.recv() if data: try: new_spot = self.ws_message_to_spot(data) if new_spot: self.submit(new_spot) self.status = "OK" self.last_update_time = datetime.now(pytz.UTC) logging.debug("Received data from " + self.name + " spot API.") except Exception as e: logging.exception("Exception processing message from Websocket Spot Provider (" + self.name + ")") except Exception as e: self.status = "Error" logging.exception("Exception in Websocket Spot Provider (" + self.name + ")", e) else: self.status = "Disconnected" sleep(5) # Wait before trying to reconnect # Convert a WS message received from the API into a spot. The exact message data (in bytes) is provided here so the # subclass implementations can handle the message as string, JSON, XML, whatever the API actually provides. def ws_message_to_spot(self, bytes): raise NotImplementedError("Subclasses must implement this method")