mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-02-04 17:24:30 +00:00
175 lines
8.1 KiB
Python
175 lines
8.1 KiB
Python
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from queue import Queue
|
|
|
|
import pytz
|
|
import tornado
|
|
import tornado_eventsource.handler
|
|
|
|
from core.prometheus_metrics_handler import api_requests_counter
|
|
from core.utils import serialize_everything
|
|
|
|
# Maximum number of alerts buffered for a single SSE client. Used as the maxsize of the Queue that each
# APIAlertsStreamHandler creates when a client connects.
SSE_HANDLER_MAX_QUEUE_SIZE = 100
# How often each SSE handler polls its queue for new alerts. Passed as the callback time of a
# tornado.ioloop.PeriodicCallback, which is expressed in milliseconds.
SSE_HANDLER_QUEUE_CHECK_INTERVAL = 5000
|
|
|
|
|
|
# API request handler for /api/v1/alerts. Responds to GET with a JSON array of the alerts that match the filters given
# in the query string.
class APIAlertsHandler(tornado.web.RequestHandler):
    # Store the objects handed over when the route was registered.
    #   alerts: the alert store that get_alert_list_with_filters() reads from.
    #   web_server_metrics: dict of metrics that is updated on every API access.
    def initialize(self, alerts, web_server_metrics):
        self.alerts, self.web_server_metrics = alerts, web_server_metrics

    # Handle GET. Writes the matching alerts with status 200, a "Bad request" message with status 400 if a ValueError
    # is raised while handling the request, or an "Error" message with status 500 for any other failure.
    def get(self):
        try:
            # Record this access in the metrics
            metrics = self.web_server_metrics
            metrics["last_api_access_time"] = datetime.now(pytz.UTC)
            metrics["api_access_counter"] += 1
            metrics["status"] = "OK"
            api_requests_counter.inc()

            # Each key in request.arguments maps to a list of byte strings, because a client may supply the same
            # parameter several times. Keep only the first value of each and decode it to text.
            query_params = {}
            for name, values in self.request.arguments.items():
                query_params[name] = values[0].decode("utf-8")

            # Look up every alert that passes the requested filters and send them back
            matching = get_alert_list_with_filters(self.alerts, query_params)
            self._send_json(200, matching)
        except ValueError as err:
            logging.error(err)
            self._send_json(400, "Bad request - " + str(err))
        except Exception as err:
            logging.error(err)
            self._send_json(500, "Error - " + str(err))

        # The same headers are sent whatever the outcome
        self.set_header("Cache-Control", "no-store")
        self.set_header("Content-Type", "application/json")

    # Serialise the payload to JSON, write it to the response body, then set the HTTP status code.
    def _send_json(self, status, payload):
        self.write(json.dumps(payload, default=serialize_everything))
        self.set_status(status)
|
|
|
|
# API request handler for /api/v1/alerts/stream. Each connected client gets its own queue, which is registered in the
# shared sse_alert_queues list; a periodic callback drains that queue and pushes matching alerts to the client.
class APIAlertsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
    # Store the objects handed over when the route was registered.
    #   sse_alert_queues: shared list of per-client queues. This handler appends its own queue in open() and removes
    #     it again in close().
    #   web_server_metrics: dict of metrics that is updated on every API access.
    def initialize(self, sse_alert_queues, web_server_metrics):
        self.sse_alert_queues = sse_alert_queues
        self.web_server_metrics = web_server_metrics
        # Defaults, so that close() and _callback() behave sensibly even if open() failed part-way through
        self.query_params = {}
        self.alert_queue = None
        self.heartbeat = None

    # Called when a client connects. Records metrics, remembers the client's filters, registers a queue for the
    # client and starts polling it.
    def open(self):
        try:
            # Metrics
            self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC)
            self.web_server_metrics["api_access_counter"] += 1
            self.web_server_metrics["status"] = "OK"
            api_requests_counter.inc()

            # request.arguments contains lists for each param key because technically the client can supply multiple,
            # reduce that to just the first entry, and convert bytes to string
            self.query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()}

            # Create an alert queue and add it to the web server's list. The web server will fill this when alerts
            # arrive
            self.alert_queue = Queue(maxsize=SSE_HANDLER_MAX_QUEUE_SIZE)
            self.sse_alert_queues.append(self.alert_queue)

            # Set up a timed callback to check if anything is in the queue
            self.heartbeat = tornado.ioloop.PeriodicCallback(self._callback, SSE_HANDLER_QUEUE_CHECK_INTERVAL)
            self.heartbeat.start()

        except Exception:
            # exc_info attaches the exception and traceback to the log record. (Passing the exception as a bare
            # positional argument with no placeholder in the message makes the logging module report a formatting
            # error instead of the real problem.)
            logging.warning("Exception when serving SSE socket", exc_info=True)

    # When the user closes the socket, stop polling and remove our queue from the list so the server no longer fills
    # it. Safe to call more than once.
    def close(self):
        # Stop the periodic callback. If it were left running, the IOLoop would keep calling into (and keep alive) a
        # handler whose connection has gone.
        heartbeat = self.heartbeat
        self.heartbeat = None
        if heartbeat is not None:
            heartbeat.stop()

        # Unregister the queue. Dropping the last references to it discards whatever it still holds, so there is no
        # need to drain it item by item.
        queue = self.alert_queue
        self.alert_queue = None
        if queue is not None:
            try:
                if queue in self.sse_alert_queues:
                    self.sse_alert_queues.remove(queue)
            except ValueError:
                # Something else removed it between the check and the remove; nothing left to do
                pass

        super().close()

    # Callback to check if anything has arrived in the queue, and if so send it to the client
    def _callback(self):
        try:
            queue = self.alert_queue
            if queue is None:
                # Connection never opened properly or has already been closed
                return

            while not queue.empty():
                alert = queue.get()
                # If the new alert matches our param filters, send it to the client. If not, ignore it.
                if alert_allowed_by_query(alert, self.query_params):
                    self.write_message(msg=json.dumps(alert, default=serialize_everything))

            if queue not in self.sse_alert_queues:
                logging.error("Web server cleared up a queue of an active connection!")
                self.close()
        except Exception:
            logging.warning("Exception in SSE callback, connection will be closed.", exc_info=True)
            self.close()
