import logging
from datetime import datetime
from threading import Lock, Thread, Timer
from time import sleep

import pytz
import requests

from alertproviders.alert_provider import AlertProvider
from core.constants import HTTP_HEADERS


# Generic alert provider class for providers that request data via HTTP(S). Just for convenience to avoid code
# duplication. Subclasses of this query the individual APIs for data.
class HTTPAlertProvider(AlertProvider):
    """Base class for alert providers that poll an HTTP(S) endpoint on a fixed interval.

    Subclasses supply the URL and poll interval to the constructor and implement
    http_response_to_alerts() to turn each response into alert data.
    """

    # Seconds to wait for the remote server before giving up on a request. Without a timeout, requests will wait
    # indefinitely on a stalled connection, which would block the polling thread and prevent any further polls from
    # being scheduled. Subclasses may override this for unusually slow APIs.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, provider_config, url, poll_interval):
        """Store the polling parameters. No network activity happens until start() is called.

        :param provider_config: provider configuration, passed through unchanged to AlertProvider.
        :param url: the URL that poll() will GET.
        :param poll_interval: delay in seconds between the end of one poll and the start of the next.
        """
        super().__init__(provider_config)
        self.url = url
        self.poll_interval = poll_interval
        # Timer for the next scheduled poll; None until the first poll has completed.
        self.poll_timer = None
        # Set by stop() so that a poll which is still in flight does not schedule another one.
        self._stopped = False
        # Guards poll_timer and _stopped, which are touched from both the caller's thread and the polling threads.
        self._timer_lock = Lock()

    def start(self):
        """Begin polling. Returns immediately; the first poll runs on a background thread."""
        # Fire off a one-shot thread to run poll() for the first time, just to ensure start() returns immediately and
        # the application can continue starting. The thread itself will then die, and the timer will kick in on its own
        # thread.
        logging.info("Set up query of %s alert API every %s seconds.", self.name, self.poll_interval)
        with self._timer_lock:
            self._stopped = False
        thread = Thread(target=self.poll)
        thread.daemon = True
        thread.start()

    def stop(self):
        """Stop polling. Safe to call before start() or while a poll is still in progress."""
        with self._timer_lock:
            self._stopped = True
            # poll_timer is still None if the first poll has not finished yet (or start() was never called).
            if self.poll_timer is not None:
                self.poll_timer.cancel()

    def poll(self):
        """Fetch the URL once, hand the response to the subclass, then schedule the next poll.

        Any exception raised while fetching or processing is logged and recorded in self.status rather than
        propagated, so that a single failure does not end the polling cycle.
        """
        try:
            # Request data from API
            logging.debug("Polling %s alert API...", self.name)
            http_response = requests.get(self.url, headers=HTTP_HEADERS, timeout=self.REQUEST_TIMEOUT_SECONDS)
            # Pass off to the subclass for processing
            new_alerts = self.http_response_to_alerts(http_response)
            # Submit the new alerts for processing. There might not be any alerts for the less popular programs.
            if new_alerts:
                self.submit_batch(new_alerts)

            self.status = "OK"
            self.last_update_time = datetime.now(pytz.UTC)
            logging.debug("Received data from %s alert API.", self.name)

        except Exception:
            self.status = "Error"
            logging.exception("Exception in HTTP JSON Alert Provider (%s)", self.name)
            # Brief pause after a failure before the next poll is scheduled.
            sleep(1)

        # Schedule the next poll, unless stop() was called while this one was running. The check and the assignment
        # are done under the lock so that stop() cannot slip in between them and leave a live timer behind.
        with self._timer_lock:
            if not self._stopped:
                self.poll_timer = Timer(self.poll_interval, self.poll)
                self.poll_timer.start()

    # Convert an HTTP response returned by the API into alert data. The whole response is provided here so the subclass
    # implementations can check for HTTP status codes if necessary, and handle the response as JSON, XML, text, whatever
    # the API actually provides.
    def http_response_to_alerts(self, http_response):
        """Convert an HTTP response into alert data. Must be implemented by subclasses.

        :param http_response: the response object returned by requests.get() in poll().
        :return: the alerts extracted from the response; a falsy value means there is nothing to submit.
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement this method")