import csv
import logging
from datetime import datetime, timezone, timedelta
from threading import Thread, Event

import pytz
import requests

from core.constants import HTTP_HEADERS
from solarconditionsproviders.solar_conditions_provider import SolarConditionsProvider

POLL_INTERVAL = 3600  # 1 hour
STATIONS_INDEX = "datafiles/didbase-stations.csv"
LGDC_URL = "https://lgdc.uml.edu/common/DIDBGetValues"
HISTORY_HOURS = 24


class GIROIonosonde(SolarConditionsProvider):
    """Solar conditions provider using ionosonde data from the GIRO Data Center.

    Queries foF2 and MUF measurements for all stations in
    datafiles/didbase-stations.csv. Polling runs on a daemon thread started by
    start() and is interrupted by stop().
    """

    def __init__(self, provider_config):
        """Load the station index and prepare (but do not start) the poll thread."""
        super().__init__(provider_config)
        self._stations = self._load_stations()
        self._thread = None
        self._stop_event = Event()

    def _load_stations(self):
        """Load the CSV file containing the list of URSIs and Station Names for
        currently active ionosondes.

        Returns a list of {"ursi": ..., "name": ...} dicts. Rows with fewer
        than two columns are skipped.
        """
        stations = []
        # Explicit encoding so station names decode identically on every
        # platform instead of depending on the locale default.
        # NOTE(review): assumes the index file is UTF-8 - confirm.
        with open(STATIONS_INDEX, newline='', encoding="utf-8") as f:
            for row in csv.reader(f):
                if len(row) >= 2:
                    stations.append({"ursi": row[0].strip(), "name": row[1].strip()})
        return stations

    def setup(self, solar_conditions, solar_conditions_cache):
        """Prepopulate the ionosonde_data map with known URSI and station names,
        so that the API exposes this structure even before we actually have any
        data in it."""
        super().setup(solar_conditions, solar_conditions_cache)
        placeholders = {
            s["ursi"]: {"ursi": s["ursi"], "name": s["name"], "fof2": None, "muf": None}
            for s in self._stations
        }
        self.update_data({"ionosonde_data": placeholders})

    def start(self):
        """Start the background polling thread."""
        logging.info(f"Set up query of GIRO ionosonde data API every {POLL_INTERVAL} seconds.")
        self._thread = Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the polling thread to finish; does not wait for it to exit."""
        self._stop_event.set()

    def _run(self):
        """Poll immediately, then once every POLL_INTERVAL seconds until stopped."""
        while True:
            self._poll()
            # Event.wait() returns True as soon as stop() is called, so the
            # sleep between polls is interruptible.
            if self._stop_event.wait(timeout=POLL_INTERVAL):
                break

    def _poll(self):
        """Fetch the last HISTORY_HOURS of data for every station and publish it.

        A station's entry is only replaced when both the foF2 and MUF series
        came back non-empty; otherwise its previous entry is kept. A failure
        for one station is logged and does not abort the others.
        """
        try:
            logging.debug("Polling GIRO ionosonde data...")
            now = datetime.now(timezone.utc)
            from_time = now - timedelta(hours=HISTORY_HOURS)

            # Start from a copy of what is already published so stations that
            # fail this round keep their earlier values.
            ionosonde_data = dict(self._solar_conditions.ionosonde_data or {})
            updated_count = 0
            for station in self._stations:
                # Abandon the remaining stations promptly on shutdown.
                if self._stop_event.is_set():
                    break
                ursi = station["ursi"]
                name = station["name"]
                try:
                    fof2, muf = self._fetch_station_data(ursi, from_time, now)
                except Exception as e:
                    # Include the cause so per-station failures are diagnosable.
                    logging.warning(f"Could not fetch ionosonde data for {ursi} ({name}): {e!r}")
                else:
                    if fof2 and muf:
                        ionosonde_data[ursi] = {"ursi": ursi, "name": name, "fof2": fof2, "muf": muf}
                        updated_count += 1

            self.update_data({"ionosonde_data": ionosonde_data})
            self.status = "OK"
            self.last_update_time = datetime.now(pytz.UTC)
            logging.debug(f"Updated ionosonde data for {updated_count} stations.")
        except Exception:
            self.status = "Error"
            logging.exception("Exception in GIRO Ionosonde data provider")
            self._stop_event.wait(timeout=1)

    def _fetch_station_data(self, ursi, from_time, to_time):
        """Fetch foF2 and MUF readings for a station.

        Returns (fof2_dict, muf_dict) keyed by UNIX timestamp, or (None, None)
        if the server did not answer with HTTP 200. Network errors raised by
        requests propagate to the caller.
        """
        from_str = from_time.strftime("%Y.%m.%d+%H:%M:%S")
        to_str = to_time.strftime("%Y.%m.%d+%H:%M:%S")
        # The query string is assembled by hand so that the '+', ':' and ','
        # characters are sent exactly as written rather than percent-encoded.
        url = f"{LGDC_URL}?ursiCode={ursi}&charName=foF2,MUFD&DMUF=3000&fromDate={from_str}&toDate={to_str}"
        response = requests.get(url, headers=HTTP_HEADERS, timeout=(5, 15))
        if response.status_code != 200:
            logging.debug(f"GIRO returned HTTP {response.status_code} for {ursi}")
            return None, None
        return self._parse_all(response.text)

    @staticmethod
    def _parse_all(text):
        """Parse web server response and return (fof2_dict, muf_dict) keyed by
        UNIX timestamp.

        Blank lines, lines starting with '#', rows with fewer than five
        whitespace-separated fields and rows whose timestamp cannot be parsed
        are skipped. A value that is not a valid float is left out of its own
        dict without affecting the other one.
        """
        fof2_data = {}
        muf_data = {}
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            # Data rows have the following format: timestamp CS foF2 QD MUFD QD
            parts = line.split()
            if len(parts) < 5:
                continue
            try:
                # Python 3.8 TZ parsing fudge
                stamp = datetime.fromisoformat(parts[0].replace('Z', '+00:00'))
            except ValueError:
                continue
            # A timestamp without an offset would otherwise be interpreted in
            # the host's local timezone by .timestamp(); pin it to UTC.
            # NOTE(review): assumes GIRO timestamps are always UT - confirm.
            if stamp.tzinfo is None:
                stamp = stamp.replace(tzinfo=timezone.utc)
            ts = stamp.timestamp()
            try:
                fof2_data[ts] = float(parts[2])
            except ValueError:
                pass
            try:
                muf_data[ts] = float(parts[4])
            except ValueError:
                pass
        return fof2_data, muf_data