# Mirror of https://git.ianrenton.com/ian/spothole.git (synced 2025-10-27 08:49:27 +00:00; 65 lines, 2.6 KiB, Python)
import logging
|
|
from datetime import datetime
|
|
from threading import Thread
|
|
from time import sleep
|
|
|
|
import pytz
|
|
from requests_sse import EventSource
|
|
|
|
from spotproviders.spot_provider import SpotProvider
|
|
|
|
# Spot provider using Server-Sent Events.
|
|
class SSESpotProvider(SpotProvider):
|
|
|
|
def __init__(self, provider_config, url):
|
|
super().__init__(provider_config)
|
|
self.url = url
|
|
self.event_source = None
|
|
self.thread = None
|
|
self.stopped = False
|
|
self.last_event_id = None
|
|
|
|
def start(self):
|
|
logging.info("Set up SSE connection to " + self.name + " spot API.")
|
|
self.stopped = False
|
|
self.thread = Thread(target=self.run)
|
|
self.thread.daemon = True
|
|
self.thread.start()
|
|
|
|
def stop(self):
|
|
self.stopped = True
|
|
if self.event_source:
|
|
self.event_source.close()
|
|
if self.thread:
|
|
self.thread.join()
|
|
|
|
def run(self):
|
|
while not self.stopped:
|
|
try:
|
|
logging.debug("Connecting to " + self.name + " spot API...")
|
|
with EventSource(self.url, headers=self.HTTP_HEADERS, latest_event_id=self.last_event_id, timeout=30) as event_source:
|
|
self.event_source = event_source
|
|
for event in self.event_source:
|
|
if event.type == 'message':
|
|
try:
|
|
self.last_event_id = event.last_event_id
|
|
new_spot = self.sse_message_to_spot(event.data)
|
|
if new_spot:
|
|
self.submit(new_spot)
|
|
|
|
self.status = "OK"
|
|
self.last_update_time = datetime.now(pytz.UTC)
|
|
logging.debug("Received data from " + self.name + " spot API.")
|
|
|
|
except Exception as e:
|
|
logging.exception("Exception processing message from SSE Spot Provider (" + self.name + ")")
|
|
|
|
except Exception as e:
|
|
self.status = "Error"
|
|
logging.exception("Exception in SSE Spot Provider (" + self.name + ")")
|
|
sleep(5) # Wait before trying to reconnect
|
|
|
|
# Convert an SSE message received from the API into a spot. The whole message data is provided here so the subclass
|
|
# implementations can handle the message as JSON, XML, text, whatever the API actually provides.
|
|
def sse_message_to_spot(self, message_data):
|
|
raise NotImplementedError("Subclasses must implement this method") |