mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-04-30 10:45:57 +00:00
105 lines
4.2 KiB
Python
105 lines
4.2 KiB
Python
import logging
|
|
from xml.etree import ElementTree
|
|
|
|
import pytz
|
|
from dateutil import parser as dateutil_parser, tz as dateutil_tz
|
|
|
|
from data.solar_conditions import HFBandCondition, VHFCondition
|
|
from solarconditionsproviders.http_solar_conditions_provider import HTTPSolarConditionsProvider
|
|
|
|
# Number of seconds between polls of the HamQSL feed (one hour).
POLL_INTERVAL = 60 * 60
# Address of the HamQSL solar data XML document that gets polled.
URL = "https://www.hamqsl.com/solarxml.php"
|
|
|
|
|
|
class HamQSL(HTTPSolarConditionsProvider):
    """Solar conditions provider using the HamQSL.com XML API (https://www.hamqsl.com/solarxml.php).

    Provides solar flux index, geomagnetic indices, and HF/VHF propagation condition summaries."""

    def __init__(self, provider_config):
        """Create the provider, handing the HamQSL URL and poll interval to the base class.

        :param provider_config: provider configuration, passed through unchanged to the base class.
        """
        super().__init__(provider_config, URL, POLL_INTERVAL)

    @staticmethod
    def _child_text(parent, tag, default=None):
        """Return the stripped text of the child element `tag`, or `default` if it is missing or blank."""
        el = parent.find(tag)
        if el is None or not el.text:
            return default
        # A whitespace-only element strips down to "", which is treated the same as a missing element.
        return el.text.strip() or default

    @classmethod
    def _child_number(cls, parent, tag, cast, default=None):
        """Return the text of the child element `tag` converted with `cast`, or `default` on failure."""
        try:
            # A missing element yields None here, which makes cast() raise TypeError.
            return cast(cls._child_text(parent, tag))
        except (ValueError, TypeError):
            return default

    @staticmethod
    def _parse_updated(updated_str):
        """Convert an "updated" string (format: "28 Mar 2026 0949 GMT") to UTC epoch seconds.

        Logs a warning and returns None if the string cannot be interpreted."""
        try:
            # The last whitespace-separated token is taken to be the timezone abbreviation.
            tz_abbr = updated_str.split()[-1]
            timezone = dateutil_tz.gettz(tz_abbr)
            if timezone is None:
                raise ValueError("Unknown timezone abbreviation: " + tz_abbr)
            dt = dateutil_parser.parse(updated_str, tzinfos={tz_abbr: timezone})
            return dt.astimezone(pytz.UTC).timestamp()
        except (ValueError, IndexError, OverflowError):
            # OverflowError covers absurdly large numeric fields in the date string.
            logging.warning("HamQSL solar conditions API returned unrecognised timestamp format: %s", updated_str)
            return None

    def _http_response_to_solar_conditions(self, http_response):
        """Turn an HTTP response from the HamQSL API into a dict of solar condition values.

        :param http_response: response object; its `status_code` and `text` attributes are read
            (presumably a `requests` response - confirm against the base class).
        :return: dict with the keys "updated", "sfi", "a_index", "k_index", "x_ray", "sunspots",
            "proton_flux", "electron_flux", "aurora", "aurora_latitude", "solar_wind",
            "magnetic_field", "geomag_field", "geomag_noise", "hf_conditions" and "vhf_conditions".
            Scalar values that are missing or unparseable are None. Returns None instead of a dict if
            the status is not 200, the body is not well-formed XML, or there is no "solardata" element.
        """
        if http_response.status_code != 200:
            logging.warning("HamQSL solar conditions API returned HTTP %s", http_response.status_code)
            return None

        # A malformed body is handled like any other bad response: warn and give up on this poll.
        try:
            root = ElementTree.fromstring(http_response.text)
        except ElementTree.ParseError as e:
            logging.warning("HamQSL solar conditions API returned malformed XML: %s", e)
            return None

        sd = root.find("solardata")
        if sd is None:
            logging.warning("HamQSL solar conditions API returned unexpected XML structure")
            return None

        # Short-hands for the tolerant accessors, in case the data is janky.
        def text(tag):
            return self._child_text(sd, tag)

        def int_val(tag):
            return self._child_number(sd, tag, int)

        def float_val(tag):
            return self._child_number(sd, tag, float)

        # Process HF band conditions. Entries lacking a name, time or condition are skipped.
        hf_conditions = []
        calc = sd.find("calculatedconditions")
        if calc is not None:
            for band_el in calc.findall("band"):
                name = band_el.get("name")
                time = band_el.get("time")
                condition = band_el.text.strip() if band_el.text else None
                if name and time and condition:
                    hf_conditions.append(HFBandCondition(band=name, time=time, condition=condition))

        # Process VHF propagation conditions. Unlike HF, every entry is kept even if fields are missing.
        vhf_conditions = []
        vhf = sd.find("calculatedvhfconditions")
        if vhf is not None:
            vhf_conditions = [
                VHFCondition(
                    phenomenon=ph_el.get("name"),
                    location=ph_el.get("location"),
                    condition=ph_el.text.strip() if ph_el.text else None,
                )
                for ph_el in vhf.findall("phenomenon")
            ]

        # Parse the "updated" timestamp string to UTC epoch seconds, if there is one.
        updated_str = text("updated")
        updated = self._parse_updated(updated_str) if updated_str else None

        # Return the data ready to be put into the solar conditions object.
        return {
            "updated": updated,
            "sfi": int_val("solarflux"),
            "a_index": int_val("aindex"),
            "k_index": int_val("kindex"),
            "x_ray": text("xray"),
            "sunspots": int_val("sunspots"),
            "proton_flux": int_val("protonflux"),
            # NOTE(review): "electonflux" looks like a typo, but it is believed to match the tag name
            # the HamQSL feed actually emits - confirm against a live response before "fixing" it.
            "electron_flux": int_val("electonflux"),
            "aurora": int_val("aurora"),
            "aurora_latitude": float_val("latdegree"),
            "solar_wind": float_val("solarwind"),
            "magnetic_field": float_val("magneticfield"),
            "geomag_field": text("geomagfield"),
            "geomag_noise": text("signalnoise"),
            "hf_conditions": hf_conditions,
            "vhf_conditions": vhf_conditions,
        }
|