mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-04-30 02:35:57 +00:00
Fetch solar conditions from HamQSL #92
This commit is contained in:
104
solarconditionsproviders/hamqsl.py
Normal file
104
solarconditionsproviders/hamqsl.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import logging
|
||||
from xml.etree import ElementTree
|
||||
|
||||
import pytz
|
||||
from dateutil import parser as dateutil_parser, tz as dateutil_tz
|
||||
|
||||
from data.solar_conditions import HFBandCondition, VHFCondition
|
||||
from solarconditionsproviders.http_solar_conditions_provider import HTTPSolarConditionsProvider
|
||||
|
||||
POLL_INTERVAL = 3600 # 1 hour
|
||||
URL = "https://www.hamqsl.com/solarxml.php"
|
||||
|
||||
|
||||
class HamQSL(HTTPSolarConditionsProvider):
|
||||
"""Solar conditions provider using the HamQSL.com XML API (https://www.hamqsl.com/solarxml.php).
|
||||
Provides solar flux index, geomagnetic indices, and HF/VHF propagation condition summaries."""
|
||||
|
||||
def __init__(self, provider_config):
|
||||
super().__init__(provider_config, URL, POLL_INTERVAL)
|
||||
|
||||
def _http_response_to_solar_conditions(self, http_response):
|
||||
if http_response.status_code != 200:
|
||||
logging.warning("HamQSL solar conditions API returned HTTP " + str(http_response.status_code))
|
||||
return None
|
||||
|
||||
root = ElementTree.fromstring(http_response.text)
|
||||
sd = root.find("solardata")
|
||||
if sd is None:
|
||||
logging.warning("HamQSL solar conditions API returned unexpected XML structure")
|
||||
return None
|
||||
|
||||
# Some error checking functions in case the data is janky.
|
||||
|
||||
def text(tag, default=None):
|
||||
el = sd.find(tag)
|
||||
return el.text.strip() if el is not None and el.text else default
|
||||
|
||||
def float_val(tag, default=None):
|
||||
try:
|
||||
return float(text(tag))
|
||||
except (ValueError, TypeError):
|
||||
return default
|
||||
|
||||
def int_val(tag, default=None):
|
||||
try:
|
||||
return int(text(tag))
|
||||
except (ValueError, TypeError):
|
||||
return default
|
||||
|
||||
# Process HF band conditions
|
||||
hf_conditions = []
|
||||
calc = sd.find("calculatedconditions")
|
||||
if calc is not None:
|
||||
for band_el in calc.findall("band"):
|
||||
name = band_el.get("name")
|
||||
time = band_el.get("time")
|
||||
condition = band_el.text.strip() if band_el.text else None
|
||||
if name and time and condition:
|
||||
hf_conditions.append(HFBandCondition(band=name, time=time, condition=condition))
|
||||
|
||||
# Process VHF propagation conditions
|
||||
vhf_conditions = []
|
||||
vhf = sd.find("calculatedvhfconditions")
|
||||
if vhf is not None:
|
||||
for ph_el in vhf.findall("phenomenon"):
|
||||
vhf_conditions.append(VHFCondition(
|
||||
phenomenon=ph_el.get("name"),
|
||||
location=ph_el.get("location"),
|
||||
condition=ph_el.text.strip() if ph_el.text else None
|
||||
))
|
||||
|
||||
# Parse the "updated" timestamp string (format: "28 Mar 2026 0949 GMT") to UTC epoch seconds.
|
||||
updated = None
|
||||
updated_str = text("updated")
|
||||
if updated_str:
|
||||
try:
|
||||
tz_abbr = updated_str.split()[-1]
|
||||
timezone = dateutil_tz.gettz(tz_abbr)
|
||||
if timezone is None:
|
||||
raise ValueError("Unknown timezone abbreviation: " + tz_abbr)
|
||||
dt = dateutil_parser.parse(updated_str, tzinfos={tz_abbr: timezone})
|
||||
updated = dt.astimezone(pytz.UTC).timestamp()
|
||||
except (ValueError, IndexError):
|
||||
logging.warning("HamQSL solar conditions API returned unrecognised timestamp format: " + updated_str)
|
||||
|
||||
# Return the data ready to be put into the solar conditions object.
|
||||
return {
|
||||
"updated": updated,
|
||||
"sfi": int_val("solarflux"),
|
||||
"a_index": int_val("aindex"),
|
||||
"k_index": int_val("kindex"),
|
||||
"x_ray": text("xray"),
|
||||
"sunspots": int_val("sunspots"),
|
||||
"proton_flux": int_val("protonflux"),
|
||||
"electron_flux": int_val("electonflux"),
|
||||
"aurora": int_val("aurora"),
|
||||
"aurora_latitude": float_val("latdegree"),
|
||||
"solar_wind": float_val("solarwind"),
|
||||
"magnetic_field": float_val("magneticfield"),
|
||||
"geomag_field": text("geomagfield"),
|
||||
"geomag_noise": text("signalnoise"),
|
||||
"hf_conditions": hf_conditions,
|
||||
"vhf_conditions": vhf_conditions
|
||||
}
|
||||
61
solarconditionsproviders/http_solar_conditions_provider.py
Normal file
61
solarconditionsproviders/http_solar_conditions_provider.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from threading import Thread, Event
|
||||
|
||||
import pytz
|
||||
import requests
|
||||
|
||||
from core.constants import HTTP_HEADERS
|
||||
from solarconditionsproviders.solar_conditions_provider import SolarConditionsProvider
|
||||
|
||||
|
||||
class HTTPSolarConditionsProvider(SolarConditionsProvider):
|
||||
"""Generic solar conditions provider for providers that request data via HTTP(S). Subclasses implement
|
||||
_http_response_to_solar_conditions() to parse the specific API response format."""
|
||||
|
||||
def __init__(self, provider_config, url, poll_interval):
|
||||
super().__init__(provider_config)
|
||||
self._url = url
|
||||
self._poll_interval = poll_interval
|
||||
self._thread = None
|
||||
self._stop_event = Event()
|
||||
|
||||
def start(self):
|
||||
logging.info("Set up query of " + self.name + " solar conditions API every " + str(self._poll_interval) + " seconds.")
|
||||
self._thread = Thread(target=self._run, daemon=True)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self):
|
||||
self._stop_event.set()
|
||||
|
||||
def _run(self):
|
||||
while True:
|
||||
self._poll()
|
||||
if self._stop_event.wait(timeout=self._poll_interval):
|
||||
break
|
||||
|
||||
def _poll(self):
|
||||
try:
|
||||
logging.debug("Polling " + self.name + " solar conditions API...")
|
||||
http_response = requests.get(self._url, headers=HTTP_HEADERS)
|
||||
new_data = self._http_response_to_solar_conditions(http_response)
|
||||
if new_data:
|
||||
for key, value in new_data.items():
|
||||
if hasattr(self._solar_conditions, key):
|
||||
setattr(self._solar_conditions, key, value)
|
||||
|
||||
self.status = "OK"
|
||||
self.last_update_time = datetime.now(pytz.UTC)
|
||||
logging.debug("Received data from " + self.name + " solar conditions API.")
|
||||
|
||||
except Exception:
|
||||
self.status = "Error"
|
||||
logging.exception("Exception in HTTP Solar Conditions Provider (" + self.name + ")")
|
||||
self._stop_event.wait(timeout=1)
|
||||
|
||||
def _http_response_to_solar_conditions(self, http_response):
|
||||
"""Convert an HTTP response into solar conditions data. Returns a dict mapping SolarConditions field
|
||||
names to their new values, or None if the response could not be parsed. Only the fields returned will
|
||||
be updated on the shared SolarConditions object; any fields not included will be left unchanged."""
|
||||
|
||||
raise NotImplementedError("Subclasses must implement this method")
|
||||
32
solarconditionsproviders/solar_conditions_provider.py
Normal file
32
solarconditionsproviders/solar_conditions_provider.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from datetime import datetime
|
||||
|
||||
import pytz
|
||||
|
||||
|
||||
class SolarConditionsProvider:
|
||||
"""Generic solar conditions provider class. Subclasses of this query individual APIs for space weather and
|
||||
propagation data."""
|
||||
|
||||
def __init__(self, provider_config):
|
||||
"""Constructor"""
|
||||
|
||||
self.name = provider_config["name"]
|
||||
self.enabled = provider_config["enabled"]
|
||||
self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
|
||||
self.status = "Not Started" if self.enabled else "Disabled"
|
||||
self._solar_conditions = None
|
||||
|
||||
def setup(self, solar_conditions):
|
||||
"""Set up the provider, giving it the solar conditions dict to update"""
|
||||
|
||||
self._solar_conditions = solar_conditions
|
||||
|
||||
def start(self):
|
||||
"""Start the provider. This should return immediately after spawning threads to access the remote resources"""
|
||||
|
||||
raise NotImplementedError("Subclasses must implement this method")
|
||||
|
||||
def stop(self):
|
||||
"""Stop any threads and prepare for application shutdown"""
|
||||
|
||||
raise NotImplementedError("Subclasses must implement this method")
|
||||
Reference in New Issue
Block a user