import logging
from datetime import datetime
from threading import Thread
from time import sleep

import pytz
from requests_sse import EventSource

from core.constants import HTTP_HEADERS
from spotproviders.spot_provider import SpotProvider


# Spot provider using Server-Sent Events. A background thread holds the SSE connection open, hands each "message"
# event to the subclass for conversion into a spot, and reconnects after a short delay whenever the stream ends or
# fails. The name, status, last_update_time and submit() members used here are not defined in this class; they are
# presumably provided by SpotProvider.
class SSESpotProvider(SpotProvider):
    # Timeout in seconds passed to EventSource for the request.
    REQUEST_TIMEOUT_SECS = 30
    # How long to wait, in seconds, before reconnecting after the stream ends or fails.
    RECONNECT_DELAY_SECS = 5
    # Granularity, in seconds, at which the reconnect wait checks whether stop() has been called.
    _STOP_POLL_INTERVAL_SECS = 0.25

    # Create the provider. "provider_config" is passed straight through to SpotProvider; "url" is the SSE endpoint
    # to connect to.
    def __init__(self, provider_config, url):
        super().__init__(provider_config)
        self.url = url
        self.event_source = None
        self.thread = None
        self.stopped = False
        # ID of the last event received, sent on reconnect so the server can resume the stream if it supports that.
        self.last_event_id = None

    # Start the background thread that connects to the API and processes events.
    def start(self):
        logging.info("Set up SSE connection to %s spot API.", self.name)
        self.stopped = False
        self.thread = Thread(target=self.run)
        # Daemon thread, so a connection blocked waiting for data never prevents interpreter exit.
        self.thread.daemon = True
        self.thread.start()

    # Stop the provider: flag the worker loop to exit, close the current connection (if any) to unblock the worker,
    # then wait for the worker thread to finish.
    def stop(self):
        self.stopped = True
        if self.event_source:
            self.event_source.close()
        if self.thread:
            self.thread.join()

    # EventSource callback: the connection is open but no data has arrived yet.
    def _on_open(self):
        self.status = "Waiting for Data"

    # EventSource callback: the connection reported an error, so we are back to (re)connecting.
    def _on_error(self):
        self.status = "Connecting"

    # Thread body. Connects, consumes events until the stream ends or fails, then waits and reconnects, until
    # stop() is called.
    def run(self):
        while not self.stopped:
            try:
                logging.debug("Connecting to %s spot API...", self.name)
                self.status = "Connecting"
                with EventSource(self.url, headers=HTTP_HEADERS, latest_event_id=self.last_event_id,
                                 timeout=self.REQUEST_TIMEOUT_SECS, on_open=self._on_open,
                                 on_error=self._on_error) as event_source:
                    self.event_source = event_source
                    self._consume_events(event_source)
            except Exception:
                if self.stopped:
                    # stop() closes the connection from another thread, which can surface here as an exception.
                    # That is a normal shutdown, so don't report it as an error.
                    self.status = "Disconnected"
                    logging.debug("SSE connection to %s spot API closed on stop.", self.name)
                else:
                    self.status = "Error"
                    logging.exception("Exception in SSE Spot Provider (%s)", self.name)
            else:
                self.status = "Disconnected"
            self._wait_before_reconnect()

    # Iterate over events from an open connection until the stream ends or stop() is called. Only events of type
    # "message" are processed; anything else is ignored.
    def _consume_events(self, event_source):
        # stop() may have been called while we were still connecting, in which case it never saw this connection
        # and could not close it. Check before blocking on the first event.
        if self.stopped:
            return
        for event in event_source:
            if self.stopped:
                break
            if event.type == 'message':
                self._handle_message(event)

    # Process a single "message" event. Failures are logged and swallowed so that one bad message does not tear
    # down the connection.
    def _handle_message(self, event):
        try:
            self.last_event_id = event.last_event_id
            new_spot = self.sse_message_to_spot(event.data)
            if new_spot:
                self.submit(new_spot)

            # A message that was handled without error counts as a successful update, even if it produced no spot.
            self.status = "OK"
            self.last_update_time = datetime.now(pytz.UTC)
            logging.debug("Received data from %s spot API.", self.name)
        except Exception:
            logging.exception("Exception processing message from SSE Spot Provider (%s)", self.name)

    # Wait before trying to reconnect. The wait is done in short slices so that stop() is not held up for the full
    # delay; it returns immediately if the provider has already been stopped.
    def _wait_before_reconnect(self):
        remaining = self.RECONNECT_DELAY_SECS
        while remaining > 0 and not self.stopped:
            pause = min(self._STOP_POLL_INTERVAL_SECS, remaining)
            sleep(pause)
            remaining -= pause

    # Convert an SSE message received from the API into a spot. The whole message data is provided here so the
    # subclass implementations can handle the message as JSON, XML, text, or whatever the API actually provides.
    # Return a falsy value (e.g. None) if the message does not correspond to a spot; nothing is submitted then.
    def sse_message_to_spot(self, message_data):
        raise NotImplementedError("Subclasses must implement this method")