import json import logging from datetime import datetime, timedelta from queue import Queue import pytz import tornado import tornado_eventsource.handler from core.prometheus_metrics_handler import api_requests_counter from core.utils import serialize_everything SSE_HANDLER_MAX_QUEUE_SIZE = 1000 SSE_HANDLER_QUEUE_CHECK_INTERVAL = 5000 # API request handler for /api/v1/spots class APISpotsHandler(tornado.web.RequestHandler): def initialize(self, spots, web_server_metrics): self.spots = spots self.web_server_metrics = web_server_metrics def get(self): try: # Metrics self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC) self.web_server_metrics["api_access_counter"] += 1 self.web_server_metrics["status"] = "OK" api_requests_counter.inc() # request.arguments contains lists for each param key because technically the client can supply multiple, # reduce that to just the first entry, and convert bytes to string query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()} # Fetch all spots matching the query data = get_spot_list_with_filters(self.spots, query_params) self.write(json.dumps(data, default=serialize_everything)) self.set_status(200) except ValueError as e: logging.error(e) self.write(json.dumps("Bad request - " + str(e), default=serialize_everything)) self.set_status(400) except Exception as e: logging.error(e) self.write(json.dumps("Error - " + str(e), default=serialize_everything)) self.set_status(500) self.set_header("Cache-Control", "no-store") self.set_header("Content-Type", "application/json") # API request handler for /api/v1/spots/stream class APISpotsStreamHandler(tornado_eventsource.handler.EventSourceHandler): def initialize(self, sse_spot_queues, web_server_metrics): self.sse_spot_queues = sse_spot_queues self.web_server_metrics = web_server_metrics # Called once on the client opening a connection, set things up def open(self): try: # Metrics self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC) self.web_server_metrics["api_access_counter"] += 1 self.web_server_metrics["status"] = "OK" api_requests_counter.inc() # request.arguments contains lists for each param key because technically the client can supply multiple, # reduce that to just the first entry, and convert bytes to string self.query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()} # Create a spot queue and add it to the web server's list. The web server will fill this when spots arrive self.spot_queue = Queue(maxsize=SSE_HANDLER_MAX_QUEUE_SIZE) self.sse_spot_queues.append(self.spot_queue) # Set up a timed callback to check if anything is in the queue self.heartbeat = tornado.ioloop.PeriodicCallback(self._callback, SSE_HANDLER_QUEUE_CHECK_INTERVAL) self.heartbeat.start() except Exception as e: logging.warn("Exception when serving SSE socket", e) # When the user closes the socket, empty our queue and remove it from the list so the server no longer fills it def close(self): try: if self.spot_queue in self.sse_spot_queues: self.sse_spot_queues.remove(self.spot_queue) self.spot_queue.empty() except: pass self.spot_queue = None super().close() # Callback to check if anything has arrived in the queue, and if so send it to the client def _callback(self): try: if self.spot_queue: while not self.spot_queue.empty(): spot = self.spot_queue.get() # If the new spot matches our param filters, send it to the client. If not, ignore it. if spot_allowed_by_query(spot, self.query_params): self.write_message(msg=json.dumps(spot, default=serialize_everything)) if self.spot_queue not in self.sse_spot_queues: logging.error("Web server cleared up a queue of an active connection!") self.close() except: logging.warn("Exception in SSE callback, connection will be closed.") self.close() # Utility method to apply filters to the overall spot list and return only a subset. Enables query parameters in # the main "spots" GET call. def get_spot_list_with_filters(all_spots, query): # Create a shallow copy of the spot list, ordered by spot time, then filter the list to reduce it only to spots # that match the filter parameters in the query string. Finally, apply a limit to the number of spots returned. # The list of query string filters is defined in the API docs. spot_ids = list(all_spots.iterkeys()) spots = [] for k in spot_ids: s = all_spots.get(k) if s is not None: spots.append(s) spots = sorted(spots, key=lambda spot: (spot.time if spot and spot.time else 0), reverse=True) spots = list(filter(lambda spot: spot_allowed_by_query(spot, query), spots)) if "limit" in query.keys(): spots = spots[:int(query.get("limit"))] # Ensure only the latest spot of each callsign-SSID combo is present in the list. This relies on the # list being in reverse time order, so if any future change allows re-ordering the list, that should # be done *after* this. SSIDs are deliberately included here (see issue #68) because e.g. M0TRT-7 # and M0TRT-9 APRS transponders could well be in different locations, on different frequencies etc. # This is a special consideration for the geo map and band map views (and Field Spotter) because while # duplicates are fine in the main spot list (e.g. different cluster spots of the same DX) this doesn't # work well for the other views. if "dedupe" in query.keys(): dedupe = query.get("dedupe").upper() == "TRUE" if dedupe: spots_temp = [] already_seen = [] for s in spots: call_plus_ssid = s.dx_call + (s.dx_ssid if s.dx_ssid else "") if call_plus_ssid not in already_seen: spots_temp.append(s) already_seen.append(call_plus_ssid) spots = spots_temp return spots # Given URL query params and a spot, figure out if the spot "passes" the requested filters or is rejected. The list # of query parameters and their function is defined in the API docs. def spot_allowed_by_query(spot, query): for k in query.keys(): match k: case "since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp() if not spot.time or spot.time <= since: return False case "max_age": max_age = int(query.get(k)) since = (datetime.now(pytz.UTC) - timedelta(seconds=max_age)).timestamp() if not spot.time or spot.time <= since: return False case "received_since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp() if not spot.received_time or spot.received_time <= since: return False case "source": sources = query.get(k).split(",") if not spot.source or spot.source not in sources: return False case "sig": # If a list of sigs is provided, the spot must have a sig and it must match one of them. # The special "sig" "NO_SIG", when supplied in the list, mathches spots with no sig. sigs = query.get(k).split(",") include_no_sig = "NO_SIG" in sigs if not spot.sig and not include_no_sig: return False if spot.sig and spot.sig not in sigs: return False case "needs_sig": # If true, a sig is required, regardless of what it is, it just can't be missing. Mutually # exclusive with supplying the special "NO_SIG" parameter to the "sig" query param. needs_sig = query.get(k).upper() == "TRUE" if needs_sig and not spot.sig: return False case "needs_sig_ref": # If true, at least one sig ref is required, regardless of what it is, it just can't be missing. needs_sig_ref = query.get(k).upper() == "TRUE" if needs_sig_ref and (not spot.sig_refs or len(spot.sig_refs) == 0): return False case "band": bands = query.get(k).split(",") if not spot.band or spot.band not in bands: return False case "mode": modes = query.get(k).split(",") if not spot.mode or spot.mode not in modes: return False case "mode_type": mode_types = query.get(k).split(",") if not spot.mode_type or spot.mode_type not in mode_types: return False case "dx_continent": dxconts = query.get(k).split(",") if not spot.dx_continent or spot.dx_continent not in dxconts: return False case "de_continent": deconts = query.get(k).split(",") if not spot.de_continent or spot.de_continent not in deconts: return False case "comment_includes": comment_includes = query.get(k).strip() if not spot.comment or comment_includes.upper() not in spot.comment.upper(): return False case "dx_call_includes": dx_call_includes = query.get(k).strip() if not spot.dx_call or dx_call_includes.upper() not in spot.dx_call.upper(): return False case "text_includes": text_includes = query.get(k).strip() if (not spot.dx_call or text_includes.upper() not in spot.dx_call.upper()) \ and (not spot.comment or text_includes.upper() not in spot.comment.upper()): return False case "allow_qrt": # If false, spots that are flagged as QRT are not returned. prevent_qrt = query.get(k).upper() == "FALSE" if prevent_qrt and spot.qrt and spot.qrt == True: return False case "needs_good_location": # If true, spots require a "good" location to be returned needs_good_location = query.get(k).upper() == "TRUE" if needs_good_location and not spot.dx_location_good: return False return True