Improve adherence to python coding standards and clear up IDE static analysis warnings

This commit is contained in:
Ian Renton
2026-02-27 19:17:04 +00:00
parent 6b18ec6f88
commit 6982354364
53 changed files with 633 additions and 626 deletions

View File

@@ -15,39 +15,39 @@ class AlertProvider:
self.enabled = provider_config["enabled"]
self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
self.status = "Not Started" if self.enabled else "Disabled"
self.alerts = None
self.web_server = None
self._alerts = None
self._web_server = None
def setup(self, alerts, web_server):
"""Set up the provider, e.g. giving it the alert list to work from"""
self.alerts = alerts
self.web_server = web_server
self._alerts = alerts
self._web_server = web_server
def start(self):
"""Start the provider. This should return immediately after spawning threads to access the remote resources"""
raise NotImplementedError("Subclasses must implement this method")
def submit_batch(self, alerts):
def _submit_batch(self, alerts):
"""Submit a batch of alerts retrieved from the provider. There is no timestamp checking like there is for spots,
because alerts could be created at any point for any time in the future. Rely on hashcode-based id matching
to deal with duplicates."""
# Sort the batch so that earliest ones go in first. This helps keep the ordering correct when alerts are fired
# off to SSE listeners.
alerts = sorted(alerts, key=lambda alert: (alert.start_time if alert and alert.start_time else 0))
alerts = sorted(alerts, key=lambda a: (a.start_time if a and a.start_time else 0))
for alert in alerts:
# Fill in any blanks and add to the list
alert.infer_missing()
self.add_alert(alert)
self._add_alert(alert)
def add_alert(self, alert):
def _add_alert(self, alert):
if not alert.expired():
self.alerts.add(alert.id, alert, expire=MAX_ALERT_AGE)
self._alerts.add(alert.id, alert, expire=MAX_ALERT_AGE)
# Ping the web server in case we have any SSE connections that need to see this immediately
if self.web_server:
self.web_server.notify_new_alert(alert)
if self._web_server:
self._web_server.notify_new_alert(alert)
def stop(self):
"""Stop any threads and prepare for application shutdown"""

View File

@@ -17,7 +17,7 @@ class BOTA(HTTPAlertProvider):
def __init__(self, provider_config):
super().__init__(provider_config, self.ALERTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_alerts(self, http_response):
def _http_response_to_alerts(self, http_response):
new_alerts = []
# Find the table of upcoming alerts
bs = BeautifulSoup(http_response.content.decode(), features="lxml")

View File

@@ -15,14 +15,15 @@ class HTTPAlertProvider(AlertProvider):
def __init__(self, provider_config, url, poll_interval):
super().__init__(provider_config)
self.url = url
self.poll_interval = poll_interval
self._url = url
self._poll_interval = poll_interval
self._thread = None
self._stop_event = Event()
def start(self):
# Fire off the polling thread. It will poll immediately on startup, then sleep for poll_interval between
# subsequent polls, so start() returns immediately and the application can continue starting.
logging.info("Set up query of " + self.name + " alert API every " + str(self.poll_interval) + " seconds.")
logging.info("Set up query of " + self.name + " alert API every " + str(self._poll_interval) + " seconds.")
self._thread = Thread(target=self._run, daemon=True)
self._thread.start()
@@ -32,31 +33,31 @@ class HTTPAlertProvider(AlertProvider):
def _run(self):
while True:
self._poll()
if self._stop_event.wait(timeout=self.poll_interval):
if self._stop_event.wait(timeout=self._poll_interval):
break
def _poll(self):
try:
# Request data from API
logging.debug("Polling " + self.name + " alert API...")
http_response = requests.get(self.url, headers=HTTP_HEADERS)
http_response = requests.get(self._url, headers=HTTP_HEADERS)
# Pass off to the subclass for processing
new_alerts = self.http_response_to_alerts(http_response)
new_alerts = self._http_response_to_alerts(http_response)
# Submit the new alerts for processing. There might not be any alerts for the less popular programs.
if new_alerts:
self.submit_batch(new_alerts)
self._submit_batch(new_alerts)
self.status = "OK"
self.last_update_time = datetime.now(pytz.UTC)
logging.debug("Received data from " + self.name + " alert API.")
except Exception as e:
except Exception:
self.status = "Error"
logging.exception("Exception in HTTP JSON Alert Provider (" + self.name + ")")
# Brief pause on error before the next poll, but still respond promptly to stop()
self._stop_event.wait(timeout=1)
def http_response_to_alerts(self, http_response):
def _http_response_to_alerts(self, http_response):
"""Convert an HTTP response returned by the API into alert data. The whole response is provided here so the subclass
implementations can check for HTTP status codes if necessary, and handle the response as JSON, XML, text, whatever
the API actually provides."""

View File

@@ -18,7 +18,7 @@ class NG3K(HTTPAlertProvider):
def __init__(self, provider_config):
super().__init__(provider_config, self.ALERTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_alerts(self, http_response):
def _http_response_to_alerts(self, http_response):
new_alerts = []
rss = RSSParser.parse(http_response.content.decode())
# Iterate through source data

View File

@@ -17,7 +17,7 @@ class ParksNPeaks(HTTPAlertProvider):
def __init__(self, provider_config):
super().__init__(provider_config, self.ALERTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_alerts(self, http_response):
def _http_response_to_alerts(self, http_response):
new_alerts = []
# Iterate through source data
for source_alert in http_response.json():
@@ -45,7 +45,7 @@ class ParksNPeaks(HTTPAlertProvider):
# Log a warning for the developer if PnP gives us an unknown programme we've never seen before
if sig and sig not in ["POTA", "SOTA", "WWFF", "SiOTA", "ZLOTA", "KRMNPA"]:
logging.warn("PNP alert found with sig " + sig + ", developer needs to add support for this!")
logging.warning("PNP alert found with sig " + sig + ", developer needs to add support for this!")
# If this is POTA, SOTA or WWFF data we already have it through other means, so ignore. Otherwise, add to
# the alert list. Note that while ZLOTA has its own spots API, it doesn't have its own alerts API. So that

View File

@@ -16,7 +16,7 @@ class POTA(HTTPAlertProvider):
def __init__(self, provider_config):
super().__init__(provider_config, self.ALERTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_alerts(self, http_response):
def _http_response_to_alerts(self, http_response):
new_alerts = []
# Iterate through source data
for source_alert in http_response.json():

View File

@@ -16,7 +16,7 @@ class SOTA(HTTPAlertProvider):
def __init__(self, provider_config):
super().__init__(provider_config, self.ALERTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_alerts(self, http_response):
def _http_response_to_alerts(self, http_response):
new_alerts = []
# Iterate through source data
for source_alert in http_response.json():

View File

@@ -18,7 +18,7 @@ class WOTA(HTTPAlertProvider):
def __init__(self, provider_config):
super().__init__(provider_config, self.ALERTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_alerts(self, http_response):
def _http_response_to_alerts(self, http_response):
new_alerts = []
rss = RSSParser.parse(http_response.content.decode())
# Iterate through source data

View File

@@ -16,7 +16,7 @@ class WWFF(HTTPAlertProvider):
def __init__(self, provider_config):
super().__init__(provider_config, self.ALERTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_alerts(self, http_response):
def _http_response_to_alerts(self, http_response):
new_alerts = []
# Iterate through source data
for source_alert in http_response.json():