Files
spothole/solarconditionsproviders/giroionosonde.py

175 lines
6.8 KiB
Python

import csv
import logging
from datetime import datetime, timezone, timedelta
from threading import Thread, Event
import pytz
import requests
from core.constants import HTTP_HEADERS, BANDS
from solarconditionsproviders.solar_conditions_provider import SolarConditionsProvider
POLL_INTERVAL = 3600 # 1 hour
STATIONS_INDEX = "datafiles/didbase-stations.csv"
LGDC_URL = "https://lgdc.uml.edu/common/DIDBGetValues"
HISTORY_HOURS = 24
HF_BANDS = [b for b in BANDS if b.is_ham_hf]
class GIROIonosonde(SolarConditionsProvider):
"""Solar conditions provider using ionosonde data from the GIRO Data Center.
Queries foF2, MUF, and LUF measurements for all stations in datafiles/didbase-stations.csv."""
def __init__(self, provider_config):
super().__init__(provider_config)
self._stations = self._load_stations()
self._thread = None
self._stop_event = Event()
def _load_stations(self):
"""Load the CSV file containing the list of URSIs and Station Names for currently active ionosondes."""
stations = []
with open(STATIONS_INDEX, newline='') as f:
for row in csv.reader(f):
if len(row) >= 2:
stations.append({"ursi": row[0].strip(), "name": row[1].strip()})
return stations
def setup(self, solar_conditions, solar_conditions_cache):
"""Prepopulate the ionosonde_data map with known URSI and station names, so that the API exposes this structure
even before we actually have any data in it."""
super().setup(solar_conditions, solar_conditions_cache)
self.update_data({"ionosonde_data": {
s["ursi"]: {"ursi": s["ursi"], "name": s["name"], "fof2": None, "muf": None, "luf": None,
"band_states": None}
for s in self._stations
}})
def start(self):
logging.info(f"Set up query of GIRO ionosonde data API every {POLL_INTERVAL} seconds.")
self._thread = Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._stop_event.set()
def _run(self):
while True:
self._poll()
if self._stop_event.wait(timeout=POLL_INTERVAL):
break
def _poll(self):
try:
logging.debug(f"Polling GIRO ionosonde data...")
now = datetime.now(timezone.utc)
from_time = now - timedelta(hours=HISTORY_HOURS)
ionosonde_data = dict(self._solar_conditions.ionosonde_data or {})
updated_count = 0
for station in self._stations:
if self._stop_event.is_set():
break
ursi = station["ursi"]
name = station["name"]
try:
fof2, muf, luf = self._fetch_station_data(ursi, from_time, now)
if fof2 and muf:
band_states = self._compute_band_statess(fof2, muf, luf or {})
ionosonde_data[ursi] = {"ursi": ursi, "name": name, "fof2": fof2, "muf": muf,
"luf": luf or None, "band_states": band_states}
updated_count += 1
except Exception:
logging.warning(f"Could not fetch ionosonde data for {ursi} ({name})")
self.update_data({"ionosonde_data": ionosonde_data})
self.status = "OK"
self.last_update_time = datetime.now(pytz.UTC)
logging.debug(f"Updated ionosonde data for {updated_count} stations.")
except Exception:
self.status = "Error"
logging.exception(f"Exception in GIRO Ionosonde data provider")
self._stop_event.wait(timeout=1)
def _fetch_station_data(self, ursi, from_time, to_time):
"""Fetch foF2, MUF and LUF readings for a station. Returns (fof2_dict, muf_dict, luf_dict) keyed by UNIX timestamp."""
from_str = from_time.strftime("%Y.%m.%d+%H:%M:%S")
to_str = to_time.strftime("%Y.%m.%d+%H:%M:%S")
url = f"{LGDC_URL}?ursiCode={ursi}&charName=foF2,MUFD,fmin&DMUF=3000&fromDate={from_str}&toDate={to_str}"
response = requests.get(url, headers=HTTP_HEADERS, timeout=(5, 15))
if response.status_code != 200:
return None, None, None
return self._parse_all(response.text)
@staticmethod
def _latest(d):
"""Return the value with the highest timestamp key, or None if the dict is empty."""
return d[max(d.keys())] if d else None
@staticmethod
def _compute_band_statess(fof2_dict, muf_dict, luf_dict):
"""Compute HF band states from the latest foF2, MUF and LUF values.
States:
Closed if band frequency is below LUF (if known) or above MUF
Short if band frequency is >= LUF and < foF2 (good for NVIS)
Long if band frequency is >= foF2 and < MUF (good for DX)
"""
# We have a list of timestamped data for each value, but for this we only want the latest value
fof2 = GIROIonosonde._latest(fof2_dict)
muf = GIROIonosonde._latest(muf_dict)
luf = GIROIonosonde._latest(luf_dict)
if fof2 is None or muf is None:
return {}
band_states = {}
# Iterate over all ham HF bands, we don't care about the others at this point
for band in HF_BANDS:
freq = band.start_freq / 1000000
if freq > muf or (luf is not None and freq < luf):
band_states[band.name] = "Closed"
elif freq < fof2:
band_states[band.name] = "Short"
else:
band_states[band.name] = "Long"
return band_states
@staticmethod
def _parse_all(text):
"""Parse web server response and return (fof2_dict, muf_dict, luf_dict) keyed by UNIX timestamp."""
fof2_data = {}
muf_data = {}
luf_data = {}
for line in text.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
# Data rows have the following format: timestamp CS foF2 QD MUFD QD fmin QD
parts = line.split()
if len(parts) >= 5:
try:
# Python 3.8 TZ parsing fudge
ts = datetime.fromisoformat(parts[0].replace('Z', '+00:00')).timestamp()
except ValueError:
continue
try:
fof2_data[ts] = float(parts[2])
except ValueError:
pass
try:
muf_data[ts] = float(parts[4])
except ValueError:
pass
if len(parts) >= 7:
try:
luf_data[ts] = float(parts[6])
except ValueError:
pass
return fof2_data, muf_data, luf_data