Files
spothole/spotproviders/spot_provider.py
2026-02-27 20:33:45 +00:00

71 lines
3.0 KiB
Python

from datetime import datetime
import pytz
from core.config import MAX_SPOT_AGE
class SpotProvider:
    """Generic spot provider class. Subclasses of this query the individual APIs for data.

    Subclasses must implement start() and stop(), and hand the spots they retrieve to _submit_batch() (for providers
    that re-fetch a list which may contain spots already seen) or _submit() (for providers that stream each spot
    once)."""

    def __init__(self, provider_config):
        """Constructor. provider_config must contain a "name" entry and an "enabled" entry."""

        self.name = provider_config["name"]
        self.enabled = provider_config["enabled"]
        # Timezone-aware "beginning of time" markers, so they can be compared against the UTC datetimes built from
        # spot timestamps without raising a naive-vs-aware comparison error.
        self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
        self.last_spot_time = datetime.min.replace(tzinfo=pytz.UTC)
        self.status = "Not Started" if self.enabled else "Disabled"
        # Both of these are supplied later via setup()
        self._spots = None
        self._web_server = None

    def setup(self, spots, web_server):
        """Set up the provider, e.g. giving it the spot list to work from"""

        self._spots = spots
        self._web_server = web_server

    def start(self):
        """Start the provider. This should return immediately after spawning threads to access the remote resources"""

        raise NotImplementedError("Subclasses must implement this method")

    def _submit_batch(self, spots):
        """Submit a batch of spots retrieved from the provider. Only spots that are newer than the last spot retrieved
        by this provider will be added to the spot list, to prevent duplications. Spots passing the check will also have
        their infer_missing() method called to complete their data set. This is called by the API-querying
        subclasses on receiving spots.

        Missing (None) entries and spots without a time are skipped, because they can be neither ordered nor checked
        against the time of the last spot. A None or empty batch is a no-op."""

        timed_spots = [s for s in (spots or []) if s is not None and s.time is not None]
        if not timed_spots:
            return

        # Sort the batch so that earliest ones go in first. This helps keep the ordering correct when spots are fired
        # off to SSE listeners.
        timed_spots.sort(key=lambda s: s.time)

        # Every spot in the batch is compared against the threshold as it stood *before* this batch, so the threshold
        # is deliberately only updated once the loop has finished.
        for spot in timed_spots:
            if datetime.fromtimestamp(spot.time, pytz.UTC) > self.last_spot_time:
                # Fill in any blanks and add to the list
                spot.infer_missing()
                self._add_spot(spot)

        # The batch is sorted ascending, so its last entry is the newest. Only ever move the threshold forwards: a
        # batch made up entirely of old spots must not wind it back, or spots we have already seen would pass the
        # check again on the next batch.
        newest = datetime.fromtimestamp(timed_spots[-1].time, pytz.UTC)
        if newest > self.last_spot_time:
            self.last_spot_time = newest

    def _submit(self, spot):
        """Submit a single spot retrieved from the provider. Unlike _submit_batch(), the spot is not checked against
        the time of the last spot seen (although a spot that reports itself as expired is still not stored). The spot
        will also have its infer_missing() method called to complete its data set. This is called by the data streaming
        subclasses, which can be relied upon not to re-provide old spots."""

        # Fill in any blanks and add to the list
        spot.infer_missing()
        self._add_spot(spot)
        # A spot that still has no time cannot tell us anything about when the last spot was seen
        if spot.time is not None:
            self.last_spot_time = datetime.fromtimestamp(spot.time, pytz.UTC)

    def _add_spot(self, spot):
        """Store a spot in the spot list given to setup(), unless the spot reports itself as expired, and tell the
        web server (if there is one) about it."""

        if not spot.expired():
            self._spots.add(spot.id, spot, expire=MAX_SPOT_AGE)
            # Ping the web server in case we have any SSE connections that need to see this immediately
            if self._web_server:
                self._web_server.notify_new_spot(spot)

    def stop(self):
        """Stop any threads and prepare for application shutdown"""

        raise NotImplementedError("Subclasses must implement this method")