import csv
import logging
from datetime import datetime, timezone, timedelta
from threading import Thread, Event

import pytz
import requests

from core.constants import HTTP_HEADERS
from solarconditionsproviders.solar_conditions_provider import SolarConditionsProvider

POLL_INTERVAL = 3600  # 1 hour
STATIONS_INDEX = "datafiles/didbase-stations.csv"
LGDC_URL = "https://lgdc.uml.edu/common/DIDBGetValues"
HISTORY_HOURS = 24


class GIROIonosonde(SolarConditionsProvider):
    """Solar conditions provider using ionosonde data from the GIRO Data Center.

    Queries foF2 and MUF measurements for all stations in
    datafiles/didbase-stations.csv. Polling runs on a daemon thread started by
    start() and is repeated every POLL_INTERVAL seconds until stop() is called.
    """

    def __init__(self, provider_config):
        """Initialise the provider and load the station index from disk.

        :param provider_config: provider configuration, passed unchanged to the
            superclass constructor.
        """
        super().__init__(provider_config)
        self._stations = self._load_stations()
        self._thread = None
        self._stop_event = Event()

    def _load_stations(self):
        """Load the CSV file containing the list of URSIs and Station Names for
        currently active ionosondes.

        Rows with fewer than two columns are skipped. Returns a list of dicts
        with the keys "ursi" and "name", both stripped of surrounding whitespace.
        """
        with open(STATIONS_INDEX, newline='') as f:
            return [
                {"ursi": row[0].strip(), "name": row[1].strip()}
                for row in csv.reader(f)
                if len(row) >= 2
            ]

    def start(self):
        """Start the background polling thread."""
        logging.info(f"Set up query of GIRO ionosonde data API every {POLL_INTERVAL} seconds.")
        self._thread = Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the polling thread to finish; does not wait for it to exit."""
        self._stop_event.set()

    def _run(self):
        """Thread body: poll immediately, then once per POLL_INTERVAL until stopped."""
        while True:
            self._poll()
            # Event.wait() returns True as soon as stop() sets the event, which
            # lets us leave the loop without sleeping out the full interval.
            if self._stop_event.wait(timeout=POLL_INTERVAL):
                break

    def _poll(self):
        """Query every known station once and publish the combined result.

        Only stations that returned at least one foF2 reading and at least one
        MUF reading are included. A failure for a single station is logged at
        debug level and does not abort the poll. If a stop is requested while
        stations are still being queried, the poll is abandoned without
        publishing, so an incomplete list never replaces earlier data.
        """
        try:
            logging.debug(f"Polling {self.name} ionosonde data...")
            now = datetime.now(timezone.utc)
            from_time = now - timedelta(hours=HISTORY_HOURS)

            results = []
            for station in self._stations:
                if self._stop_event.is_set():
                    # Shutting down part-way through: do not publish partial data.
                    logging.debug(f"Polling of {self.name} ionosonde data interrupted by stop request.")
                    return

                ursi = station["ursi"]
                name = station["name"]
                try:
                    fof2, muf = self._fetch_station_data(ursi, from_time, now)
                except Exception:
                    # Best effort per station; keep the traceback available for
                    # anyone running at debug level.
                    logging.debug(f"Could not fetch ionosonde data for {ursi} ({name})", exc_info=True)
                    continue

                # Both are None on a failed request and empty dicts when the
                # response held no usable rows; skip the station in either case.
                if fof2 and muf:
                    results.append({"ursi": ursi, "name": name, "fof2": fof2, "muf": muf})

            self.update_data({"ionosonde_data": results})
            self.status = "OK"
            self.last_update_time = datetime.now(pytz.UTC)
            logging.debug(f"Received ionosonde data for {len(results)} stations from {self.name}.")
        except Exception:
            self.status = "Error"
            logging.exception(f"Exception in GIRO Ionosonde data provider ({self.name})")
            # Short pause after an error; returns immediately if stop() was called.
            self._stop_event.wait(timeout=1)

    def _fetch_station_data(self, ursi, from_time, to_time):
        """Fetch foF2 and MUF readings for a station.

        :param ursi: URSI code of the station to query.
        :param from_time: start of the requested time window (datetime).
        :param to_time: end of the requested time window (datetime).
        :return: (fof2_dict, muf_dict) keyed by UNIX timestamp, or (None, None)
            if the server did not answer with HTTP 200.
        :raises requests.RequestException: on connection problems or timeouts.
        """
        from_str = from_time.strftime("%Y.%m.%d+%H:%M:%S")
        to_str = to_time.strftime("%Y.%m.%d+%H:%M:%S")
        # The query string is assembled by hand so that the "+" separators and
        # the comma in charName are sent exactly as written here.
        url = (
            f"{LGDC_URL}?ursiCode={ursi}&charName=foF2,MUFD&DMUF=3000&fromDate={from_str}&toDate={to_str}"
        )
        response = requests.get(url, headers=HTTP_HEADERS, timeout=(5, 15))
        if response.status_code != 200:
            logging.debug(f"Ionosonde data request for {ursi} returned HTTP {response.status_code}")
            return None, None
        return self._parse_all(response.text)

    @staticmethod
    def _parse_all(text):
        """Parse web server response and return (fof2_dict, muf_dict) keyed by
        UNIX timestamp.

        Blank lines, lines starting with '#', rows with fewer than five
        whitespace-separated columns and rows whose first column is not an ISO
        timestamp are ignored. A value that cannot be parsed as a float is left
        out of its dict without affecting the other value on the same row.
        Timestamps carrying no UTC offset are interpreted as UTC.
        """
        fof2_data = {}
        muf_data = {}
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            # Data rows: timestamp CS foF2 QD MUFD QD
            parts = line.split()
            if len(parts) < 5:
                continue

            try:
                moment = datetime.fromisoformat(parts[0].replace('Z', '+00:00'))
            except ValueError:
                continue
            if moment.tzinfo is None:
                # timestamp() would read a naive value as local time and shift
                # the key by the host's UTC offset, so pin it to UTC.
                moment = moment.replace(tzinfo=timezone.utc)
            ts = moment.timestamp()

            for column, target in ((2, fof2_data), (4, muf_data)):
                try:
                    target[ts] = float(parts[column])
                except ValueError:
                    # Non-numeric column: no reading for this timestamp.
                    pass
        return fof2_data, muf_data