mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-03-15 20:34:31 +00:00
56 lines
2.1 KiB
Python
56 lines
2.1 KiB
Python
from datetime import datetime
|
|
|
|
import pytz
|
|
|
|
from core.config import MAX_ALERT_AGE
|
|
|
|
|
|
class AlertProvider:
|
|
"""Generic alert provider class. Subclasses of this query the individual APIs for alerts."""
|
|
|
|
def __init__(self, provider_config):
|
|
"""Constructor"""
|
|
|
|
self.name = provider_config["name"]
|
|
self.enabled = provider_config["enabled"]
|
|
self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
|
|
self.status = "Not Started" if self.enabled else "Disabled"
|
|
self._alerts = None
|
|
self._web_server = None
|
|
|
|
def setup(self, alerts, web_server):
|
|
"""Set up the provider, e.g. giving it the alert list to work from"""
|
|
|
|
self._alerts = alerts
|
|
self._web_server = web_server
|
|
|
|
def start(self):
|
|
"""Start the provider. This should return immediately after spawning threads to access the remote resources"""
|
|
|
|
raise NotImplementedError("Subclasses must implement this method")
|
|
|
|
def _submit_batch(self, alerts):
|
|
"""Submit a batch of alerts retrieved from the provider. There is no timestamp checking like there is for spots,
|
|
because alerts could be created at any point for any time in the future. Rely on hashcode-based id matching
|
|
to deal with duplicates."""
|
|
|
|
# Sort the batch so that earliest ones go in first. This helps keep the ordering correct when alerts are fired
|
|
# off to SSE listeners.
|
|
alerts = sorted(alerts, key=lambda a: (a.start_time if a and a.start_time else 0))
|
|
for alert in alerts:
|
|
# Fill in any blanks and add to the list
|
|
alert.infer_missing()
|
|
self._add_alert(alert)
|
|
|
|
def _add_alert(self, alert):
|
|
if not alert.expired():
|
|
self._alerts.add(alert.id, alert, expire=MAX_ALERT_AGE)
|
|
# Ping the web server in case we have any SSE connections that need to see this immediately
|
|
if self._web_server:
|
|
self._web_server.notify_new_alert(alert)
|
|
|
|
def stop(self):
|
|
"""Stop any threads and prepare for application shutdown"""
|
|
|
|
raise NotImplementedError("Subclasses must implement this method")
|