mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-04-30 10:45:57 +00:00
42 lines
1.4 KiB
Python
42 lines
1.4 KiB
Python
from datetime import datetime
|
|
|
|
import pytz
|
|
|
|
|
|
class SolarConditionsProvider:
|
|
"""Generic solar conditions provider class. Subclasses of this query individual APIs for space weather and
|
|
propagation data."""
|
|
|
|
def __init__(self, provider_config):
|
|
"""Constructor"""
|
|
|
|
self.name = provider_config["name"]
|
|
self.enabled = provider_config["enabled"]
|
|
self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
|
|
self.status = "Not Started" if self.enabled else "Disabled"
|
|
self._solar_conditions = None
|
|
|
|
def setup(self, solar_conditions):
|
|
"""Set up the provider, giving it the solar conditions dict to update"""
|
|
|
|
self._solar_conditions = solar_conditions
|
|
|
|
def start(self):
|
|
"""Start the provider. This should return immediately after spawning threads to access the remote resources"""
|
|
|
|
raise NotImplementedError("Subclasses must implement this method")
|
|
|
|
def stop(self):
|
|
"""Stop any threads and prepare for application shutdown"""
|
|
|
|
raise NotImplementedError("Subclasses must implement this method")
|
|
|
|
def update_data(self, new_data):
|
|
"""Update the solar conditions object with new data"""
|
|
|
|
if new_data:
|
|
for key, value in new_data.items():
|
|
if hasattr(self._solar_conditions, key):
|
|
setattr(self._solar_conditions, key, value)
|
|
self._solar_conditions.infer_descriptions()
|