Get WWBOTA data via SSE. Thanks to Steven M1SDH for the patch. Closes #4

This commit is contained in:
Ian Renton
2025-10-05 08:09:06 +01:00
parent 74153a9d94
commit c4aac4973d
5 changed files with 103 additions and 39 deletions

View File

@@ -0,0 +1,65 @@
import logging
from datetime import datetime
from threading import Thread
from time import sleep
import pytz
from requests_sse import EventSource
from spotproviders.spot_provider import SpotProvider
# Spot provider using Server-Sent Events.
class SSESpotProvider(SpotProvider):
def __init__(self, provider_config, url):
super().__init__(provider_config)
self.url = url
self.event_source = None
self.thread = None
self.stopped = False
self.last_event_id = None
def start(self):
logging.info("Set up SSE connection to " + self.name + " spot API.")
self.stopped = False
self.thread = Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
def stop(self):
self.stopped = True
if self.event_source:
self.event_source.close()
if self.thread:
self.thread.join()
def run(self):
while not self.stopped:
try:
logging.debug("Connecting to " + self.name + " spot API...")
with EventSource(self.url, headers=self.HTTP_HEADERS, latest_event_id=self.last_event_id, timeout=30) as event_source:
self.event_source = event_source
for event in self.event_source:
if event.type == 'message':
try:
self.last_event_id = event.last_event_id
new_spot = self.sse_message_to_spot(event.data)
if new_spot:
self.submit(new_spot)
self.status = "OK"
self.last_update_time = datetime.now(pytz.UTC)
logging.debug("Received data from " + self.name + " spot API.")
except Exception as e:
logging.exception("Exception processing message from SSE Spot Provider (" + self.name + ")")
except Exception as e:
self.status = "Error"
logging.exception("Exception in SSE Spot Provider (" + self.name + ")")
sleep(5) # Wait before trying to reconnect
# Convert an SSE message received from the API into a spot. The whole message data is provided here so the subclass
# implementations can handle the message as JSON, XML, text, whatever the API actually provides.
def sse_message_to_spot(self, message_data):
raise NotImplementedError("Subclasses must implement this method")