|
|
|
|
|
|
|
|
|
|
# Utility method to apply filters to the overall alert list and return only a subset. Enables query parameters in
# the main "alerts" GET call.
#   all_alerts: the alert store. Must provide iterkeys() and get(key).
#   query: dict of query string parameter name to string value. The list of query string filters is defined in the
#     API docs.
# Returns a list of the matching alerts ordered by start time, truncated to "limit" entries if that parameter is given.
# Raises ValueError if "limit" (or any filter value that needs to be a number) is not an integer, or if "limit" is
# negative.
def get_alert_list_with_filters(all_alerts, query):
    # Validate the limit before doing any work. A negative value must be rejected explicitly, because as a slice
    # bound it would silently mean "everything except the last N alerts".
    limit = None
    if "limit" in query:
        limit = int(query.get("limit"))
        if limit < 0:
            raise ValueError("limit must not be negative")

    # Take a snapshot of the keys, then look each one up. A lookup can come back as None (for example if the entry
    # has gone from the store since the snapshot was taken), so skip those.
    alert_ids = list(all_alerts.iterkeys())
    alerts = [a for a in (all_alerts.get(k) for k in alert_ids) if a is not None]

    # Order by start time, treating a missing start time as zero so that such alerts sort first
    alerts.sort(key=lambda alert: (alert.start_time if alert and alert.start_time else 0))

    # Reduce the list to only the alerts that match the filter parameters, then apply the limit
    alerts = [a for a in alerts if alert_allowed_by_query(a, query)]
    return alerts if limit is None else alerts[:limit]
|
|
|
|
# Given URL query params and an alert, figure out if the alert "passes" the requested filters or is rejected. The list
|
|
# of query parameters and their function is defined in the API docs.
|
|
def alert_allowed_by_query(alert, query):
|
|
for k in query.keys():
|
|
match k:
|
|
case "received_since":
|
|
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC)
|
|
if not alert.received_time or alert.received_time <= since:
|
|
return False
|
|
case "max_duration":
|
|
max_duration = int(query.get(k))
|
|
# Check the duration if end_time is provided. If end_time is not provided, assume the activation is
|
|
# "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true and
|
|
# the alert is a dxpedition, it also always passes the check.
|
|
if alert.is_dxpedition and (bool(query.get(
|
|
"dxpeditions_skip_max_duration_check")) if "dxpeditions_skip_max_duration_check" in query.keys() else False):
|
|
continue
|
|
if alert.end_time and alert.start_time and alert.end_time - alert.start_time > max_duration:
|
|
return False
|
|
case "source":
|
|
sources = query.get(k).split(",")
|
|
if not alert.source or alert.source not in sources:
|
|
return False
|
|
case "sig":
|
|
# If a list of sigs is provided, the alert must have a sig and it must match one of them.
|
|
# The special "sig" "NO_SIG", when supplied in the list, mathches alerts with no sig.
|
|
sigs = query.get(k).split(",")
|
|
include_no_sig = "NO_SIG" in sigs
|
|
if not alert.sig and not include_no_sig:
|
|
return False
|
|
if alert.sig and alert.sig not in sigs:
|
|
return False
|
|
case "dx_continent":
|
|
dxconts = query.get(k).split(",")
|
|
if not alert.dx_continent or alert.dx_continent not in dxconts:
|
|
return False
|
|
case "dx_call_includes":
|
|
dx_call_includes = query.get(k).strip()
|
|
if not alert.dx_call or dx_call_includes.upper() not in alert.dx_call.upper():
|
|
return False
|
|
case "text_includes":
|
|
text_includes = query.get(k).strip()
|
|
if (not alert.dx_call or text_includes.upper() not in alert.dx_call.upper()) \
|
|
and (not alert.comment or text_includes.upper() not in alert.comment.upper()) \
|
|
and (not alert.freqs_modes or text_includes.upper() not in alert.freqs_modes.upper()):
|
|
return False
|
|
return True
|