import logging
from datetime import datetime
from threading import Thread, Event

import pytz
import requests

from core.constants import HTTP_HEADERS
from solarconditionsproviders.solar_conditions_provider import SolarConditionsProvider

# Default number of seconds to wait for the remote server before abandoning a
# request. Without a timeout, requests waits indefinitely on a stalled
# connection, which would hang the polling thread.
_DEFAULT_HTTP_TIMEOUT_SECONDS = 30


class HTTPSolarConditionsProvider(SolarConditionsProvider):
    """Generic solar conditions provider for providers that request data via HTTP(S).

    A background daemon thread polls the configured URL at a fixed interval.
    Subclasses implement _http_response_to_solar_conditions() to parse the
    specific API response format.
    """

    def __init__(self, provider_config, url, poll_interval, timeout=_DEFAULT_HTTP_TIMEOUT_SECONDS):
        """Store the polling configuration. No thread is started until start().

        Args:
            provider_config: Passed unchanged to the parent class constructor.
            url: The URL to request on every poll.
            poll_interval: Seconds to wait between the end of one poll and the
                start of the next.
            timeout: Seconds to wait for the server on each request; passed
                directly to requests.get(). Defaults to 30.
        """
        super().__init__(provider_config)
        self._url = url
        self._poll_interval = poll_interval
        self._timeout = timeout
        self._thread = None
        self._stop_event = Event()

    def start(self):
        """Start the background polling thread.

        Note that the stop event is never cleared, so calling start() again
        after stop() results in a thread that polls once and then exits.
        """
        logging.info(
            "Set up query of %s solar conditions API every %s seconds.",
            self.name,
            self._poll_interval,
        )
        # Daemon thread, so an in-flight poll never blocks interpreter exit.
        self._thread = Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the polling thread to exit. Does not wait for it to finish."""
        self._stop_event.set()

    def _run(self):
        """Thread body: poll immediately, then once per interval until stopped."""
        while True:
            self._poll()
            # Event.wait() returns True only if stop() was called; on a plain
            # timeout it returns False and we go round for the next poll. Using
            # the event rather than a sleep lets stop() interrupt the wait.
            if self._stop_event.wait(timeout=self._poll_interval):
                break

    def _poll(self):
        """Make one HTTP request and publish the parsed result.

        Any exception (network failure, timeout, parse error) is logged and
        recorded in the status rather than propagated, so a single failed poll
        never terminates the polling thread.
        """
        try:
            logging.debug("Polling %s solar conditions API...", self.name)
            # The timeout guarantees that a stalled server surfaces as an
            # exception here instead of hanging this thread indefinitely.
            http_response = requests.get(self._url, headers=HTTP_HEADERS, timeout=self._timeout)
            new_data = self._http_response_to_solar_conditions(http_response)
            self.update_data(new_data)

            self.status = "OK"
            self.last_update_time = datetime.now(pytz.UTC)
            logging.debug("Received data from %s solar conditions API.", self.name)

        except Exception:
            self.status = "Error"
            logging.exception("Exception in HTTP Solar Conditions Provider (%s)", self.name)
            # Brief pause after a failure; returns early if stop() is called.
            self._stop_event.wait(timeout=1)

    def _http_response_to_solar_conditions(self, http_response):
        """Convert an HTTP response into solar conditions data.

        Returns a dict mapping SolarConditions field names to their new values,
        or None if the response could not be parsed. Only the fields returned
        will be updated on the shared SolarConditions object; any fields not
        included will be left unchanged.
        """
        raise NotImplementedError("Subclasses must implement this method")