from datetime import datetime, timedelta

import pytz

from core.config import SERVER_OWNER_CALLSIGN, MAX_ALERT_AGE
from core.constants import SOFTWARE_NAME, SOFTWARE_VERSION


# Generic alert provider class. Subclasses of this query the individual APIs for alerts.
class AlertProvider:

    # HTTP headers used for alert providers that use HTTP. The User-Agent identifies the software name and version,
    # plus the callsign of the server operator.
    HTTP_HEADERS = {
        "User-Agent": SOFTWARE_NAME + " " + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")"
    }

    # Constructor. provider_config is a dict-like object that must contain at least the "name" and "enabled" keys.
    def __init__(self, provider_config):
        self.name = provider_config["name"]
        self.enabled = provider_config["enabled"]
        # Timezone-aware "never updated" marker (the minimum datetime, tagged as UTC).
        self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
        self.status = "Not Started" if self.enabled else "Disabled"
        # Populated by setup(); submit_batch() adds alerts to this object.
        self.alerts = None

    # Set up the provider, e.g. giving it the alert list to work from
    def setup(self, alerts):
        self.alerts = alerts

    # Start the provider. This should return immediately after spawning threads to access the remote resources
    def start(self):
        raise NotImplementedError("Subclasses must implement this method")

    # Submit a batch of alerts retrieved from the provider. There is no timestamp checking like there is for spots,
    # because alerts could be created at any point for any time in the future. Rely on hashcode-based id matching
    # to deal with duplicates.
    def submit_batch(self, alerts):
        # Work out the reference times once, so that every alert in the batch is judged against the same instant
        # rather than re-reading the clock for each alert.
        now = datetime.now(pytz.UTC)
        now_timestamp = now.timestamp()
        oldest_allowed_start = (now - timedelta(days=1)).timestamp()

        for alert in alerts:
            # Fill in any blanks
            alert.infer_missing()

            # Add to the list, provided it meets the sensible date test. If alerts have an end time, it must be in
            # the future, or if not, then the start time must be within the last 24 hours (or in the future).
            if alert.end_time:
                is_current = alert.end_time > now_timestamp
            else:
                # An alert with neither an end time nor a start time cannot be date-checked. Skip it rather than
                # letting the comparison against None raise a TypeError and abort the rest of the batch.
                is_current = alert.start_time is not None and alert.start_time > oldest_allowed_start

            if is_current:
                self.alerts.add(alert.id, alert, expire=MAX_ALERT_AGE)

    # Stop any threads and prepare for application shutdown
    def stop(self):
        raise NotImplementedError("Subclasses must implement this method")