mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-04-30 10:45:57 +00:00
178 lines
7.5 KiB
Python
178 lines
7.5 KiB
Python
import logging
|
|
import re
|
|
from datetime import datetime, timezone
|
|
|
|
from solarconditionsproviders.http_solar_conditions_provider import HTTPSolarConditionsProvider
|
|
|
|
# How often the forecast file is fetched, in seconds (3 hours).
POLL_INTERVAL = 3 * 60 * 60
# Plain-text NOAA SWPC 3-day forecast product that this provider downloads and parses.
URL = "https://services.swpc.noaa.gov/text/3-day-forecast.txt"
|
|
|
|
|
|
# One "Mon DD" column heading in a table's date header line, e.g. "Apr 02".
_DATE_COLUMN_RE = re.compile(r'([A-Za-z]{3})\s+(\d{2})')
# One percentage cell in a probability table row, e.g. "15%".
_PERCENT_RE = re.compile(r'\b(\d+)%')
# One Kp data row, e.g. "00-03UT       2.00         3.00         2.00".
_KP_ROW_RE = re.compile(r'^(\d{2})-(\d{2})UT\s+(.*)')
# Number of lines after a section header that are searched for the date header line.
_DATE_HEADER_SEARCH_WINDOW = 5


def _find_date_header_index(lines, section_idx):
    """Return the index of the first line after section_idx that contains a "Mon DD" date, looking at most
    _DATE_HEADER_SEARCH_WINDOW lines ahead and never past the end of the text. Returns None if no such line exists."""
    last = min(section_idx + 1 + _DATE_HEADER_SEARCH_WINDOW, len(lines))
    for idx in range(section_idx + 1, last):
        if _DATE_COLUMN_RE.search(lines[idx]):
            return idx
    return None


def _parse_column_dates(header_line, year, log_prefix):
    """Convert every "Mon DD" heading on header_line into a date in the given year, in column order. Returns an
    empty list if the line has no headings, or None (after logging a warning starting with log_prefix) if a heading is
    not a valid calendar date."""
    # NOTE(review): every column gets the single year taken from the section header. A forecast that spans
    # 31 Dec - 1 Jan would presumably need per-column year handling; confirm how NOAA prints the year in that case.
    dates = []
    for month_str, day_str in _DATE_COLUMN_RE.findall(header_line):
        try:
            # %b month-name matching follows the process locale (see the strptime documentation)
            dates.append(datetime.strptime(f"{day_str} {month_str} {year}", "%d %b %Y").date())
        except ValueError:
            logging.warning(f"{log_prefix}: could not parse date: {month_str} {day_str} {year}")
            return None
    return dates


