import json import logging from datetime import datetime from queue import Queue import pytz import tornado import tornado_eventsource.handler from core.prometheus_metrics_handler import api_requests_counter from core.utils import serialize_everything SSE_HANDLER_MAX_QUEUE_SIZE = 100 SSE_HANDLER_QUEUE_CHECK_INTERVAL = 5000 # API request handler for /api/v1/alerts class APIAlertsHandler(tornado.web.RequestHandler): def initialize(self, alerts, web_server_metrics): self.alerts = alerts self.web_server_metrics = web_server_metrics def get(self): try: # Metrics self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC) self.web_server_metrics["api_access_counter"] += 1 self.web_server_metrics["status"] = "OK" api_requests_counter.inc() # request.arguments contains lists for each param key because technically the client can supply multiple, # reduce that to just the first entry, and convert bytes to string query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()} # Fetch all alerts matching the query data = get_alert_list_with_filters(self.alerts, query_params) self.write(json.dumps(data, default=serialize_everything)) self.set_status(200) except ValueError as e: logging.error(e) self.write(json.dumps("Bad request - " + str(e), default=serialize_everything)) self.set_status(400) except Exception as e: logging.error(e) self.write(json.dumps("Error - " + str(e), default=serialize_everything)) self.set_status(500) self.set_header("Cache-Control", "no-store") self.set_header("Content-Type", "application/json") # API request handler for /api/v1/alerts/stream class APIAlertsStreamHandler(tornado_eventsource.handler.EventSourceHandler): def initialize(self, sse_alert_queues, web_server_metrics): self.sse_alert_queues = sse_alert_queues self.web_server_metrics = web_server_metrics # Custom headers to avoid e.g. nginx reverse proxy from buffering SSE data def custom_headers(self): return {"Cache-Control": "no-store", "X-Accel-Buffering": "no"} def open(self): try: # Metrics self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC) self.web_server_metrics["api_access_counter"] += 1 self.web_server_metrics["status"] = "OK" api_requests_counter.inc() # request.arguments contains lists for each param key because technically the client can supply multiple, # reduce that to just the first entry, and convert bytes to string self.query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()} # Create a alert queue and add it to the web server's list. The web server will fill this when alerts arrive self.alert_queue = Queue(maxsize=SSE_HANDLER_MAX_QUEUE_SIZE) self.sse_alert_queues.append(self.alert_queue) # Set up a timed callback to check if anything is in the queue self.heartbeat = tornado.ioloop.PeriodicCallback(self._callback, SSE_HANDLER_QUEUE_CHECK_INTERVAL) self.heartbeat.start() except Exception as e: logging.warn("Exception when serving SSE socket", e) # When the user closes the socket, empty our queue and remove it from the list so the server no longer fills it def close(self): try: if self.alert_queue in self.sse_alert_queues: self.sse_alert_queues.remove(self.alert_queue) self.alert_queue.empty() except: pass self.alert_queue = None super().close() # Callback to check if anything has arrived in the queue, and if so send it to the client def _callback(self): try: if self.alert_queue: while not self.alert_queue.empty(): alert = self.alert_queue.get() # If the new alert matches our param filters, send it to the client. If not, ignore it. if alert_allowed_by_query(alert, self.query_params): self.write_message(msg=json.dumps(alert, default=serialize_everything)) if self.alert_queue not in self.sse_alert_queues: logging.error("Web server cleared up a queue of an active connection!") self.close() except: logging.warn("Exception in SSE callback, connection will be closed.") self.close() # Utility method to apply filters to the overall alert list and return only a subset. Enables query parameters in # the main "alerts" GET call. def get_alert_list_with_filters(all_alerts, query): # Create a shallow copy of the alert list ordered by start time, then filter the list to reduce it only to alerts # that match the filter parameters in the query string. Finally, apply a limit to the number of alerts returned. # The list of query string filters is defined in the API docs. alert_ids = list(all_alerts.iterkeys()) alerts = [] for k in alert_ids: a = all_alerts.get(k) if a is not None: alerts.append(a) alerts = sorted(alerts, key=lambda alert: (alert.start_time if alert and alert.start_time else 0)) alerts = list(filter(lambda alert: alert_allowed_by_query(alert, query), alerts)) if "limit" in query.keys(): alerts = alerts[:int(query.get("limit"))] return alerts # Given URL query params and an alert, figure out if the alert "passes" the requested filters or is rejected. The list # of query parameters and their function is defined in the API docs. def alert_allowed_by_query(alert, query): for k in query.keys(): match k: case "received_since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC) if not alert.received_time or alert.received_time <= since: return False case "max_duration": max_duration = int(query.get(k)) # Check the duration if end_time is provided. If end_time is not provided, assume the activation is # "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true and # the alert is a dxpedition, it also always passes the check. if alert.is_dxpedition and (bool(query.get( "dxpeditions_skip_max_duration_check")) if "dxpeditions_skip_max_duration_check" in query.keys() else False): continue if alert.end_time and alert.start_time and alert.end_time - alert.start_time > max_duration: return False case "source": sources = query.get(k).split(",") if not alert.source or alert.source not in sources: return False case "sig": # If a list of sigs is provided, the alert must have a sig and it must match one of them. # The special "sig" "NO_SIG", when supplied in the list, mathches alerts with no sig. sigs = query.get(k).split(",") include_no_sig = "NO_SIG" in sigs if not alert.sig and not include_no_sig: return False if alert.sig and alert.sig not in sigs: return False case "dx_continent": dxconts = query.get(k).split(",") if not alert.dx_continent or alert.dx_continent not in dxconts: return False case "dx_call_includes": dx_call_includes = query.get(k).strip() if not alert.dx_call or dx_call_includes.upper() not in alert.dx_call.upper(): return False case "text_includes": text_includes = query.get(k).strip() if (not alert.dx_call or text_includes.upper() not in alert.dx_call.upper()) \ and (not alert.comment or text_includes.upper() not in alert.comment.upper()) \ and (not alert.freqs_modes or text_includes.upper() not in alert.freqs_modes.upper()): return False return True