Files
spothole/solarconditionsproviders/noaa3dayforecast.py
2026-04-03 18:11:45 +01:00

178 lines
7.5 KiB
Python

import logging
import re
from datetime import datetime, timezone
from solarconditionsproviders.http_solar_conditions_provider import HTTPSolarConditionsProvider
# Poll interval handed to the HTTP provider base class: 10800 s = 3 hours.
POLL_INTERVAL = 10800 # Every 3 hours
# Location of the NOAA 3-day forecast text file that this provider parses.
URL = "https://services.swpc.noaa.gov/text/3-day-forecast.txt"
class NOAA3dayForecast(HTTPSolarConditionsProvider):
    """Solar conditions provider using the NOAA 3-day forecast text file.

    Parses the NOAA forecast and populates corresponding fields in the solar conditions object.
    """

    def __init__(self, provider_config):
        """Pass the provider config, forecast URL and poll interval through to the HTTP provider base class."""
        super().__init__(provider_config, URL, POLL_INTERVAL)

    @staticmethod
    def _column_dates(date_matches, year, log_prefix):
        """Turn (month abbreviation, two-digit day) pairs from a column header into datetime.date objects.

        Returns the dates in column order, or None (after logging a warning that starts with log_prefix) as soon as
        one pair does not form a valid date in the given year.
        """
        # NOTE(review): every column gets the same year. A forecast that spans a Dec/Jan boundary would presumably
        # put some columns in the wrong year - confirm against a year-end sample of the file.
        dates = []
        for month_str, day_str in date_matches:
            try:
                dates.append(datetime.strptime(f"{day_str} {month_str} {year}", "%d %b %Y").date())
            except ValueError:
                logging.warning("%s: could not parse date: %s %s %s", log_prefix, month_str, day_str, year)
                return None
        return dates

    @staticmethod
    def _parse_percentage_table(lines, section_header, year):
        """Find and parse a forecast table using percentages, identified by section_header.

        This is common to the solar radiation storm and radio blackout forecast parsing.

        lines is the forecast text split into lines, section_header is the text that marks the start of the section,
        and year is applied to every "Mon DD" column heading.

        Returns a dict mapping each row label (e.g. "R1-R2") to a dict of {column timestamp: integer percentage},
        where the timestamp is midnight UTC of the column's date in epoch seconds. Returns None, after logging a
        warning, if the section or its date header cannot be found, a date is invalid, or no rows were parsed.
        """
        start_idx = next((i for i, line in enumerate(lines) if section_header in line), None)
        if start_idx is None:
            logging.warning("NOAA 3-day forecast: could not find '%s' section", section_header)
            return None

        # The date header is the first of the next few lines that contains month+day patterns. Scanning and
        # extracting in one step means a header that is found always yields at least one date.
        date_header_idx = None
        date_matches = []
        for j in range(start_idx + 1, min(start_idx + 6, len(lines))):
            date_matches = re.findall(r'([A-Za-z]{3})\s+(\d{2})', lines[j])
            if date_matches:
                date_header_idx = j
                break
        if date_header_idx is None:
            logging.warning("NOAA 3-day forecast: could not find date header after '%s'", section_header)
            return None

        column_dates = NOAA3dayForecast._column_dates(date_matches, year, "NOAA 3-day forecast")
        if column_dates is None:
            return None
        column_timestamps = [
            datetime(d.year, d.month, d.day, tzinfo=timezone.utc).timestamp() for d in column_dates
        ]

        # Parse data rows: each row is a text label followed by percentage values
        result = {}
        for line in lines[date_header_idx + 1:]:
            line_stripped = line.strip()
            pct_matches = list(re.finditer(r'\b(\d+)%', line_stripped))
            if not pct_matches:
                # Blank or non-data line: skip it before the first row, treat it as the end of the table after
                if result:
                    break
                continue
            # Row label is everything before the first percentage value. Slice at the match position rather than
            # searching for the matched text, which could also occur earlier inside the label.
            row_label = line_stripped[:pct_matches[0].start()].strip()
            # zip() drops any values beyond the number of date columns
            result[row_label] = {
                timestamp: int(match.group(1)) for timestamp, match in zip(column_timestamps, pct_matches)
            }
        return result or None

    def _http_response_to_solar_conditions(self, http_response):
        """Convert the HTTP response for the forecast file into a dict of forecast data.

        Returns None, after logging a warning, if the response status is not 200 or the Kp index table cannot be
        parsed. Otherwise returns a dict with the keys:

        "k_index_forecast": {UTC epoch timestamp of each period's start: Kp value as a float}
        "solar_storm_forecast": {timestamp: percentage} from the "S1 or greater" row, or None if unavailable
        "blackout_forecast_r1r2": {timestamp: percentage} from the "R1-R2" row, or None if unavailable
        "blackout_forecast_r3_or_greater": {timestamp: percentage} from the "R3 or greater" row, or None
        """
        if http_response.status_code != 200:
            logging.warning("NOAA K-index forecast API returned HTTP %s", http_response.status_code)
            return None

        lines = http_response.text.splitlines()

        # Find the "NOAA Kp index breakdown" section header
        start_idx = next((i for i, line in enumerate(lines) if "NOAA Kp index breakdown" in line), None)
        if start_idx is None:
            logging.warning("NOAA K-index forecast: could not find 'NOAA Kp index breakdown' section")
            return None

        # Extract the year from the header line, e.g. "NOAA Kp index breakdown Apr 2-Apr 4, 2026"
        header_line = lines[start_idx]
        year_match = re.search(r'\b(\d{4})\b', header_line)
        if not year_match:
            logging.warning("NOAA K-index forecast: could not extract year from: %s", header_line)
            return None
        year = int(year_match.group(1))

        # The column date headers, e.g. " Apr 02 Apr 03 Apr 04", are read from two lines below the section header
        # (the line in between is skipped). The bounds check must cover that same index, otherwise a truncated
        # response raises IndexError instead of being reported.
        date_header_idx = start_idx + 2
        if date_header_idx >= len(lines):
            logging.warning("NOAA K-index forecast: missing date header line")
            return None
        date_header_line = lines[date_header_idx]
        date_matches = re.findall(r'([A-Za-z]{3})\s+(\d{2})', date_header_line)
        if not date_matches:
            logging.warning("NOAA K-index forecast: could not parse date headers from: %s", date_header_line)
            return None
        column_dates = self._column_dates(date_matches, year, "NOAA K-index forecast")
        if column_dates is None:
            return None

        # Parse each data row, e.g. "00-03UT 2.00 3.00 2.00"
        k_index_forecast = {}
        for line in lines[date_header_idx + 1:]:
            time_match = re.match(r'^(\d{2})-(\d{2})UT\s+(.*)', line.strip())
            if not time_match:
                # Non-row line: skip it before the first row, treat it as the end of the table after
                if k_index_forecast:
                    break
                continue
            start_hour = int(time_match.group(1))
            if start_hour > 23:
                # Not a valid hour of day; datetime() below would raise, so ignore the malformed row
                continue
            # Split on 2 or more spaces so that e.g. "5.67 (G2)" stays as one token per column
            raw_values = re.split(r' {2,}', time_match.group(3).strip())
            # zip() drops any values beyond the number of date columns
            for date, val in zip(column_dates, raw_values):
                # Take only the leading numeric part, discarding any bracketed section
                try:
                    kp = float(val.split()[0])
                except (ValueError, IndexError):
                    continue
                start_dt = datetime(date.year, date.month, date.day, start_hour, 0, 0, tzinfo=timezone.utc)
                # Key the data dict by start time
                k_index_forecast[start_dt.timestamp()] = kp

        if not k_index_forecast:
            logging.warning("NOAA K-index forecast: no data rows parsed")
            return None

        # Parse Solar Radiation Storm Forecast (single row: "S1 or greater")
        solar_storm_forecast = None
        radiation_table = self._parse_percentage_table(lines, "Solar Radiation Storm Forecast", year)
        if radiation_table:
            solar_storm_forecast = radiation_table.get("S1 or greater")

        # Parse Radio Blackout Forecast (two rows: "R1-R2" and "R3 or greater")
        blackout_forecast_r1r2 = None
        blackout_forecast_r3_or_greater = None
        blackout_table = self._parse_percentage_table(lines, "Radio Blackout Forecast", year)
        if blackout_table:
            blackout_forecast_r1r2 = blackout_table.get("R1-R2")
            blackout_forecast_r3_or_greater = blackout_table.get("R3 or greater")

        return {
            "k_index_forecast": k_index_forecast,
            "solar_storm_forecast": solar_storm_forecast,
            "blackout_forecast_r1r2": blackout_forecast_r1r2,
            "blackout_forecast_r3_or_greater": blackout_forecast_r3_or_greater,
        }