import logging
import re
from datetime import datetime, timezone

from solarconditionsproviders.http_solar_conditions_provider import HTTPSolarConditionsProvider

POLL_INTERVAL = 10800  # Every 3 hours
URL = "https://services.swpc.noaa.gov/text/3-day-forecast.txt"


class NOAA3dayForecast(HTTPSolarConditionsProvider):
    """Solar conditions provider using the NOAA 3-day forecast text file.

    Parses the NOAA forecast and populates corresponding fields in the solar
    conditions object.
    """

    def __init__(self, provider_config):
        """Set up the provider to poll URL every POLL_INTERVAL seconds."""
        super().__init__(provider_config, URL, POLL_INTERVAL)

    @staticmethod
    def _parse_percentage_table(lines, section_header, year):
        """Find and parse a forecast table of percentages.

        This is common to the parsing of the solar radiation storm and radio
        blackout forecasts.

        Args:
            lines: The forecast text, already split into lines.
            section_header: Text identifying the line that introduces the table.
            year: Year to combine with the "Mon DD" column headers.

        Returns:
            A dict mapping each row label (e.g. "R1-R2") to a dict of
            {UTC-midnight POSIX timestamp of the column date: integer
            percentage}, or None if the section, its date header or any data
            row could not be found. A warning is logged for the first two cases.
        """
        start_idx = None
        for i, line in enumerate(lines):
            if section_header in line:
                start_idx = i
                break
        if start_idx is None:
            logging.warning(f"NOAA 3-day forecast: could not find '{section_header}' section")
            return None

        # Find the date header line: the first of the next five lines that
        # contains a month+day pattern.
        date_header_idx = None
        for j in range(start_idx + 1, min(start_idx + 6, len(lines))):
            if re.search(r'[A-Za-z]{3}\s+\d{2}', lines[j]):
                date_header_idx = j
                break
        if date_header_idx is None:
            logging.warning(f"NOAA 3-day forecast: could not find date header after '{section_header}'")
            return None

        date_matches = re.findall(r'([A-Za-z]{3})\s+(\d{2})', lines[date_header_idx])
        if not date_matches:
            logging.warning(f"NOAA 3-day forecast: no dates in header: {lines[date_header_idx]}")
            return None

        # NOTE(review): every column is given the same `year`; a forecast that
        # spans a year boundary may need the later columns bumped - confirm
        # against how NOAA prints the year in that case.
        column_timestamps = []
        for month_str, day_str in date_matches:
            try:
                dt = datetime.strptime(f"{day_str} {month_str} {year}", "%d %b %Y").replace(tzinfo=timezone.utc)
            except ValueError:
                logging.warning(f"NOAA 3-day forecast: could not parse date: {month_str} {day_str} {year}")
                return None
            column_timestamps.append(dt.timestamp())

        # Parse data rows: each row is a text label followed by percentage
        # values. Lines without percentages (including blank lines) are skipped
        # until the first data row, and end the table once at least one row has
        # been read.
        result = {}
        for line in lines[date_header_idx + 1:]:
            line_stripped = line.strip()
            pct_matches = list(re.finditer(r'\b(\d+)%', line_stripped))
            if not pct_matches:
                if result:
                    break
                continue

            # The row label is everything before the first percentage value.
            # Slice at the match position rather than searching for the matched
            # text, which could also occur earlier inside the label.
            row_label = line_stripped[:pct_matches[0].start()].strip()

            # zip() drops any percentages beyond the number of date columns.
            result[row_label] = {
                timestamp: int(match.group(1))
                for timestamp, match in zip(column_timestamps, pct_matches)
            }

        return result if result else None

    def _http_response_to_solar_conditions(self, http_response):
        """Convert the HTTP response for the forecast file into forecast data.

        Args:
            http_response: Response object; its `status_code` and `text`
                attributes are read.

        Returns:
            None (after logging a warning) if the status is not 200 or the Kp
            table cannot be parsed. Otherwise a dict with the keys:
              "k_index_forecast": {POSIX timestamp of the start of each
                  forecast period (UTC): Kp value as a float}
              "solar_storm_forecast": {timestamp: percentage} for the
                  "S1 or greater" row, or None if unavailable
              "blackout_forecast_r1r2": {timestamp: percentage} for the
                  "R1-R2" row, or None if unavailable
              "blackout_forecast_r3_or_greater": {timestamp: percentage} for
                  the "R3 or greater" row, or None if unavailable
        """
        if http_response.status_code != 200:
            logging.warning("NOAA K-index forecast API returned HTTP " + str(http_response.status_code))
            return None

        lines = http_response.text.splitlines()

        # Find the "NOAA Kp index breakdown" section header
        start_idx = None
        for i, line in enumerate(lines):
            if "NOAA Kp index breakdown" in line:
                start_idx = i
                break
        if start_idx is None:
            logging.warning("NOAA K-index forecast: could not find 'NOAA Kp index breakdown' section")
            return None

        # Extract the year from the header line, e.g. "NOAA Kp index breakdown Apr 2-Apr 4, 2026"
        header_line = lines[start_idx]
        year_match = re.search(r'\b(\d{4})\b', header_line)
        if not year_match:
            logging.warning("NOAA K-index forecast: could not extract year from: " + header_line)
            return None
        year = int(year_match.group(1))

        # The column date headers, e.g. "  Apr 02   Apr 03   Apr 04", are read
        # from two lines below the section header (presumably a blank line sits
        # in between - verify against the NOAA product). The bounds check must
        # cover that same index, otherwise a truncated response raises
        # IndexError instead of being reported.
        if start_idx + 2 >= len(lines):
            logging.warning("NOAA K-index forecast: missing date header line")
            return None
        date_header_line = lines[start_idx + 2]
        date_matches = re.findall(r'([A-Za-z]{3})\s+(\d{2})', date_header_line)
        if not date_matches:
            logging.warning("NOAA K-index forecast: could not parse date headers from: " + date_header_line)
            return None

        column_dates = []
        for month_str, day_str in date_matches:
            try:
                column_dates.append(datetime.strptime(f"{day_str} {month_str} {year}", "%d %b %Y").date())
            except ValueError:
                logging.warning(f"NOAA K-index forecast: could not parse date: {month_str} {day_str} {year}")
                return None

        # Parse each data row, e.g. "00-03UT   2.00   3.00   2.00". Lines that
        # are not data rows are skipped until the first row is found, and end
        # the table afterwards.
        k_index_forecast = {}
        for line in lines[start_idx + 3:]:
            time_match = re.match(r'^(\d{2})-(\d{2})UT\s+(.*)', line.strip())
            if not time_match:
                if k_index_forecast:
                    break
                continue

            start_hour = int(time_match.group(1))
            # Split on 2 or more spaces so that e.g. "5.67 (G2)" stays as one token per column
            raw_values = re.split(r' {2,}', time_match.group(3).strip())

            # zip() ignores any values beyond the number of date columns.
            for date, val in zip(column_dates, raw_values):
                # Take only the leading numeric part, discarding any bracketed section
                try:
                    kp = float(val.split()[0])
                except (ValueError, IndexError):
                    continue

                start_dt = datetime(date.year, date.month, date.day, start_hour, 0, 0, tzinfo=timezone.utc)
                # Key the data dict by start time
                k_index_forecast[start_dt.timestamp()] = kp

        if not k_index_forecast:
            logging.warning("NOAA K-index forecast: no data rows parsed")
            return None

        # Parse Solar Radiation Storm Forecast (single row: "S1 or greater")
        solar_storm_forecast = None
        radiation_table = self._parse_percentage_table(lines, "Solar Radiation Storm Forecast", year)
        if radiation_table:
            solar_storm_forecast = radiation_table.get("S1 or greater")

        # Parse Radio Blackout Forecast (two rows: "R1-R2" and "R3 or greater")
        blackout_forecast_r1r2 = None
        blackout_forecast_r3_or_greater = None
        blackout_table = self._parse_percentage_table(lines, "Radio Blackout Forecast", year)
        if blackout_table:
            blackout_forecast_r1r2 = blackout_table.get("R1-R2")
            blackout_forecast_r3_or_greater = blackout_table.get("R3 or greater")

        return {
            "k_index_forecast": k_index_forecast,
            "solar_storm_forecast": solar_storm_forecast,
            "blackout_forecast_r1r2": blackout_forecast_r1r2,
            "blackout_forecast_r3_or_greater": blackout_forecast_r3_or_greater,
        }