import json
import logging
from datetime import datetime
from queue import Queue

import pytz
import tornado.ioloop
import tornado.web
import tornado_eventsource.handler

from core.prometheus_metrics_handler import api_requests_counter
from core.utils import serialize_everything, empty_queue

SSE_HANDLER_MAX_QUEUE_SIZE = 100
SSE_HANDLER_QUEUE_CHECK_INTERVAL = 5000


class APIAlertsHandler(tornado.web.RequestHandler):
    """API request handler for /api/v1/alerts"""

    def initialize(self, alerts, web_server_metrics):
        self._alerts = alerts
        self._web_server_metrics = web_server_metrics

    def get(self):
        try:
            # Metrics
            self._web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC)
            self._web_server_metrics["api_access_counter"] += 1
            self._web_server_metrics["status"] = "OK"
            api_requests_counter.inc()

            # request.arguments contains a list for each param key because technically the client can supply the
            # same parameter multiple times. Reduce that to just the first entry, and convert bytes to string.
            query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()}

            # Fetch all alerts matching the query
            data = get_alert_list_with_filters(self._alerts, query_params)
            self.write(json.dumps(data, default=serialize_everything))
            self.set_status(200)
        except ValueError as e:
            logging.error(e)
            self.write(json.dumps("Bad request - " + str(e), default=serialize_everything))
            self.set_status(400)
        except Exception as e:
            logging.error(e)
            self.write(json.dumps("Error - " + str(e), default=serialize_everything))
            self.set_status(500)
        self.set_header("Cache-Control", "no-store")
        self.set_header("Content-Type", "application/json")


class APIAlertsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
    """API request handler for /api/v1/alerts/stream"""

    def initialize(self, sse_alert_queues, web_server_metrics):
        self._sse_alert_queues = sse_alert_queues
        self._web_server_metrics = web_server_metrics

    def custom_headers(self):
        """Custom headers to stop e.g. an nginx reverse proxy from buffering SSE data"""
        return {"Cache-Control": "no-store", "X-Accel-Buffering": "no"}

    def open(self):
        try:
            # Metrics
            self._web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC)
            self._web_server_metrics["api_access_counter"] += 1
            self._web_server_metrics["status"] = "OK"
            api_requests_counter.inc()

            # request.arguments contains a list for each param key because technically the client can supply the
            # same parameter multiple times. Reduce that to just the first entry, and convert bytes to string.
            self._query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()}

            # Create an alert queue and add it to the web server's list. The web server will fill this queue as
            # alerts arrive.
            self._alert_queue = Queue(maxsize=SSE_HANDLER_MAX_QUEUE_SIZE)
            self._sse_alert_queues.append(self._alert_queue)

            # Set up a timed callback to check whether anything is in the queue
            self._heartbeat = tornado.ioloop.PeriodicCallback(self._callback, SSE_HANDLER_QUEUE_CHECK_INTERVAL)
            self._heartbeat.start()
        except Exception as e:
            logging.warning("Exception when serving SSE socket: %s", e)

    def close(self):
        """When the user closes the socket, empty our queue and remove it from the list so the server no longer
        fills it"""
        try:
            if self._alert_queue in self._sse_alert_queues:
                self._sse_alert_queues.remove(self._alert_queue)
                empty_queue(self._alert_queue)
        except Exception:
            pass
        try:
            self._heartbeat.stop()
        except Exception:
            pass
        self._alert_queue = None
        super().close()

    def _callback(self):
        """Callback to check whether anything has arrived in the queue, and if so send it to the client"""
        try:
            if self._alert_queue:
                while not self._alert_queue.empty():
                    alert = self._alert_queue.get()
                    # If the new alert matches our param filters, send it to the client. If not, ignore it.
                    if alert_allowed_by_query(alert, self._query_params):
                        self.write_message(msg=json.dumps(alert, default=serialize_everything))
            if self._alert_queue not in self._sse_alert_queues:
                logging.error("Web server cleared up a queue of an active connection!")
                self.close()
        except Exception:
            logging.warning("Exception in SSE callback, connection will be closed.", exc_info=True)
            self.close()


def get_alert_list_with_filters(all_alerts, query):
    """Utility method to apply filters to the overall alert list and return only a subset. Enables query
    parameters in the main "alerts" GET call."""
    # Create a shallow copy of the alert list ordered by start time, then filter the list to reduce it only to
    # alerts that match the filter parameters in the query string. Finally, apply a limit to the number of
    # alerts returned. The list of query string filters is defined in the API docs.
    alert_ids = list(all_alerts.keys())
    alerts = []
    for k in alert_ids:
        a = all_alerts.get(k)
        if a is not None:
            alerts.append(a)
    alerts = sorted(alerts, key=lambda alert: (alert.start_time if alert and alert.start_time else 0))
    alerts = list(filter(lambda alert: alert_allowed_by_query(alert, query), alerts))
    if "limit" in query.keys():
        alerts = alerts[:int(query.get("limit"))]
    return alerts


def alert_allowed_by_query(alert, query):
    """Given URL query params and an alert, figure out if the alert "passes" the requested filters or is
    rejected. The list of query parameters and their function is defined in the API docs."""
    for k in query.keys():
        match k:
            case "received_since":
                since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC)
                if not alert.received_time or alert.received_time <= since:
                    return False
            case "max_duration":
                max_duration = int(query.get(k))
                # Check the duration if end_time is provided. If end_time is not provided, assume the activation
                # is "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true
                # and the alert is a dxpedition, it also always passes the check.
                skip_for_dxpeditions = query.get("dxpeditions_skip_max_duration_check", "").upper() == "TRUE"
                if alert.is_dxpedition and skip_for_dxpeditions:
                    continue
                if alert.end_time and alert.start_time and alert.end_time - alert.start_time > max_duration:
                    return False
            case "source":
                sources = query.get(k).split(",")
                if not alert.source or alert.source not in sources:
                    return False
            case "sig":
                # If a list of sigs is provided, the alert must have a sig and it must match one of them.
                # The special "sig" "NO_SIG", when supplied in the list, matches alerts with no sig.
                sigs = query.get(k).split(",")
                include_no_sig = "NO_SIG" in sigs
                if not alert.sig and not include_no_sig:
                    return False
                if alert.sig and alert.sig not in sigs:
                    return False
            case "dx_continent":
                dxconts = query.get(k).split(",")
                if not alert.dx_continent or alert.dx_continent not in dxconts:
                    return False
            case "dx_call_includes":
                dx_call_includes = query.get(k).strip()
                if not alert.dx_call or dx_call_includes.upper() not in alert.dx_call.upper():
                    return False
            case "text_includes":
                text_includes = query.get(k).strip()
                if (not alert.dx_call or text_includes.upper() not in alert.dx_call.upper()) \
                        and (not alert.comment or text_includes.upper() not in alert.comment.upper()) \
                        and (not alert.freqs_modes or text_includes.upper() not in alert.freqs_modes.upper()):
                    return False
    return True
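Both handlers reduce Tornado's `request.arguments` (which maps each key to a *list* of `bytes` values, since a client may repeat a parameter) to a plain `str`-to-`str` dict. A minimal standalone sketch of that reduction, using made-up argument values:

```python
# Sketch of the query-string reduction used by both handlers above.
# Tornado's request.arguments maps each key to a LIST of bytes values;
# only the first value per key is kept, decoded from UTF-8.
raw_arguments = {
    "source": [b"RBN", b"cluster"],  # repeated parameter: only the first survives
    "limit": [b"10"],
}
query_params = {k: v[0].decode("utf-8") for k, v in raw_arguments.items()}
print(query_params)  # {'source': 'RBN', 'limit': '10'}
```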