from datetime import datetime, timedelta

import pytz

from core.config import SERVER_OWNER_CALLSIGN, MAX_ALERT_AGE
from core.constants import SOFTWARE_NAME, SOFTWARE_VERSION


# Base class for alert providers. Each concrete subclass queries one particular API for alerts and feeds
# the results back through submit_batch().
class AlertProvider:

    # HTTP headers for providers that fetch their data over HTTP. The User-Agent identifies the software,
    # its version and the callsign of the server operator.
    HTTP_HEADERS = {
        "User-Agent": f"{SOFTWARE_NAME} {SOFTWARE_VERSION} (operated by {SERVER_OWNER_CALLSIGN})"
    }

    # Constructor. provider_config must contain at least the "name" and "enabled" keys.
    def __init__(self, provider_config):
        self.name = provider_config["name"]
        self.enabled = provider_config["enabled"]

        # Start from the earliest representable time, made timezone-aware in UTC.
        earliest = datetime.min
        self.last_update_time = earliest.replace(tzinfo=pytz.UTC)

        # A disabled provider reports "Disabled"; an enabled one reports "Not Started".
        if self.enabled:
            self.status = "Not Started"
        else:
            self.status = "Disabled"

        # The alert store is supplied later, via setup().
        self.alerts = None

    # Prepare the provider for use by handing it the alert store that submit_batch() will add to.
    def setup(self, alerts):
        self.alerts = alerts

    # Start the provider. Implementations should spawn whatever threads they need to access the remote
    # resources and then return immediately.
    def start(self):
        raise NotImplementedError("Subclasses must implement this method")

    # Submit a batch of alerts retrieved from the provider. Unlike spots, no timestamp checking is done,
    # because an alert can be created at any point for any time in the future. Duplicates are handled by
    # keying each alert on its id (hashcode-based id matching).
    def submit_batch(self, alerts):
        for item in alerts:
            # Fill in any blank fields first, before checking expiry.
            item.infer_missing()

            # Alerts that have already expired are not stored.
            if item.expired():
                continue

            # Store under the alert's id, passing MAX_ALERT_AGE as the store's expiry setting.
            self.alerts.add(item.id, item, expire=MAX_ALERT_AGE)

    # Stop any threads and get ready for application shutdown.
    def stop(self):
        raise NotImplementedError("Subclasses must implement this method")