class NOAA3dayForecast(HTTPSolarConditionsProvider):
    """Solar conditions provider using the NOAA 3-day forecast text file. Parses the NOAA forecast and populates
    corresponding fields in the solar conditions object."""

    def __init__(self, provider_config):
        """Set up the provider to fetch URL every POLL_INTERVAL seconds."""
        super().__init__(provider_config, URL, POLL_INTERVAL)

    @staticmethod
    def _parse_percentage_table(lines, section_header, year):
        """Find and parse a forecast table using percentages, identified by section_header. This is common to the
        lookup of the solar storm and radio blackout forecast parsing.

        lines is the forecast text split into lines, section_header is the text that identifies the table's title
        line, and year is the year applied to the table's "Mon DD" column headings.

        Returns a dict mapping each row label (e.g. "S1 or greater") to a dict of
        {UTC midnight timestamp of the column date: integer percentage}, or None if the table could not be found or
        parsed."""
        start_idx = next((i for i, line in enumerate(lines) if section_header in line), None)
        if start_idx is None:
            logging.warning(f"NOAA 3-day forecast: could not find '{section_header}' section")
            return None

        # The date header is the first line within the next few that contains month+day patterns
        date_header_idx = _find_date_header_index(lines, start_idx)
        if date_header_idx is None:
            logging.warning(f"NOAA 3-day forecast: could not find date header after '{section_header}'")
            return None

        column_dates = _parse_column_dates(lines[date_header_idx], year, "NOAA 3-day forecast")
        if column_dates is None:
            return None
        if not column_dates:
            logging.warning(f"NOAA 3-day forecast: no dates in header: {lines[date_header_idx]}")
            return None
        column_timestamps = [datetime(d.year, d.month, d.day, tzinfo=timezone.utc).timestamp() for d in column_dates]

        # Parse data rows: each row has a text label followed by one percentage per date column
        result = {}
        for line in lines[date_header_idx + 1:]:
            stripped = line.strip()
            pct_matches = list(_PERCENT_RE.finditer(stripped))
            if not pct_matches:
                # Blank or non-data line: skip it before the table starts, stop once the table has been read
                if result:
                    break
                continue

            # Row label is everything before the first percentage value. Use the match position rather than a
            # substring search, which could land on an earlier occurrence of the same text inside the label.
            row_label = stripped[:pct_matches[0].start()].strip()
            # zip() drops any surplus percentage values that have no date column
            row_data = {ts: int(m.group(1)) for ts, m in zip(column_timestamps, pct_matches)}
            if row_data:
                result[row_label] = row_data

        return result or None

    def _http_response_to_solar_conditions(self, http_response):
        """Parse the 3-day forecast text from http_response.

        Returns a dict with the keys "k_index_forecast" ({UTC timestamp of the start of each 3-hour period: Kp}),
        "solar_storm_forecast", "blackout_forecast_r1r2" and "blackout_forecast_r3_or_greater" (each either
        {UTC midnight timestamp: percentage} or None if that table could not be parsed). Returns None if the response
        is not HTTP 200 or the Kp table could not be parsed."""
        if http_response.status_code != 200:
            logging.warning("NOAA K-index forecast API returned HTTP " + str(http_response.status_code))
            return None

        lines = http_response.text.splitlines()

        # Find the "NOAA Kp index breakdown" section header
        start_idx = next((i for i, line in enumerate(lines) if "NOAA Kp index breakdown" in line), None)
        if start_idx is None:
            logging.warning("NOAA K-index forecast: could not find 'NOAA Kp index breakdown' section")
            return None

        # Extract the year from the header line, e.g. "NOAA Kp index breakdown Apr 2-Apr 4, 2026"
        header_line = lines[start_idx]
        year_match = re.search(r'\b(\d{4})\b', header_line)
        if not year_match:
            logging.warning("NOAA K-index forecast: could not extract year from: " + header_line)
            return None
        year = int(year_match.group(1))

        # Locate the column date headers, e.g. "             Apr 02       Apr 03       Apr 04". They follow the
        # section header after a blank line; searching a bounded window (as the percentage tables do) avoids indexing
        # past the end of a truncated response.
        date_header_idx = _find_date_header_index(lines, start_idx)
        if date_header_idx is None:
            logging.warning("NOAA K-index forecast: missing date header line")
            return None

        column_dates = _parse_column_dates(lines[date_header_idx], year, "NOAA K-index forecast")
        if column_dates is None:
            return None
        if not column_dates:
            logging.warning("NOAA K-index forecast: could not parse date headers from: " + lines[date_header_idx])
            return None

        # Parse each data row, e.g. "00-03UT       2.00         3.00         2.00"
        k_index_forecast = {}
        for line in lines[date_header_idx + 1:]:
            row_match = _KP_ROW_RE.match(line.strip())
            if not row_match:
                # Skip anything before the first data row, stop at the first non-row line after the table
                if k_index_forecast:
                    break
                continue

            start_hour = int(row_match.group(1))
            if start_hour > 23:
                # Not a valid hour of the day, so no period start time can be built for this row
                continue

            # Split on 2 or more spaces so that e.g. "5.67 (G2)" stays as one token per column
            cells = re.split(r' {2,}', row_match.group(3).strip())
            for day, cell in zip(column_dates, cells):
                # Take only the leading numeric part, discarding any bracketed section
                try:
                    kp = float(cell.split()[0])
                except (ValueError, IndexError):
                    continue

                # Key the data dict by the start time of the 3-hour period
                start_dt = datetime(day.year, day.month, day.day, start_hour, 0, 0, tzinfo=timezone.utc)
                k_index_forecast[start_dt.timestamp()] = kp

        if not k_index_forecast:
            logging.warning("NOAA K-index forecast: no data rows parsed")
            return None

        # Parse Solar Radiation Storm Forecast (single row: "S1 or greater")
        radiation_table = self._parse_percentage_table(lines, "Solar Radiation Storm Forecast", year)
        solar_storm_forecast = radiation_table.get("S1 or greater") if radiation_table else None

        # Parse Radio Blackout Forecast (two rows: "R1-R2" and "R3 or greater")
        blackout_forecast_r1r2 = None
        blackout_forecast_r3_or_greater = None
        blackout_table = self._parse_percentage_table(lines, "Radio Blackout Forecast", year)
        if blackout_table:
            blackout_forecast_r1r2 = blackout_table.get("R1-R2")
            blackout_forecast_r3_or_greater = blackout_table.get("R3 or greater")

        return {
            "k_index_forecast": k_index_forecast,
            "solar_storm_forecast": solar_storm_forecast,
            "blackout_forecast_r1r2": blackout_forecast_r1r2,
            "blackout_forecast_r3_or_greater": blackout_forecast_r3_or_greater,
        }
|