import asyncio
import logging
import os

import tornado
from tornado.web import StaticFileHandler

from server.handlers.api.addspot import APISpotHandler
from server.handlers.api.alerts import APIAlertsHandler, APIAlertsStreamHandler
from server.handlers.api.lookups import APILookupCallHandler, APILookupSIGRefHandler
from server.handlers.api.options import APIOptionsHandler
from server.handlers.api.spots import APISpotsHandler, APISpotsStreamHandler
from server.handlers.api.status import APIStatusHandler
from server.handlers.metrics import PrometheusMetricsHandler
from server.handlers.pagetemplate import PageTemplateHandler


class WebServer:
    """Provides the public-facing web server.

    Wires the API, templated-page, metrics and static-asset routes into a
    Tornado application, and fans newly added spots/alerts out to the queues
    held in ``sse_spot_queues`` / ``sse_alert_queues``.
    """

    def __init__(self, spots, alerts, status_data, port):
        """Store the shared data objects and initialise the server state.

        Args:
            spots: Spot store handed to the spot API handlers.
            alerts: Alert store handed to the alert API handler.
            status_data: Status data handed to the options and status handlers.
            port: TCP port the Tornado application listens on.
        """
        self.spots = spots
        self.alerts = alerts
        # These lists are handed to the SSE stream handlers; presumably each
        # connected client registers its own queue here - verify in handlers.
        self.sse_spot_queues = []
        self.sse_alert_queues = []
        self.status_data = status_data
        self.port = port
        self.shutdown_event = asyncio.Event()
        # Event loop running start_inner(); set once the server has started so
        # that stop() can signal shutdown safely from another thread.
        self._loop = None
        self.web_server_metrics = {
            "last_page_access_time": None,
            "last_api_access_time": None,
            "page_access_counter": 0,
            "api_access_counter": 0,
            "status": "Starting",
        }

    def start(self):
        """Start the web server. Blocks until stop() is called."""
        asyncio.run(self.start_inner())

    def stop(self):
        """Stop the web server.

        start() blocks inside asyncio.run(), so this is normally called from a
        different thread. asyncio.Event is not thread-safe, so the event is
        set via the server's own loop whenever that loop is available.
        """
        loop = self._loop
        if loop is not None and not loop.is_closed():
            try:
                loop.call_soon_threadsafe(self.shutdown_event.set)
                return
            except RuntimeError:
                # The loop closed between the check and the call; fall through
                # and set the event directly, which is harmless at that point.
                pass
        self.shutdown_event.set()

    async def start_inner(self):
        """Start method (async). Sets up the Tornado application.

        Builds the route table, starts listening on ``self.port`` and then
        waits until the shutdown event is set.
        """
        self._loop = asyncio.get_running_loop()

        here = os.path.dirname(__file__)
        metrics = {"web_server_metrics": self.web_server_metrics}

        # Routes for API calls
        api_routes = [
            (r"/api/v1/spots", APISpotsHandler, {"spots": self.spots, **metrics}),
            (r"/api/v1/alerts", APIAlertsHandler, {"alerts": self.alerts, **metrics}),
            (r"/api/v1/spots/stream", APISpotsStreamHandler,
             {"sse_spot_queues": self.sse_spot_queues, **metrics}),
            (r"/api/v1/alerts/stream", APIAlertsStreamHandler,
             {"sse_alert_queues": self.sse_alert_queues, **metrics}),
            (r"/api/v1/options", APIOptionsHandler, {"status_data": self.status_data, **metrics}),
            (r"/api/v1/status", APIStatusHandler, {"status_data": self.status_data, **metrics}),
            (r"/api/v1/lookup/call", APILookupCallHandler, {**metrics}),
            (r"/api/v1/lookup/sigref", APILookupSIGRefHandler, {**metrics}),
            (r"/api/v1/spot", APISpotHandler, {"spots": self.spots, **metrics}),
        ]

        # Routes for templated pages, as (URL path, template name) pairs
        pages = [
            (r"/", "spots"),
            (r"/map", "map"),
            (r"/bands", "bands"),
            (r"/alerts", "alerts"),
            (r"/add-spot", "add_spot"),
            (r"/status", "status"),
            (r"/about", "about"),
            (r"/apidocs", "apidocs"),
        ]
        page_routes = [
            (path, PageTemplateHandler, {"template_name": template_name, **metrics})
            for path, template_name in pages
        ]

        other_routes = [
            # Route for Prometheus metrics
            (r"/metrics", PrometheusMetricsHandler),
            # Default route to serve from "webassets". Must stay last because
            # its pattern matches every path.
            (r"/(.*)", StaticFileHandler, {"path": os.path.join(here, "../webassets")}),
        ]

        app = tornado.web.Application(
            api_routes + page_routes + other_routes,
            template_path=os.path.join(here, "../templates"),
            debug=False,
        )
        server = app.listen(self.port)
        try:
            await self.shutdown_event.wait()
        finally:
            # Stop accepting new connections before the loop is torn down.
            server.stop()

    def notify_new_spot(self, spot):
        """Internal method called when a new spot is added to the system.

        This is used to ping any SSE clients that are awaiting a server-sent
        message with new spots.
        """
        self._broadcast(self.sse_spot_queues, spot)

    def notify_new_alert(self, alert):
        """Internal method called when a new alert is added to the system.

        This is used to ping any SSE clients that are awaiting a server-sent
        message with new alerts.
        """
        self._broadcast(self.sse_alert_queues, alert)

    def clean_up_sse_queues(self):
        """Remove any SSE queues that have filled up.

        A full queue probably means its client disconnected and we didn't
        catch it properly for some reason.
        """
        self._prune_full_queues(self.sse_spot_queues, "spot")
        self._prune_full_queues(self.sse_alert_queues, "alert")

    @staticmethod
    def _broadcast(queues, item):
        """Put ``item`` on every queue in ``queues``, ignoring per-queue failures."""
        # Iterate over a snapshot: the list may be modified (e.g. by the
        # clean-up pass) while we are walking it.
        for queue in list(queues):
            try:
                # NOTE(review): if these are bounded queue.Queue objects,
                # put() blocks while the queue is full - confirm the queue
                # type and consider put_nowait().
                queue.put(item)
            except Exception:
                # Cleanup thread was probably deleting the queue, that's fine
                logging.debug("Could not deliver item to an SSE queue.", exc_info=True)

    @staticmethod
    def _prune_full_queues(queues, kind):
        """Drop every full queue from ``queues``; ``kind`` names them in the log."""
        # Iterate over a snapshot so that removing an entry does not make the
        # loop skip the element that follows it.
        for q in list(queues):
            try:
                if not q.full():
                    continue
                logging.warning(
                    "A full SSE %s queue was found, presumably because the client "
                    "disconnected strangely. It has been removed.",
                    kind,
                )
                # Queue.empty() only reports emptiness and does not drain, so
                # nothing more is done with the queue once it is unregistered.
                queues.remove(q)
            except Exception:
                # Probably got deleted already on another thread
                logging.debug("Could not remove a full SSE %s queue.", kind, exc_info=True)