Files
spothole/solarconditionsproviders/giroionosonde.py

166 lines
6.9 KiB
Python

import csv
import logging
from datetime import datetime, timezone, timedelta
from threading import Thread, Event
import pytz
import requests
from core.constants import HTTP_HEADERS
from solarconditionsproviders.ionosonde_utils import compute_band_states
from solarconditionsproviders.solar_conditions_provider import SolarConditionsProvider
# Delay between polling rounds, in seconds (one hour). Used as the wait timeout in the polling loop.
POLL_INTERVAL = 60 * 60

# CSV file listing the stations to query; the first column is read as the URSI code and the
# second as the station name.
STATIONS_INDEX = "datafiles/didbase-stations.csv"

# Endpoint queried once per station for foF2, MUFD and fmin values.
LGDC_URL = "https://lgdc.uml.edu/common/DIDBGetValues"

# Size of the history window, in hours: readings are requested this far back, and merged readings
# older than this are discarded.
HISTORY_HOURS = 24
class GIROIonosonde(SolarConditionsProvider):
    """Solar conditions provider using ionosonde data from the GIRO Data Center.

    Queries foF2, MUF, and LUF measurements for all stations in datafiles/didbase-stations.csv.
    Can run alongside KC2GProp: GIRO supplements KC2G's foF2/MUF data with LUF readings, and
    stations from each source that the other does not cover are preserved."""

    def __init__(self, provider_config):
        """Load the station index and create the stop event. The polling thread is not created
        until start() is called."""

        super().__init__(provider_config)
        self._stations = self._load_stations()
        self._thread = None
        self._stop_event = Event()

    def _load_stations(self):
        """Read the station index CSV and return a list of {"ursi": ..., "name": ...} dicts.

        Rows with fewer than two columns, or whose URSI code is empty after stripping, are
        skipped: an empty code cannot be queried and would only produce a pointless request."""

        stations = []
        # NOTE(review): assumes the station index is stored as UTF-8 - confirm against the data
        # file. An explicit encoding avoids depending on the platform's default.
        with open(STATIONS_INDEX, newline='', encoding="utf-8") as f:
            for row in csv.reader(f):
                if len(row) < 2:
                    continue
                ursi = row[0].strip()
                if not ursi:
                    continue
                stations.append({"ursi": ursi, "name": row[1].strip()})
        return stations

    def setup(self, solar_conditions, solar_conditions_cache):
        """Pre-populate ionosonde_data with known station names for stations not already present,
        so the station dropdown is available before the first poll. Does not overwrite existing
        entries so KC2G cache data is preserved."""

        super().setup(solar_conditions, solar_conditions_cache)
        existing = solar_conditions.ionosonde_data or {}
        new_entries = {
            s["ursi"]: {"ursi": s["ursi"], "name": s["name"], "fof2": None, "muf": None,
                        "luf": None, "band_states": None}
            for s in self._stations if s["ursi"] not in existing
        }
        # Only publish when there is something to add.
        if new_entries:
            self.update_data({"ionosonde_data": {**existing, **new_entries}})

    def start(self):
        """Start the background daemon thread that polls GIRO."""

        logging.info(f"Set up query of GIRO ionosonde data API every {POLL_INTERVAL} seconds.")
        self._thread = Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the polling thread to stop. Does not wait for the thread to finish."""

        self._stop_event.set()

    def _run(self):
        """Thread body: poll immediately, then once every POLL_INTERVAL seconds until stopped."""

        while True:
            self._poll()
            # wait() returns True as soon as stop() sets the event, ending the loop early.
            if self._stop_event.wait(timeout=POLL_INTERVAL):
                break

    def _poll(self):
        """Fetch the last HISTORY_HOURS of readings for every station and publish the result.

        A failure for one station is logged and does not abort the round. Stations for which no
        foF2 or no MUF readings were returned are left untouched."""

        try:
            logging.debug("Polling GIRO ionosonde data...")
            now = datetime.now(timezone.utc)
            from_time = now - timedelta(hours=HISTORY_HOURS)
            cutoff_ts = from_time.timestamp()

            # Entries produced by this round only. They are overlaid on the shared data at the
            # very end, because a round can take a long time (one HTTP request per station) and
            # publishing a snapshot taken before the loop would overwrite anything another
            # provider (e.g. KC2GProp) stored in the meantime.
            updates = {}
            for station in self._stations:
                if self._stop_event.is_set():
                    break
                ursi = station["ursi"]
                name = station["name"]
                try:
                    fof2, muf, luf = self._fetch_station_data(ursi, from_time, now)
                    if not fof2 or not muf:
                        continue
                    # "or {}" also covers a station key that is present but mapped to None.
                    existing = (self._solar_conditions.ionosonde_data or {}).get(ursi) or {}
                    updates[ursi] = self._merge_station(ursi, name, existing, fof2, muf, luf, cutoff_ts)
                except Exception as e:
                    # Best effort per station; include the cause so failures can be diagnosed.
                    logging.warning(f"Could not fetch ionosonde data for {ursi} ({name}): {e}")

            # Start from the current ionosonde_data so stations provided by other providers
            # (e.g. KC2GProp) are preserved for stations GIRO does not cover.
            ionosonde_data = dict(self._solar_conditions.ionosonde_data or {})
            ionosonde_data.update(updates)
            self.update_data({"ionosonde_data": ionosonde_data})
            self.status = "OK"
            self.last_update_time = datetime.now(pytz.UTC)
            logging.debug(f"Updated ionosonde data for {len(updates)} stations.")

        except Exception:
            self.status = "Error"
            logging.exception("Exception in GIRO Ionosonde data provider")
            # Brief pause after an error; returns early if stop() is called.
            self._stop_event.wait(timeout=1)

    @staticmethod
    def _merge_station(ursi, name, existing, fof2, muf, luf, cutoff_ts):
        """Build the ionosonde_data entry for one station.

        foF2 and MUF readings already stored for the station are combined with the new ones (new
        values win for equal timestamps); LUF is taken from the new readings only. Readings older
        than cutoff_ts are dropped, and a series left empty is stored as None."""

        # Stored timestamps are passed through float() so they compare equal to the float keys
        # of the new readings.
        merged_fof2 = {float(t): v for t, v in (existing.get("fof2") or {}).items()}
        merged_fof2.update(fof2)
        merged_muf = {float(t): v for t, v in (existing.get("muf") or {}).items()}
        merged_muf.update(muf)
        merged_luf = dict(luf) if luf else {}

        merged_fof2 = {t: v for t, v in merged_fof2.items() if t >= cutoff_ts}
        merged_muf = {t: v for t, v in merged_muf.items() if t >= cutoff_ts}
        merged_luf = {t: v for t, v in merged_luf.items() if t >= cutoff_ts}

        band_states = compute_band_states(merged_fof2, merged_muf, merged_luf)
        return {
            "ursi": ursi, "name": name,
            "fof2": merged_fof2 or None,
            "muf": merged_muf or None,
            "luf": merged_luf or None,
            "band_states": band_states,
        }

    def _fetch_station_data(self, ursi, from_time, to_time):
        """Fetch foF2, MUF and LUF readings for a station. Returns (fof2_dict, muf_dict, luf_dict) keyed by UNIX timestamp.

        Returns (None, None, None) when the server answers with a status other than 200. Errors
        raised by the HTTP request itself propagate to the caller."""

        from_str = from_time.strftime("%Y.%m.%d+%H:%M:%S")
        to_str = to_time.strftime("%Y.%m.%d+%H:%M:%S")
        url = f"{LGDC_URL}?ursiCode={ursi}&charName=foF2,MUFD,fmin&DMUF=3000&fromDate={from_str}&toDate={to_str}"
        response = requests.get(url, headers=HTTP_HEADERS, timeout=(5, 15))
        if response.status_code != 200:
            logging.debug(f"GIRO returned HTTP {response.status_code} for {ursi}")
            return None, None, None
        return self._parse_all(response.text)

    @staticmethod
    def _parse_all(text):
        """Parse web server response and return (fof2_dict, muf_dict, luf_dict) keyed by UNIX timestamp.

        Blank lines, lines starting with '#', lines with fewer than five fields and lines whose
        first field is not an ISO timestamp are ignored. A value that is not a number is skipped
        without affecting the other values on the same line."""

        fof2_data = {}
        muf_data = {}
        luf_data = {}
        # Data rows have the following format: timestamp CS foF2 QD MUFD QD fmin QD
        columns = ((2, fof2_data), (4, muf_data), (6, luf_data))

        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            parts = line.split()
            if len(parts) < 5:
                continue

            try:
                # Python 3.8 TZ parsing fudge
                parsed = datetime.fromisoformat(parts[0].replace('Z', '+00:00'))
            except ValueError:
                continue
            if parsed.tzinfo is None:
                # NOTE(review): assumes GIRO timestamps are UTC - confirm. Without this, a
                # timestamp lacking an offset would be converted using the local timezone.
                parsed = parsed.replace(tzinfo=timezone.utc)
            ts = parsed.timestamp()

            for index, target in columns:
                # The fmin column is optional: rows may stop after the MUFD value.
                if index >= len(parts):
                    continue
                try:
                    target[ts] = float(parts[index])
                except ValueError:
                    pass

        return fof2_data, muf_data, luf_data