import asyncio import json import os from datetime import datetime, timedelta import pytz import tornado from prometheus_client.openmetrics.exposition import CONTENT_TYPE_LATEST from tornado.web import StaticFileHandler from core.config import ALLOW_SPOTTING, MAX_SPOT_AGE, WEB_UI_OPTIONS from core.constants import SOFTWARE_VERSION, BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS from core.prometheus_metrics_handler import get_metrics, page_requests_counter # Provides the public-facing web server. # TODO synchronous API responses # TODO SSE API responses # TODO clean_up_sse_queues # TODO page & API access counters - how to do from a subclass handler? e.g. # self.last_api_access_time = datetime.now(pytz.UTC) # self.api_access_counter += 1 # api_requests_counter.inc() # self.status = "OK" # # self.last_page_access_time = datetime.now(pytz.UTC) # self.page_access_counter += 1 # page_requests_counter.inc() # self.status = "OK" class WebServer: # Constructor def __init__(self, spots, alerts, status_data, port): self.last_page_access_time = None self.last_api_access_time = None self.page_access_counter = 0 self.api_access_counter = 0 self.spots = spots self.alerts = alerts self.sse_spot_queues = [] self.sse_alert_queues = [] self.status_data = status_data self.port = port self.status = "Starting" self.shutdown_event = asyncio.Event() # Start the web server def start(self): asyncio.run(self.start_inner()) # Stop the web server def stop(self): self.shutdown_event.set() # Start method (async). Sets up the Tornado application. async def start_inner(self): app = tornado.web.Application([ # Routes for API calls (r"/api/v1/spots", APISpotsHandler), (r"/api/v1/alerts", APIAlertsHandler), (r"/api/v1/spots/stream", APISpotsStreamHandler), (r"/api/v1/alerts/stream", APIAlertsStreamHandler), (r"/api/v1/options", APIOptionsHandler, {"status_data": self.status_data}), (r"/api/v1/status", APIStatusHandler, {"status_data": self.status_data}), (r"/api/v1/lookup/call", APILookupCallHandler), (r"/api/v1/lookup/sigref", APILookupSIGRefHandler), (r"/api/v1/spot", APISpotHandler), # Routes for templated pages (r"/", PageTemplateHandler, {"template_name": "spots"}), (r"/map", PageTemplateHandler, {"template_name": "map"}), (r"/bands", PageTemplateHandler, {"template_name": "bands"}), (r"/alerts", PageTemplateHandler, {"template_name": "alerts"}), (r"/add-spot", PageTemplateHandler, {"template_name": "add_spot"}), (r"/status", PageTemplateHandler, {"template_name": "status"}), (r"/about", PageTemplateHandler, {"template_name": "about"}), (r"/apidocs", PageTemplateHandler, {"template_name": "apidocs"}), # Route for Prometheus metrics (r"/metrics", PrometheusMetricsHandler), # Default route to serve from "webassets" (r"/(.*)", StaticFileHandler, {"path": os.path.join(os.path.dirname(__file__), "../webassets")}), ], template_path=os.path.join(os.path.dirname(__file__), "../templates"), debug=True) # todo set false app.listen(self.port) await self.shutdown_event.wait() # Clean up any SSE queues that are growing too large; probably their client disconnected. def clean_up_sse_queues(self): # todo pass # API request handler for /api/v1/spots class APISpotsHandler(tornado.web.RequestHandler): def get(self): # todo self.write("Hello, world") # API request handler for /api/v1/alerts class APIAlertsHandler(tornado.web.RequestHandler): def get(self): # todo self.write("Hello, world") # API request handler for /api/v1/spots/stream class APISpotsStreamHandler(tornado.web.RequestHandler): def get(self): # todo self.write("Hello, world") # API request handler for /api/v1/alerts/stream class APIAlertsStreamHandler(tornado.web.RequestHandler): def get(self): # todo self.write("Hello, world") # API request handler for /api/v1/options class APIOptionsHandler(tornado.web.RequestHandler): def initialize(self, status_data): self.status_data = status_data def get(self): options = {"bands": BANDS, "modes": ALL_MODES, "mode_types": MODE_TYPES, "sigs": SIGS, # Spot/alert sources are filtered for only ones that are enabled in config, no point letting the user toggle things that aren't even available. "spot_sources": list( map(lambda p: p["name"], filter(lambda p: p["enabled"], self.status_data["spot_providers"]))), "alert_sources": list( map(lambda p: p["name"], filter(lambda p: p["enabled"], self.status_data["alert_providers"]))), "continents": CONTINENTS, "max_spot_age": MAX_SPOT_AGE, "spot_allowed": ALLOW_SPOTTING, "web-ui-options": WEB_UI_OPTIONS} # If spotting to this server is enabled, "API" is another valid spot source even though it does not come from # one of our proviers. if ALLOW_SPOTTING: options["spot_sources"].append("API") self.write(json.dumps(options, default=serialize_everything)) self.set_header("Cache-Control", "no-store") self.set_header("Content-Type", "application/json") # API request handler for /api/v1/status class APIStatusHandler(tornado.web.RequestHandler): def initialize(self, status_data): self.status_data = status_data def get(self): self.write(json.dumps(self.status_data, default=serialize_everything)) self.set_header("Cache-Control", "no-store") self.set_header("Content-Type", "application/json") # API request handler for /api/v1/lookup/call class APILookupCallHandler(tornado.web.RequestHandler): def get(self): # todo self.write("Hello, world") # API request handler for /api/v1/lookup/sigref class APILookupSIGRefHandler(tornado.web.RequestHandler): def get(self): # todo self.write("Hello, world") # API request handler for /api/v1/spot (POST) class APISpotHandler(tornado.web.RequestHandler): def post(self): # todo self.write("Hello, world") # Handler for all HTML pages generated from templates class PageTemplateHandler(tornado.web.RequestHandler): def initialize(self, template_name): self.template_name = template_name def get(self): # Load named template, and provide variables used in templates self.render(self.template_name + ".html", software_version=SOFTWARE_VERSION, allow_spotting=ALLOW_SPOTTING) # Handler for Prometheus metrics endpoint class PrometheusMetricsHandler(tornado.web.RequestHandler): def get(self): self.write(get_metrics()) self.set_status(200) self.set_header('Content-Type', CONTENT_TYPE_LATEST) # Given URL query params and a spot, figure out if the spot "passes" the requested filters or is rejected. The list # of query parameters and their function is defined in the API docs. def spot_allowed_by_query(spot, query): for k in query.keys(): match k: case "since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp() if not spot.time or spot.time <= since: return False case "max_age": max_age = int(query.get(k)) since = (datetime.now(pytz.UTC) - timedelta(seconds=max_age)).timestamp() if not spot.time or spot.time <= since: return False case "received_since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp() if not spot.received_time or spot.received_time <= since: return False case "source": sources = query.get(k).split(",") if not spot.source or spot.source not in sources: return False case "sig": # If a list of sigs is provided, the spot must have a sig and it must match one of them. # The special "sig" "NO_SIG", when supplied in the list, mathches spots with no sig. sigs = query.get(k).split(",") include_no_sig = "NO_SIG" in sigs if not spot.sig and not include_no_sig: return False if spot.sig and spot.sig not in sigs: return False case "needs_sig": # If true, a sig is required, regardless of what it is, it just can't be missing. Mutually # exclusive with supplying the special "NO_SIG" parameter to the "sig" query param. needs_sig = query.get(k).upper() == "TRUE" if needs_sig and not spot.sig: return False case "needs_sig_ref": # If true, at least one sig ref is required, regardless of what it is, it just can't be missing. needs_sig_ref = query.get(k).upper() == "TRUE" if needs_sig_ref and (not spot.sig_refs or len(spot.sig_refs) == 0): return False case "band": bands = query.get(k).split(",") if not spot.band or spot.band not in bands: return False case "mode": modes = query.get(k).split(",") if not spot.mode or spot.mode not in modes: return False case "mode_type": mode_types = query.get(k).split(",") if not spot.mode_type or spot.mode_type not in mode_types: return False case "dx_continent": dxconts = query.get(k).split(",") if not spot.dx_continent or spot.dx_continent not in dxconts: return False case "de_continent": deconts = query.get(k).split(",") if not spot.de_continent or spot.de_continent not in deconts: return False case "comment_includes": comment_includes = query.get(k).strip() if not spot.comment or comment_includes.upper() not in spot.comment.upper(): return False case "dx_call_includes": dx_call_includes = query.get(k).strip() if not spot.dx_call or dx_call_includes.upper() not in spot.dx_call.upper(): return False case "allow_qrt": # If false, spots that are flagged as QRT are not returned. prevent_qrt = query.get(k).upper() == "FALSE" if prevent_qrt and spot.qrt and spot.qrt == True: return False case "needs_good_location": # If true, spots require a "good" location to be returned needs_good_location = query.get(k).upper() == "TRUE" if needs_good_location and not spot.dx_location_good: return False return True # Given URL query params and an alert, figure out if the alert "passes" the requested filters or is rejected. The list # of query parameters and their function is defined in the API docs. def alert_allowed_by_query(alert, query): for k in query.keys(): match k: case "received_since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC) if not alert.received_time or alert.received_time <= since: return False case "max_duration": max_duration = int(query.get(k)) # Check the duration if end_time is provided. If end_time is not provided, assume the activation is # "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true and # the alert is a dxpedition, it also always passes the check. if alert.is_dxpedition and (bool(query.get( "dxpeditions_skip_max_duration_check")) if "dxpeditions_skip_max_duration_check" in query.keys() else False): continue if alert.end_time and alert.start_time and alert.end_time - alert.start_time > max_duration: return False case "source": sources = query.get(k).split(",") if not alert.source or alert.source not in sources: return False case "sig": # If a list of sigs is provided, the alert must have a sig and it must match one of them. # The special "sig" "NO_SIG", when supplied in the list, mathches alerts with no sig. sigs = query.get(k).split(",") include_no_sig = "NO_SIG" in sigs if not alert.sig and not include_no_sig: return False if alert.sig and alert.sig not in sigs: return False case "dx_continent": dxconts = query.get(k).split(",") if not alert.dx_continent or alert.dx_continent not in dxconts: return False case "dx_call_includes": dx_call_includes = query.get(k).strip() if not alert.dx_call or dx_call_includes.upper() not in alert.dx_call.upper(): return False return True # Convert objects to serialisable things. Used by JSON serialiser as a default when it encounters unserializable things. # Just converts objects to dict. Try to avoid doing anything clever here when serialising spots, because we also need # to receive spots without complex handling. def serialize_everything(obj): return obj.__dict__