diff --git a/alertproviders/bota.py b/alertproviders/bota.py
index 94f365e..75ce33f 100644
--- a/alertproviders/bota.py
+++ b/alertproviders/bota.py
@@ -21,19 +21,30 @@ class BOTA(HTTPAlertProvider):
new_alerts = []
# Find the table of upcoming alerts
bs = BeautifulSoup(http_response.content.decode(), features="lxml")
+ if not bs.body:
+ return new_alerts
div = bs.body.find('div', attrs={'class': 'view-activations-public'})
if div:
table = div.find('table', attrs={'class': 'views-table'})
if table:
tbody = table.find('tbody')
+ if not tbody:
+ return new_alerts
for row in tbody.find_all('tr'):
cells = row.find_all('td')
- first_cell_text = str(cells[0].find('a').contents[0]).strip()
+ first_cell_anchor = cells[0].find('a') if len(cells) > 0 else None
+ second_cell_anchor = cells[1].find('a') if len(cells) > 1 else None
+ if not first_cell_anchor or not second_cell_anchor:
+ continue
+ first_cell_text = first_cell_anchor.get_text().strip()
ref_name = first_cell_text.split(" by ")[0]
- dx_call = str(cells[1].find('a').contents[0]).strip().upper()
+ dx_call = second_cell_anchor.get_text().strip().upper()
# Get the date, dealing with the fact we get no year so have to figure out if it's last year or next year
- date_text = str(cells[2].find('span').contents[0]).strip()
+ date_span = cells[2].find('span') if len(cells) > 2 else None
+ if not date_span:
+ continue
+ date_text = date_span.get_text().strip()
date_time = datetime.strptime(date_text, "%d %b - %H:%M UTC").replace(tzinfo=pytz.UTC)
date_time = date_time.replace(year=datetime.now(pytz.UTC).year)
# If this was more than a day ago, activation is actually next year
diff --git a/alertproviders/ng3k.py b/alertproviders/ng3k.py
index 0b1205b..ac54b0e 100644
--- a/alertproviders/ng3k.py
+++ b/alertproviders/ng3k.py
@@ -1,8 +1,10 @@
import re
from datetime import datetime
+from typing import cast
import pytz
from rss_parser import Parser
+from rss_parser.models.rss import RSS
from alertproviders.http_alert_provider import HTTPAlertProvider
from data.alert import Alert
@@ -20,7 +22,7 @@ class NG3K(HTTPAlertProvider):
def _http_response_to_alerts(self, http_response):
new_alerts = []
- rss = Parser.parse(http_response.content.decode())
+ rss = cast(RSS, Parser.parse(http_response.content.decode()))
# Iterate through source data
for source_alert in rss.channel.items:
# Deal with "the format"...
diff --git a/alertproviders/pota.py b/alertproviders/pota.py
index e1f6829..2bff164 100644
--- a/alertproviders/pota.py
+++ b/alertproviders/pota.py
@@ -37,6 +37,6 @@ class POTA(HTTPAlertProvider):
# Add to our list, but exclude any old spots that POTA can sometimes give us where even the end time is
# in the past. Don't worry about de-duping, removing old alerts etc. at this point; other code will do
# that for us.
- if alert.end_time > datetime.now(pytz.UTC).timestamp():
+ if alert.end_time and alert.end_time > datetime.now(pytz.UTC).timestamp():
new_alerts.append(alert)
return new_alerts
diff --git a/alertproviders/wota.py b/alertproviders/wota.py
index 557b876..f91e525 100644
--- a/alertproviders/wota.py
+++ b/alertproviders/wota.py
@@ -1,7 +1,9 @@
from datetime import datetime
+from typing import cast
import pytz
from rss_parser import Parser as RSSParser
+from rss_parser.models.rss import RSS
from alertproviders.http_alert_provider import HTTPAlertProvider
from data.alert import Alert
@@ -20,7 +22,7 @@ class WOTA(HTTPAlertProvider):
def _http_response_to_alerts(self, http_response):
new_alerts = []
- rss = RSSParser.parse(http_response.content.decode())
+ rss = cast(RSS, RSSParser.parse(http_response.content.decode()))
# Iterate through source data
for source_alert in rss.channel.items:
@@ -35,9 +37,9 @@ class WOTA(HTTPAlertProvider):
ref_name = None
if len(title_split) > 1:
ref_split = title_split[1].split(" - ")
- ref = ref_split[0]
+ ref = str(ref_split[0])
if len(ref_split) > 1:
- ref_name = ref_split[1]
+ ref_name = str(ref_split[1])
# Pick apart the description
desc_split = source_alert.description.split(". ")
diff --git a/core/lookup_helper.py b/core/lookup_helper.py
index 0e42525..b9d81e0 100644
--- a/core/lookup_helper.py
+++ b/core/lookup_helper.py
@@ -104,7 +104,7 @@ class LookupHelper:
self._hamqth_callsign_data_cache = Cache('cache/hamqth_callsign_lookup_cache')
- self._clublog_api_key = config["clublog-api-key"]
+ self._clublog_api_key = str(config["clublog-api-key"])
self._clublog_cty_xml_cache = CachedSession("cache/clublog_cty_xml_cache", expire_after=timedelta(days=10))
self._clublog_api_available = self._clublog_api_key != ""
self._clublog_xml_download_location = "cache/cty.xml"
@@ -184,6 +184,7 @@ class LookupHelper:
open(self._clublog_xml_download_location + ".gz", 'wb').write(response.content)
with gzip.open(self._clublog_xml_download_location + ".gz", "rb") as uncompressed:
file_content = uncompressed.read()
+ assert isinstance(file_content, bytes)
logging.info("Caching Clublog cty.xml...")
with open(self._clublog_xml_download_location, "wb") as f:
f.write(file_content)
@@ -446,15 +447,16 @@ class LookupHelper:
def infer_grid_from_callsign_dxcc(self, call):
"""Infer a grid locator from a callsign (using DXCC, probably very inaccurate)"""
- latlon = self.infer_latlon_from_callsign_dxcc(call)
+ latlon = self.infer_latlon_from_callsign_dxcc(call) or []
grid = None
- try:
- grid = latlong_to_locator(latlon[0], latlon[1], 8)
- except:
- logging.debug("Invalid lat/lon received for DXCC")
+ if latlon:
+ try:
+ grid = latlong_to_locator(latlon[0], latlon[1], 8)
+ except:
+ logging.debug("Invalid lat/lon received for DXCC")
return grid
- def _get_qrz_data_for_callsign(self, call, credentials):
+ def _get_qrz_data_for_callsign(self, call, credentials) -> dict | None:
"""Utility method to get QRZ.com data from cache if possible, if not get it from the API and cache it.
Returns None immediately if no credentials are provided."""
@@ -475,7 +477,7 @@ class LookupHelper:
login_data = xmltodict.parse(login_response)
session = login_data.get("QRZDatabase", {}).get("Session", {})
if "Key" in session:
- session_key = session["Key"]
+ session_key = str(session["Key"])
else:
logging.warning("QRZ.com login details incorrect, failed to look up with QRZ.")
return None
@@ -512,7 +514,7 @@ class LookupHelper:
self._qrz_callsign_data_cache.add(call, None, expire=604800) # 1 week in seconds
return None
- def _get_hamqth_data_for_callsign(self, call, credentials):
+ def _get_hamqth_data_for_callsign(self, call, credentials) -> dict | None:
"""Utility method to get HamQTH data from cache if possible, if not get it from the API and cache it.
Returns None immediately if no credentials are provided."""
@@ -531,7 +533,7 @@ class LookupHelper:
"&p=" + urllib.parse.quote_plus(credentials.hamqth_password), headers=HTTP_HEADERS).content
dict_data = xmltodict.parse(session_data)
if "session_id" in dict_data["HamQTH"]["session"]:
- session_id = dict_data["HamQTH"]["session"]["session_id"]
+ session_id = str(dict_data["HamQTH"]["session"]["session_id"])
else:
logging.warning("HamQTH login details incorrect, failed to look up with HamQTH.")
return None
@@ -566,7 +568,7 @@ class LookupHelper:
self._hamqth_callsign_data_cache.add(call, None, expire=604800) # 1 week in seconds
return None
- def _get_clublog_api_data_for_callsign(self, call):
+ def _get_clublog_api_data_for_callsign(self, call) -> dict | None:
"""Utility method to get Clublog API data from cache if possible, if not get it from the API and cache it"""
# Fetch from cache if we can, otherwise fetch from the API and cache it
@@ -594,7 +596,7 @@ class LookupHelper:
else:
return None
- def _get_clublog_xml_data_for_callsign(self, call):
+ def _get_clublog_xml_data_for_callsign(self, call) -> dict | None:
"""Utility method to get Clublog XML data from file"""
if self._clublog_xml_available:
@@ -608,7 +610,7 @@ class LookupHelper:
else:
return None
- def _get_dxcc_data_for_callsign(self, call):
+ def _get_dxcc_data_for_callsign(self, call) -> dict | None:
"""Utility method to get generic DXCC data from our lookup table, if we can find it"""
for entry in self._dxcc_data.values():
diff --git a/core/sig_utils.py b/core/sig_utils.py
index ff19956..4f24a96 100644
--- a/core/sig_utils.py
+++ b/core/sig_utils.py
@@ -25,13 +25,13 @@ def populate_sig_ref_info(sig_ref):
if sig_ref.sig is None or sig_ref.id is None:
logging.warning("Failed to look up sig_ref info, sig or id were not set.")
- sig = sig_ref.sig
+ sig = sig_ref.sig or ""
ref_id = sig_ref.id
try:
if sig.upper() == "POTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.pota.app/park/" + ref_id, headers=HTTP_HEADERS).json()
if data:
- fullname = data["name"] if "name" in data else None
+ fullname = str(data["name"]) if "name" in data else None
if fullname and "parktypeDesc" in data and data["parktypeDesc"] != "":
fullname = fullname + " " + data["parktypeDesc"]
sig_ref.name = fullname
@@ -129,9 +129,9 @@ def populate_sig_ref_info(sig_ref):
if data:
for ref in data:
if ref["reference_code"] == ref_id:
- sig_ref.name = ref["name"]
+ sig_ref.name = str(ref["name"])
sig_ref.url = "https://llota.app/list/ref/" + ref_id
- sig_ref.grid = ref["grid_locator"]
+ sig_ref.grid = str(ref["grid_locator"])
ll = locator_to_latlong(sig_ref.grid)
sig_ref.latitude = ll[0]
sig_ref.longitude = ll[1]
@@ -139,7 +139,7 @@ def populate_sig_ref_info(sig_ref):
elif sig.upper() == "WWTOTA":
if not sig_ref.name:
sig_ref.name = sig_ref.id
- sig_ref.url = "https://wwtota.com/seznam/karta_rozhledny.php?ref=" + sig_ref.name
+ sig_ref.url = "https://wwtota.com/seznam/karta_rozhledny.php?ref=" + str(sig_ref.name)
elif sig.upper() == "TILES":
# Tiles on the Air just uses Maidenhead 6-digit squares, so ID, Name and Grid are all the same
if not sig_ref.name:
@@ -147,7 +147,7 @@ def populate_sig_ref_info(sig_ref):
if not sig_ref.grid:
sig_ref.grid = sig_ref.id
if sig_ref.grid and not sig_ref.latitude:
- ll = locator_to_latlong(sig_ref.grid)
+ ll = locator_to_latlong(str(sig_ref.grid))
sig_ref.latitude = ll[0]
sig_ref.longitude = ll[1]
elif sig.upper() == "WAB" or sig.upper() == "WAI":
diff --git a/data/alert.py b/data/alert.py
index 25187a0..2052d09 100644
--- a/data/alert.py
+++ b/data/alert.py
@@ -15,51 +15,51 @@ class Alert:
"""Data class that defines an alert."""
# Unique identifier for the alert
- id: str = None
+ id: str | None = None
# Callsigns of the operators that has been alerted
- dx_calls: list = None
+ dx_calls: list | None = None
# Names of the operators that has been alerted
- dx_names: list = None
+ dx_names: list | None = None
# Country of the DX operator
- dx_country: str = None
+ dx_country: str | None = None
# Country flag of the DX operator
- dx_flag: str = None
+ dx_flag: str | None = None
# Continent of the DX operator
- dx_continent: str = None
+ dx_continent: str | None = None
# DXCC ID of the DX operator
- dx_dxcc_id: int = None
+ dx_dxcc_id: int | None = None
# CQ zone of the DX operator
- dx_cq_zone: int = None
+ dx_cq_zone: int | None = None
# ITU zone of the DX operator
- dx_itu_zone: int = None
+ dx_itu_zone: int | None = None
# Intended frequencies & modes of operation. Essentially just a different kind of comment field.
- freqs_modes: str = None
+ freqs_modes: str | None = None
# Start time of the activation, UTC seconds since UNIX epoch
- start_time: float = None
+ start_time: float | None = None
# Start time of the activation of the alert, ISO 8601
- start_time_iso: str = None
+ start_time_iso: str | None = None
# End time of the activation, UTC seconds since UNIX epoch. Optional
- end_time: float = None
+ end_time: float | None = None
# End time of the activation of the alert, ISO 8601
- end_time_iso: str = None
+ end_time_iso: str | None = None
# Time that this software received the alert, UTC seconds since UNIX epoch. This is used with the "since_received"
# call to our API to receive all data that is new to us, even if by a quirk of the API it might be older than the
# list time the client polled the API.
- received_time: float = None
+ received_time: float | None = None
# Time that this software received the alert, ISO 8601
- received_time_iso: str = None
+ received_time_iso: str | None = None
# Comment made by the alerter, if any
- comment: str = None
+ comment: str | None = None
# Special Interest Group (SIG), e.g. outdoor activity programme such as POTA
- sig: str = None
+ sig: str | None = None
# SIG references. We allow multiple here for e.g. n-fer activations, unlike ADIF SIG_INFO
- sig_refs: list = None
+ sig_refs: list | None = None
# Whether this alert is for a DXpedition, as opposed to e.g. an xOTA programme.
is_dxpedition: bool = False
# Where we got the alert from, e.g. "POTA", "SOTA"...
- source: str = None
+ source: str | None = None
# The ID the source gave it, if any.
- source_id: str = None
+ source_id: str | None = None
def infer_missing(self, credentials=None):
"""Infer missing parameters where possible"""
diff --git a/data/sig_ref.py b/data/sig_ref.py
index 71fb33d..1d2b32c 100644
--- a/data/sig_ref.py
+++ b/data/sig_ref.py
@@ -11,14 +11,14 @@ class SIGRef:
# SIG that this reference is in, e.g. "POTA".
sig: str
# Name of the reference, e.g. "Null Country Park", if known.
- name: str = None
+ name: str | None = None
# URL to look up more information about the reference, if known.
- url: str = None
+ url: str | None = None
# Latitude of the reference, if known.
- latitude: float = None
+ latitude: float | None = None
# Longitude of the reference, if known.
- longitude: float = None
+ longitude: float | None = None
# Maidenhead grid reference of the reference, if known.
- grid: str = None
+ grid: str | None = None
# Activation score. SOTA only
- activation_score: int = None
+ activation_score: int | None = None
diff --git a/data/solar_conditions.py b/data/solar_conditions.py
index fde816a..15a9dae 100644
--- a/data/solar_conditions.py
+++ b/data/solar_conditions.py
@@ -110,11 +110,11 @@ class HFBandCondition:
"""Data class representing HF propagation conditions for certain bands and time of day."""
# Band name, e.g. "80m-40m", "20m-17m", "10m-6m"
- band: str = None
+ band: str | None = None
# Time of day: "day" or "night"
- time: str = None
+ time: str | None = None
# Propagation condition: "Good", "Fair", or "Poor"
- condition: str = None
+ condition: str | None = None
@dataclass
@@ -122,66 +122,66 @@ class SolarConditions:
"""Data class representing current solar and propagation conditions."""
# Time the data was last updated at the source, UTC seconds since UNIX epoch
- updated: float = None
+ updated: float | None = None
# Solar Flux Index (SFI)
- sfi: int = None
+ sfi: int | None = None
# A-index (daily geomagnetic activity)
- a_index: int = None
+ a_index: int | None = None
# K-index (3-hour geomagnetic activity)
- k_index: int = None
+ k_index: int | None = None
# X-ray flux class, e.g. "B2.3", "C1.0"
- xray: str = None
+ xray: str | None = None
# Proton flux
- proton_flux: int = None
+ proton_flux: int | None = None
# Electron flux
- electron_flux: int = None
+ electron_flux: int | None = None
# Aurora activity level
- aurora: int = None
+ aurora: int | None = None
# Latitude in degrees of the aurora boundary
- aurora_latitude: float = None
+ aurora_latitude: float | None = None
# Sunspot count
- sunspots: int = None
+ sunspots: int | None = None
# Solar wind speed in km/s
- solar_wind: float = None
+ solar_wind: float | None = None
# Interplanetary magnetic field strength in nT
- magnetic_field: float = None
+ magnetic_field: float | None = None
# Geomagnetic field condition, e.g. "Quiet", "Unsettled", "Active", "Storm"
- geomag_field: str = None
+ geomag_field: str | None = None
# Geomagnetic background noise level, e.g. "S0", "S1", "S2"
- geomag_noise: str = None
+ geomag_noise: str | None = None
# HF band propagation conditions, keyed by "{band}-{time}" e.g. "80m-40m-day"
- hf_conditions: dict = None
+ hf_conditions: dict | None = None
# VHF propagation conditions, keyed by condition name
- vhf_conditions: dict = None
+ vhf_conditions: dict | None = None
# NOAA Kp index 3-day forecast, keyed by UNIX timestamp of the start of each 3-hour UTC period
- k_index_forecast: dict = None
+ k_index_forecast: dict | None = None
# NOAA Solar Radiation Storm (S1 or greater) probability forecast, keyed by UNIX timestamp of start of day UTC
- solar_storm_forecast: dict = None
+ solar_storm_forecast: dict | None = None
# NOAA Radio Blackout (R1-R2) probability forecast, keyed by UNIX timestamp of start of day UTC
- blackout_forecast_r1r2: dict = None
+ blackout_forecast_r1r2: dict | None = None
# NOAA Radio Blackout (R3 or greater) probability forecast, keyed by UNIX timestamp of start of day UTC
- blackout_forecast_r3_or_greater: dict = None
+ blackout_forecast_r3_or_greater: dict | None = None
# Ionosonde measurements, dict keyed by URSI code, values are dicts with keys: ursi, name, fof2, muf, luf,
# band_states. Populated by GIROIonosonde or KC2GProp providers.
- ionosonde_data: dict = None
+ ionosonde_data: dict | None = None
# Derived values (populated by infer_descriptions())
# HF radio blackout risk description, derived from xray
- xray_desc: str = None
+ xray_desc: str | None = None
# HF radio blackout scale number (R0-R5), derived from xray
- radio_blackout_scale: int = None
+ radio_blackout_scale: int | None = None
# Solar radiation storm level description, derived from proton_flux
- proton_flux_desc: str = None
+ proton_flux_desc: str | None = None
# Solar radiation storm scale number (S0-S5), derived from proton_flux
- solar_storm_scale: int = None
+ solar_storm_scale: int | None = None
# Geomagnetic storm level description, derived from k_index
- geomag_storm_desc: str = None
+ geomag_storm_desc: str | None = None
# Geomagnetic storm scale number (G0-G5), derived from k_index
- geomag_storm_scale: int = None
+ geomag_storm_scale: int | None = None
# Overall HF band conditions summary, derived from sfi
- band_conditions_desc: str = None
+ band_conditions_desc: str | None = None
# Electron flux description, derived from electron_flux
- electron_flux_desc: str = None
+ electron_flux_desc: str | None = None
def infer_descriptions(self):
"""Populate derived text description fields from the current numeric/raw field values."""
diff --git a/data/spot.py b/data/spot.py
index ff3c3a2..5e80bcb 100644
--- a/data/spot.py
+++ b/data/spot.py
@@ -23,38 +23,38 @@ class Spot:
"""Data class that defines a spot."""
# Unique identifier for the spot
- id: str = None
+ id: str | None = None
# DX (spotted) operator info
# Callsign of the operator that has been spotted
- dx_call: str = None
+ dx_call: str | None = None
# Name of the operator that has been spotted
- dx_name: str = None
+ dx_name: str | None = None
# QTH of the operator that has been spotted. This could be from any SIG refs or could be from online lookup of their
# home QTH.
- dx_qth: str = None
+ dx_qth: str | None = None
# Country of the DX operator
- dx_country: str = None
+ dx_country: str | None = None
# Country flag of the DX operator
- dx_flag: str = None
+ dx_flag: str | None = None
# Continent of the DX operator
- dx_continent: str = None
+ dx_continent: str | None = None
# DXCC ID of the DX operator
- dx_dxcc_id: int = None
+ dx_dxcc_id: int | None = None
# CQ zone of the DX operator
- dx_cq_zone: int = None
+ dx_cq_zone: int | None = None
# ITU zone of the DX operator
- dx_itu_zone: int = None
+ dx_itu_zone: int | None = None
# If this is an APRS/Packet/etc spot, what SSID was the DX operator using?
- dx_ssid: str = None
+ dx_ssid: str | None = None
# Maidenhead grid locator for the DX. This could be from a geographical reference e.g. POTA, or just from the
# country
- dx_grid: str = None
+ dx_grid: str | None = None
# Latitude & longitude of the DX, in degrees. This could be from a geographical reference e.g. POTA, or from a QRZ
# lookup
- dx_latitude: float = None
- dx_longitude: float = None
+ dx_latitude: float | None = None
+ dx_longitude: float | None = None
# DX Location source. Indicates how accurate the location might be. Values: "SPOT", "WAB/WAI GRID", "HOME QTH",
# "DXCC", "NONE"
dx_location_source: str = "NONE"
@@ -66,70 +66,70 @@ class Spot:
# DE (Spotter) info
# Callsign of the spotter
- de_call: str = None
+ de_call: str | None = None
# Country of the spotter
- de_country: str = None
+ de_country: str | None = None
# Country flag of the spotter
- de_flag: str = None
+ de_flag: str | None = None
# Continent of the spotter
- de_continent: str = None
+ de_continent: str | None = None
# DXCC ID of the spotter
- de_dxcc_id: int = None
+ de_dxcc_id: int | None = None
# If this is an APRS/Packet/etc spot, what SSID was the spotter/receiver using?
- de_ssid: str = None
+ de_ssid: str | None = None
# Maidenhead grid locator for the spotter. This is not going to be from a xOTA reference so it will likely just be
# a QRZ or DXCC lookup. If the spotter is also portable, this is probably wrong, but it's good enough for some
# simple mapping.
- de_grid: str = None
+ de_grid: str | None = None
# Latitude & longitude of the DX, in degrees. This is not going to be from a xOTA reference so it will likely just
# be a QRZ or DXCC lookup. If the spotter is also portable, this is probably wrong, but it's good enough for some
# simple mapping.
- de_latitude: float = None
- de_longitude: float = None
+ de_latitude: float | None = None
+ de_longitude: float | None = None
# General QSO info
# Reported mode, such as SSB, PHONE, CW, FT8...
- mode: str = None
+ mode: str | None = None
# Inferred mode "family". One of "CW", "PHONE" or "DIGI".
- mode_type: str = None
+ mode_type: str | None = None
# Source of the mode information. "SPOT", "COMMENT", "BANDPLAN" or "NONE"
mode_source: str = "NONE"
# Frequency, in Hz
- freq: float = None
+ freq: float | None = None
# Band, defined by the frequency, e.g. "40m" or "70cm"
- band: str = None
+ band: str | None = None
# Comment left by the spotter, if any
- comment: str = None
+ comment: str | None = None
# QRT state. Some APIs return spots marked as QRT. Otherwise we can check the comments.
qrt: bool = False
# Special Interest Group info
# Special Interest Group (SIG), e.g. outdoor activity programme such as POTA
- sig: str = None
+ sig: str | None = None
# SIG references. We allow multiple here for e.g. n-fer activations, unlike ADIF SIG_INFO
- sig_refs: list = None
+ sig_refs: list | None = None
# Timing info
# Time of the spot, UTC seconds since UNIX epoch
- time: float = None
+ time: float | None = None
# Time of the spot, ISO 8601
- time_iso: str = None
+ time_iso: str | None = None
# Time that this software received the spot, UTC seconds since UNIX epoch. This is used with the "since_received"
# call to our API to receive all data that is new to us, even if by a quirk of the API it might be older than the
# list time the client polled the API.
- received_time: float = None
+ received_time: float | None = None
# Time that this software received the spot, ISO 8601
- received_time_iso: str = None
+ received_time_iso: str | None = None
# Source info
# Where we got the spot from, e.g. "POTA", "Cluster"...
- source: str = None
+ source: str | None = None
# The ID the source gave it, if any.
- source_id: str = None
+ source_id: str | None = None
def infer_missing(self, credentials=None):
"""Infer missing parameters where possible"""
@@ -335,9 +335,10 @@ class Spot:
# Determine a "QTH" string. If we have a SIG ref, pick the first one and turn it into a suitable string,
# otherwise see what they have set on an online lookup service.
if self.sig_refs and len(self.sig_refs) > 0:
- self.dx_qth = self.sig_refs[0].id
+ qth = self.sig_refs[0].id
if self.sig_refs[0].name:
- self.dx_qth = self.dx_qth + " " + self.sig_refs[0].name
+ qth += " " + self.sig_refs[0].name
+ self.dx_qth = qth
else:
self.dx_qth = lookup_helper.infer_qth_from_callsign_online_lookup(self.dx_call, credentials)
@@ -353,10 +354,10 @@ class Spot:
# It looks like we can sometimes get a string into lat/lon, so try to parse as float, reject if not valid
if isinstance(self.dx_latitude, str) or isinstance(self.dx_longitude, str):
try:
- self.dx_latitude = float(self.dx_latitude)
- self.dx_longitude = float(self.dx_longitude)
+ self.dx_latitude = float(str(self.dx_latitude))
+ self.dx_longitude = float(str(self.dx_longitude))
except (TypeError, ValueError):
- logging.warning("Received non-numeric strings in lat/lon (" + str(self.dx_latitude) + ", " + str(self.dx_longitude) + ") for call " + self.dx_call + ", rejecting it")
+ logging.warning("Received non-numeric strings in lat/lon (" + str(self.dx_latitude) + ", " + str(self.dx_longitude) + ") for call " + str(self.dx_call) + ", rejecting it")
self.dx_latitude = None
self.dx_longitude = None
@@ -374,10 +375,10 @@ class Spot:
# DX Location is "good" if it is from a spot, or from QRZ if the callsign doesn't contain a slash, so the operator
# is likely at home.
- self.dx_location_good = self.dx_latitude and self.dx_longitude and (
+ self.dx_location_good = bool(self.dx_latitude and self.dx_longitude and (
self.dx_location_source == "SPOT" or self.dx_location_source == "SIG REF LOOKUP"
or self.dx_location_source == "WAB/WAI GRID"
- or (self.dx_location_source == "HOME QTH" and not "/" in self.dx_call))
+ or (self.dx_location_source == "HOME QTH" and "/" not in (self.dx_call or ""))))
# DE with no digits and APRS servers starting "T2" are not things we can look up location for
if self.de_call and any(char.isdigit() for char in self.de_call) and not (
@@ -406,16 +407,16 @@ class Spot:
def _append_sig_ref_if_missing(self, new_sig_ref):
"""Append a sig_ref to the list, so long as it's not already there."""
- if not self.sig_refs:
- self.sig_refs = []
+ sig_refs = self.sig_refs or []
+ self.sig_refs = sig_refs
new_sig_ref.id = new_sig_ref.id.strip().upper()
new_sig_ref.sig = new_sig_ref.sig.strip().upper()
if new_sig_ref.id == "":
return
- for sig_ref in self.sig_refs:
+ for sig_ref in sig_refs:
if sig_ref.id == new_sig_ref.id and sig_ref.sig == new_sig_ref.sig:
return
- self.sig_refs.append(new_sig_ref)
+ sig_refs.append(new_sig_ref)
def expired(self):
"""Decide if this spot has expired (in which case it should not be added to the system in the first place, and not
diff --git a/server/handlers/api/addspot.py b/server/handlers/api/addspot.py
index 1f82b92..7ef5da3 100644
--- a/server/handlers/api/addspot.py
+++ b/server/handlers/api/addspot.py
@@ -2,9 +2,12 @@ import json
import logging
import re
from datetime import datetime
+from typing import Any
import pytz
import tornado
+from tornado import httputil
+from tornado.web import Application
from core.config import ALLOW_SPOTTING, MAX_SPOT_AGE
from core.constants import UNKNOWN_BAND
@@ -19,6 +22,11 @@ from data.spot import Spot
class APISpotHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/spot (POST)"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._spots = None
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, spots, web_server_metrics):
self._spots = spots
self._web_server_metrics = web_server_metrics
diff --git a/server/handlers/api/alerts.py b/server/handlers/api/alerts.py
index ae61f9c..f91d6f1 100644
--- a/server/handlers/api/alerts.py
+++ b/server/handlers/api/alerts.py
@@ -3,16 +3,18 @@ import json
import logging
from datetime import datetime
from queue import Queue
+from typing import Any
import pytz
import tornado
import tornado_eventsource.handler
+from tornado import httputil
+from tornado.web import Application
from core.prometheus_metrics_handler import api_requests_counter
from core.utils import serialize_everything, empty_queue
from data.lookup_credentials import extract_credentials
-
SSE_HANDLER_MAX_QUEUE_SIZE = 100
SSE_HANDLER_QUEUE_CHECK_INTERVAL = 5000
@@ -20,6 +22,11 @@ SSE_HANDLER_QUEUE_CHECK_INTERVAL = 5000
class APIAlertsHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/alerts"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._alerts = None
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, alerts, web_server_metrics):
self._alerts = alerts
self._web_server_metrics = web_server_metrics
@@ -67,6 +74,15 @@ class APIAlertsHandler(tornado.web.RequestHandler):
class APIAlertsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
"""API request handler for /api/v1/alerts/stream"""
+ def __init__(self, application, request, **kwargs: Any):
+ self._sse_alert_queues = None
+ self._web_server_metrics = None
+ self._query_params = None
+ self._credentials = None
+ self._alert_queue = None
+ self._heartbeat = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, sse_alert_queues, web_server_metrics):
self._sse_alert_queues = sse_alert_queues
self._web_server_metrics = web_server_metrics
diff --git a/server/handlers/api/dxstats.py b/server/handlers/api/dxstats.py
index f96bcf8..e6f3b03 100644
--- a/server/handlers/api/dxstats.py
+++ b/server/handlers/api/dxstats.py
@@ -1,9 +1,12 @@
import json
from collections import Counter
from datetime import datetime, timedelta
+from typing import Any
import pytz
import tornado
+from tornado import httputil
+from tornado.web import Application
from core.prometheus_metrics_handler import api_requests_counter
@@ -16,6 +19,11 @@ BANDS_SET = frozenset(BANDS)
class APIDxStatsHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/dxstats"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._spots = None
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, spots, web_server_metrics):
self._spots = spots
self._web_server_metrics = web_server_metrics
diff --git a/server/handlers/api/lookups.py b/server/handlers/api/lookups.py
index 42357e6..4f525f2 100644
--- a/server/handlers/api/lookups.py
+++ b/server/handlers/api/lookups.py
@@ -2,9 +2,12 @@ import json
import logging
import re
from datetime import datetime
+from typing import Any
import pytz
import tornado
+from tornado import httputil
+from tornado.web import Application
from core.constants import SIGS
from core.geo_utils import lat_lon_for_grid_sw_corner_plus_size, lat_lon_to_cq_zone, lat_lon_to_itu_zone
@@ -19,6 +22,10 @@ from data.spot import Spot
class APILookupCallHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/lookup/call"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, web_server_metrics):
self._web_server_metrics = web_server_metrics
@@ -36,7 +43,7 @@ class APILookupCallHandler(tornado.web.RequestHandler):
# The "call" query param must exist and look like a callsign
if "call" in query_params.keys():
- call = query_params.get("call").upper()
+ call = str(query_params.get("call")).upper()
if re.match(r"^[A-Z0-9/\-]*$", call):
# Take the callsign, make a "fake spot" so we can run infer_missing() on it, then repack the
# resulting data in the correct way for the API response.
@@ -80,6 +87,10 @@ class APILookupCallHandler(tornado.web.RequestHandler):
class APILookupSIGRefHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/lookup/sigref"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, web_server_metrics):
self._web_server_metrics = web_server_metrics
@@ -98,8 +109,8 @@ class APILookupSIGRefHandler(tornado.web.RequestHandler):
# "sig" and "id" query params must exist, SIG must be known, and if we have a reference regex for that SIG,
# the provided id must match it.
if "sig" in query_params.keys() and "id" in query_params.keys():
- sig = query_params.get("sig").upper()
- ref_id = query_params.get("id").upper()
+ sig = str(query_params.get("sig")).upper()
+ ref_id = str(query_params.get("id")).upper()
if sig in list(map(lambda p: p.name, SIGS)):
if not get_ref_regex_for_sig(sig) or re.match(get_ref_regex_for_sig(sig), ref_id):
data = populate_sig_ref_info(SIGRef(id=ref_id, sig=sig))
@@ -129,6 +140,10 @@ class APILookupSIGRefHandler(tornado.web.RequestHandler):
class APILookupGridHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/lookup/grid"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, web_server_metrics):
self._web_server_metrics = web_server_metrics
@@ -146,7 +161,7 @@ class APILookupGridHandler(tornado.web.RequestHandler):
# "grid" query param must exist.
if "grid" in query_params.keys():
- grid = query_params.get("grid").upper()
+ grid = str(query_params.get("grid")).upper()
lat, lon, lat_cell_size, lon_cell_size = lat_lon_for_grid_sw_corner_plus_size(grid)
if lat is not None and lon is not None and lat_cell_size is not None and lon_cell_size is not None:
center_lat = lat + lat_cell_size / 2.0
diff --git a/server/handlers/api/options.py b/server/handlers/api/options.py
index b442a5c..f9b66dd 100644
--- a/server/handlers/api/options.py
+++ b/server/handlers/api/options.py
@@ -1,8 +1,11 @@
import json
from datetime import datetime
+from typing import Any
import pytz
import tornado
+from tornado import httputil
+from tornado.web import Application
from core.config import MAX_SPOT_AGE, ALLOW_SPOTTING
from core.constants import BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS
@@ -13,6 +16,11 @@ from core.utils import serialize_everything
class APIOptionsHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/options"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._status_data = None
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, status_data, web_server_metrics):
self._status_data = status_data
self._web_server_metrics = web_server_metrics
diff --git a/server/handlers/api/solar_conditions.py b/server/handlers/api/solar_conditions.py
index 1d684bc..5fdcc46 100644
--- a/server/handlers/api/solar_conditions.py
+++ b/server/handlers/api/solar_conditions.py
@@ -1,7 +1,10 @@
from datetime import datetime
+from typing import Any
import pytz
import tornado
+from tornado import httputil
+from tornado.web import Application
from core.prometheus_metrics_handler import api_requests_counter
@@ -9,6 +12,11 @@ from core.prometheus_metrics_handler import api_requests_counter
class APISolarConditionsHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/solar"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._solar_conditions = None
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, solar_conditions, web_server_metrics):
self._solar_conditions = solar_conditions
self._web_server_metrics = web_server_metrics
diff --git a/server/handlers/api/spots.py b/server/handlers/api/spots.py
index b435eba..ab6d3e2 100644
--- a/server/handlers/api/spots.py
+++ b/server/handlers/api/spots.py
@@ -3,16 +3,18 @@ import json
import logging
from datetime import datetime, timedelta
from queue import Queue
+from typing import Any
import pytz
import tornado
import tornado_eventsource.handler
+from tornado import httputil
+from tornado.web import Application
from core.prometheus_metrics_handler import api_requests_counter
from core.utils import serialize_everything, empty_queue
from data.lookup_credentials import extract_credentials
-
SSE_HANDLER_MAX_QUEUE_SIZE = 1000
SSE_HANDLER_QUEUE_CHECK_INTERVAL = 5000
@@ -20,6 +22,11 @@ SSE_HANDLER_QUEUE_CHECK_INTERVAL = 5000
class APISpotsHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/spots"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._spots = None
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, spots, web_server_metrics):
self._spots = spots
self._web_server_metrics = web_server_metrics
@@ -67,6 +74,15 @@ class APISpotsHandler(tornado.web.RequestHandler):
class APISpotsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
"""API request handler for /api/v1/spots/stream"""
+ def __init__(self, application, request, **kwargs: Any):
+ self._sse_spot_queues = None
+ self._web_server_metrics = None
+ self._query_params = None
+ self._credentials = None
+ self._spot_queue = None
+ self._heartbeat = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, sse_spot_queues, web_server_metrics):
self._sse_spot_queues = sse_spot_queues
self._web_server_metrics = web_server_metrics
diff --git a/server/handlers/api/status.py b/server/handlers/api/status.py
index 862b3fe..eb3efb9 100644
--- a/server/handlers/api/status.py
+++ b/server/handlers/api/status.py
@@ -1,8 +1,11 @@
import json
from datetime import datetime
+from typing import Any
import pytz
import tornado
+from tornado import httputil
+from tornado.web import Application
from core.prometheus_metrics_handler import api_requests_counter
from core.utils import serialize_everything
@@ -11,6 +14,11 @@ from core.utils import serialize_everything
class APIStatusHandler(tornado.web.RequestHandler):
"""API request handler for /api/v1/status"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._status_data = None
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, status_data, web_server_metrics):
self._status_data = status_data
self._web_server_metrics = web_server_metrics
diff --git a/server/handlers/pagetemplate.py b/server/handlers/pagetemplate.py
index 30898d9..c96aa14 100644
--- a/server/handlers/pagetemplate.py
+++ b/server/handlers/pagetemplate.py
@@ -1,7 +1,10 @@
from datetime import datetime
+from typing import Any
import pytz
import tornado
+from tornado import httputil
+from tornado.web import Application
from core.config import ALLOW_SPOTTING, WEB_UI_OPTIONS, BASE_URL, SERVER_OWNER_CALLSIGN
from core.constants import SOFTWARE_VERSION
@@ -11,6 +14,11 @@ from core.prometheus_metrics_handler import page_requests_counter
class PageTemplateHandler(tornado.web.RequestHandler):
"""Handler for all HTML pages generated from templates"""
+ def __init__(self, application: "Application", request: httputil.HTTPServerRequest, **kwargs: Any):
+ self._template_name = None
+ self._web_server_metrics = None
+ super().__init__(application, request, **kwargs)
+
def initialize(self, template_name, web_server_metrics):
self._template_name = template_name
self._web_server_metrics = web_server_metrics
diff --git a/server/webserver.py b/server/webserver.py
index d204fb6..69f0926 100644
--- a/server/webserver.py
+++ b/server/webserver.py
@@ -19,6 +19,9 @@ from server.handlers.metrics import PrometheusMetricsHandler
from server.handlers.pagetemplate import PageTemplateHandler
+_HERE = os.path.dirname(__file__ or "")
+
+
class WebServer:
"""Provides the public-facing web server."""
@@ -101,11 +104,11 @@ class WebServer:
misc_routes = [
(r"/apidocs", PageTemplateHandler, {"template_name": "apidocs", **handler_opts}),
(r"/metrics", PrometheusMetricsHandler),
- (r"/(.*)", StaticFileHandler, {"path": os.path.join(os.path.dirname(__file__), "../webassets")})
+ (r"/(.*)", StaticFileHandler, {"path": os.path.join(_HERE, "../webassets")})
]
app = tornado.web.Application(api_routes + ui_routes + misc_routes,
- template_path=os.path.join(os.path.dirname(__file__), "../templates"),
+ template_path=os.path.join(_HERE, "../templates"),
debug=False)
app.listen(self._port)
logging.info("Web server running on port " + str(WEB_SERVER_PORT))
diff --git a/solarconditionsproviders/giroionosonde.py b/solarconditionsproviders/giroionosonde.py
index 6911ebc..1042aec 100644
--- a/solarconditionsproviders/giroionosonde.py
+++ b/solarconditionsproviders/giroionosonde.py
@@ -28,7 +28,8 @@ class GIROIonosonde(SolarConditionsProvider):
self._thread = None
self._stop_event = Event()
- def _load_stations(self):
+ @staticmethod
+ def _load_stations():
stations = []
with open(STATIONS_INDEX, newline='') as f:
for row in csv.reader(f):
diff --git a/solarconditionsproviders/hamqsl.py b/solarconditionsproviders/hamqsl.py
index 1d51aa5..c20db58 100644
--- a/solarconditionsproviders/hamqsl.py
+++ b/solarconditionsproviders/hamqsl.py
@@ -32,6 +32,9 @@ class HamQSL(HTTPSolarConditionsProvider):
# Some error checking functions in case the data is janky.
def text(tag, default=None):
+ if sd is None:
+ logging.warning("HamQSL solar conditions API returned unexpected XML structure")
+ return default
el = sd.find(tag)
return el.text.strip() if el is not None and el.text else default
@@ -104,7 +107,7 @@ class HamQSL(HTTPSolarConditionsProvider):
"geomag_noise": text("signalnoise"),
"hf_conditions": hf_conditions,
"vhf_conditions": {
- "vhf_aurora_northern_hemi": vhf_map.get(("vhf-aurora", "northern_hemi")).title().replace("Lat Aur", "Latitude"),
+ "vhf_aurora_northern_hemi": (vhf_map.get(("vhf-aurora", "northern_hemi")) or "").title().replace("Lat Aur", "Latitude") or None,
"es_2m_europe": vhf_map.get(("E-Skip", "europe")),
"es_4m_europe": vhf_map.get(("E-Skip", "europe_4m")),
"es_6m_europe": vhf_map.get(("E-Skip", "europe_6m")),
diff --git a/solarconditionsproviders/ionosonde_utils.py b/solarconditionsproviders/ionosonde_utils.py
index 9c50e86..0a3a787 100644
--- a/solarconditionsproviders/ionosonde_utils.py
+++ b/solarconditionsproviders/ionosonde_utils.py
@@ -3,8 +3,8 @@ from core.constants import BANDS
HF_BANDS = [b for b in BANDS if b.is_ham_hf]
-def _latest(d):
- return d[max(d.keys())] if d else None
+def _latest(d) -> float | None:
+ return float(d[max(d.keys())]) if d else None
def compute_band_states(fof2_dict, muf_dict, luf_dict):
diff --git a/solarconditionsproviders/solar_conditions_provider.py b/solarconditionsproviders/solar_conditions_provider.py
index 7479456..6967166 100644
--- a/solarconditionsproviders/solar_conditions_provider.py
+++ b/solarconditionsproviders/solar_conditions_provider.py
@@ -10,6 +10,7 @@ class SolarConditionsProvider:
def __init__(self, provider_config):
"""Constructor"""
+ self._solar_conditions_cache = None
self.name = provider_config["name"]
self.enabled = provider_config["enabled"]
self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
diff --git a/spothole.py b/spothole.py
index f5455be..cef083d 100644
--- a/spothole.py
+++ b/spothole.py
@@ -29,13 +29,14 @@ cleanup_timer = None
run = True
-def shutdown(sig, frame):
+def shutdown(_signum=None, _frame=None):
"""Shutdown function"""
global run
logging.info("Stopping program...")
- web_server.stop()
+ if web_server:
+ web_server.stop()
for sp in spot_providers:
if sp.enabled:
sp.stop()
@@ -45,8 +46,10 @@ def shutdown(sig, frame):
for scp in solar_condition_providers:
if scp.enabled:
scp.stop()
- cleanup_timer.stop()
- lookup_helper.stop()
+ if cleanup_timer:
+ cleanup_timer.stop()
+ if lookup_helper:
+ lookup_helper.stop()
spots.close()
alerts.close()
solar_conditions_cache.close()
diff --git a/spotproviders/aprsis.py b/spotproviders/aprsis.py
index 9da7aed..9178949 100644
--- a/spotproviders/aprsis.py
+++ b/spotproviders/aprsis.py
@@ -37,20 +37,20 @@ class APRSIS(SpotProvider):
def _handle(self, data):
# Split SSID in "from" call and store separately
- from_parts = data["from"].split("-").upper()
- dx_call = from_parts[0]
- dx_ssid = from_parts[1] if len(from_parts) > 1 else None
- via_parts = data["via"].split("-").upper()
- de_call = via_parts[0]
- de_ssid = via_parts[1] if len(via_parts) > 1 else None
+ from_parts = str(data["from"]).split("-")
+ dx_call = from_parts[0].upper()
+ dx_ssid = from_parts[1].upper() if len(from_parts) > 1 else None
+ via_parts = str(data["via"]).split("-")
+ de_call = via_parts[0].upper()
+ de_ssid = via_parts[1].upper() if len(via_parts) > 1 else None
spot = Spot(source="APRS-IS",
dx_call=dx_call,
dx_ssid=dx_ssid,
de_call=de_call,
de_ssid=de_ssid,
- comment=data["comment"] if "comment" in data else None,
- dx_latitude=data["latitude"] if "latitude" in data else None,
- dx_longitude=data["longitude"] if "longitude" in data else None,
+ comment=str(data["comment"]) if "comment" in data else None,
+ dx_latitude=float(data["latitude"]) if "latitude" in data else None,
+ dx_longitude=float(data["longitude"]) if "longitude" in data else None,
time=datetime.now(
pytz.UTC).timestamp()) # APRS-IS spots are live so we can assume spot time is "now"
diff --git a/spotproviders/gma.py b/spotproviders/gma.py
index 732c8cd..46b989d 100644
--- a/spotproviders/gma.py
+++ b/spotproviders/gma.py
@@ -55,7 +55,7 @@ class GMA(HTTPSpotProvider):
# spots come through with reftype=POTA or reftype=WWFF. SOTA is harder to figure out because both SOTA
# and GMA summits come through with reftype=Summit, so we must check for the presence of a "sota" entry
# to determine if it's a SOTA summit.
- if "reftype" in ref_info and ref_info["reftype"] not in ["POTA", "WWFF"] and (
+ if spot.sig_refs and "reftype" in ref_info and ref_info["reftype"] not in ["POTA", "WWFF"] and (
ref_info["reftype"] != "Summit" or "sota" not in ref_info or ref_info["sota"] == ""):
match ref_info["reftype"]:
case "Summit":
diff --git a/spotproviders/hema.py b/spotproviders/hema.py
index cad3167..b2517e7 100644
--- a/spotproviders/hema.py
+++ b/spotproviders/hema.py
@@ -45,6 +45,8 @@ class HEMA(HTTPSpotProvider):
# Fiddle with some data to extract bits we need. Freq/mode and spotter/comment come in combined fields.
freq_mode_match = re.search(self.FREQ_MODE_PATTERN, spot_items[5])
spotter_comment_match = re.search(self.SPOTTER_COMMENT_PATTERN, spot_items[6])
+ if not freq_mode_match or not spotter_comment_match:
+ continue
# Convert to our spot format
spot = Spot(source=self.name,
diff --git a/spotproviders/llota.py b/spotproviders/llota.py
index 484bc97..609c158 100644
--- a/spotproviders/llota.py
+++ b/spotproviders/llota.py
@@ -22,8 +22,8 @@ class LLOTA(HTTPSpotProvider):
comment = None
spotter = None
if "history" in source_spot and len(source_spot["history"]) > 0:
- comment = source_spot["history"][-1]["comment"]
- spotter = source_spot["history"][-1]["spotter_callsign"]
+ comment = str(source_spot["history"][-1]["comment"])
+ spotter = str(source_spot["history"][-1]["spotter_callsign"])
# Convert to our spot format
spot = Spot(source=self.name,
source_id=source_spot["id"],
diff --git a/spotproviders/parksnpeaks.py b/spotproviders/parksnpeaks.py
index ae63201..bac7c99 100644
--- a/spotproviders/parksnpeaks.py
+++ b/spotproviders/parksnpeaks.py
@@ -39,9 +39,9 @@ class ParksNPeaks(HTTPSpotProvider):
tzinfo=pytz.UTC).timestamp())
# Extract a de_call if it's in the comment but not in the "actSpoter" field
- m = re.search(r"\(de ([A-Za-z0-9]*)\)", spot.comment)
+ m = re.search(r"\(de ([A-Za-z0-9]*)\)", spot.comment or "")
if not spot.de_call and m:
- spot.de_call = m.group(1)
+ spot.de_call = str(m.group(1))
# Record SIG information. Sometimes we get a "SIG" of "QRP", which we ignore as it's not a programme with a
# defined set of references
@@ -49,11 +49,12 @@ class ParksNPeaks(HTTPSpotProvider):
sig_ref = source_spot["actSiteID"]
if sig and sig != "" and sig != "QRP" and sig_ref and sig_ref != "":
spot.sig = sig
- spot.sig_refs = [SIGRef(id=source_spot["actSiteID"], sig=source_spot["actClass"].upper())]
+ sig_refs = [SIGRef(id=source_spot["actSiteID"], sig=source_spot["actClass"].upper())]
+ spot.sig_refs = sig_refs
# Free text location is not present in all spots, so only add it if it's set
if "actLocation" in source_spot and source_spot["actLocation"] != "":
- spot.sig_refs[0].name = source_spot["actLocation"]
+ sig_refs[0].name = source_spot["actLocation"]
# Log a warning for the developer if PnP gives us an unknown programme we've never seen before
if sig not in ["POTA", "SOTA", "WWFF", "SIOTA", "ZLOTA", "KRMNPA"]:
diff --git a/spotproviders/rbn.py b/spotproviders/rbn.py
index e264732..830dfdd 100644
--- a/spotproviders/rbn.py
+++ b/spotproviders/rbn.py
@@ -46,7 +46,7 @@ class RBN(SpotProvider):
self.status = "Connecting"
logging.info("RBN port " + str(self._port) + " connecting...")
self._telnet = telnetlib3.Telnet("telnet.reversebeacon.net", self._port)
- telnet_output = self._telnet.read_until("Please enter your call: ".encode("latin-1"))
+ self._telnet.read_until("Please enter your call: ".encode("latin-1"))
self._telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("latin-1"))
connected = True
logging.info("RBN port " + str(self._port) + " connected.")
diff --git a/spotproviders/wota.py b/spotproviders/wota.py
index 0af0760..643a63e 100644
--- a/spotproviders/wota.py
+++ b/spotproviders/wota.py
@@ -1,9 +1,11 @@
import logging
import re
from datetime import datetime
+from typing import cast
import pytz
from rss_parser import Parser
+from rss_parser.models.rss import RSS
from data.sig_ref import SIGRef
from data.spot import Spot
@@ -23,7 +25,7 @@ class WOTA(HTTPSpotProvider):
def _http_response_to_spots(self, http_response):
new_spots = []
- rss = Parser.parse(http_response.content.decode())
+ rss = cast(RSS, Parser.parse(http_response.content.decode()))
# Iterate through source data
for source_spot in rss.channel.items:
@@ -39,9 +41,9 @@ class WOTA(HTTPSpotProvider):
ref_name = None
if len(title_split) > 1:
ref_split = title_split[1].split(" - ")
- ref = ref_split[0]
+ ref = str(ref_split[0])
if len(ref_split) > 1:
- ref_name = ref_split[1]
+ ref_name = str(ref_split[1])
# Pick apart the description
desc_split = source_spot.description.split(". ")
diff --git a/spotproviders/xota.py b/spotproviders/xota.py
index cfa9565..945f4f0 100644
--- a/spotproviders/xota.py
+++ b/spotproviders/xota.py
@@ -22,8 +22,8 @@ class XOTA(WebsocketSpotProvider):
def __init__(self, provider_config):
super().__init__(provider_config, provider_config["url"])
- locations_csv = provider_config["locations-csv"] if "locations-csv" in provider_config else None
- self.SIG = provider_config["sig"] if "sig" in provider_config else None
+ locations_csv = str(provider_config["locations-csv"]) if "locations-csv" in provider_config else None
+ self.SIG = str(provider_config["sig"]) if "sig" in provider_config else None
# Load location data
if locations_csv:
@@ -48,7 +48,7 @@ class XOTA(WebsocketSpotProvider):
freq=float(source_spot["freq"]) * 1000,
mode=source_spot["mode"].upper(),
sig=self.SIG,
- sig_refs=[SIGRef(id=ref_id, sig=self.SIG, url=source_spot["reference"]["website"], latitude=lat,
+ sig_refs=[SIGRef(id=ref_id, sig=self.SIG or "", url=source_spot["reference"]["website"], latitude=lat,
longitude=lon)],
time=datetime.now(pytz.UTC).timestamp(),
dx_latitude=lat,
diff --git a/templates/add_spot.html b/templates/add_spot.html
index 86bc018..70f6086 100644
--- a/templates/add_spot.html
+++ b/templates/add_spot.html
@@ -69,7 +69,7 @@
-
+
{% end %}
\ No newline at end of file
diff --git a/templates/alerts.html b/templates/alerts.html
index 5927145..50250c6 100644
--- a/templates/alerts.html
+++ b/templates/alerts.html
@@ -70,7 +70,7 @@
-
+
{% end %}
\ No newline at end of file
diff --git a/templates/bands.html b/templates/bands.html
index a877d6f..97e7e60 100644
--- a/templates/bands.html
+++ b/templates/bands.html
@@ -76,8 +76,8 @@
-
-
+
+
{% end %}
\ No newline at end of file
diff --git a/templates/base.html b/templates/base.html
index 5818fd7..c21988e 100644
--- a/templates/base.html
+++ b/templates/base.html
@@ -1,6 +1,6 @@
{% extends "skeleton.html" %}
{% block head_extra %}
-
+
@@ -10,10 +10,10 @@
-
-
-
-
+
+
+
+
{% end %}
{% block body %}
diff --git a/templates/conditions.html b/templates/conditions.html
index 4f3baa2..2c009f4 100644
--- a/templates/conditions.html
+++ b/templates/conditions.html
@@ -284,7 +284,7 @@
-
+
diff --git a/templates/map.html b/templates/map.html
index 15acbd8..7ae5f89 100644
--- a/templates/map.html
+++ b/templates/map.html
@@ -94,8 +94,8 @@
-
-
+
+
{% end %}
\ No newline at end of file
diff --git a/templates/spots.html b/templates/spots.html
index af8d255..05a3439 100644
--- a/templates/spots.html
+++ b/templates/spots.html
@@ -104,8 +104,8 @@
-
-
+
+
{% end %}
\ No newline at end of file
diff --git a/templates/status.html b/templates/status.html
index d259953..d1ec107 100644
--- a/templates/status.html
+++ b/templates/status.html
@@ -59,7 +59,7 @@
-
+