import asyncio
import os

import tornado
from tornado.web import StaticFileHandler

from server.handlers.api.addspot import APISpotHandler
from server.handlers.api.alerts import APIAlertsHandler, APIAlertsStreamHandler
from server.handlers.api.lookups import APILookupCallHandler, APILookupSIGRefHandler
from server.handlers.api.options import APIOptionsHandler
from server.handlers.api.spots import APISpotsHandler, APISpotsStreamHandler
from server.handlers.api.status import APIStatusHandler
from server.handlers.metrics import PrometheusMetricsHandler
from server.handlers.pagetemplate import PageTemplateHandler


# Provides the public-facing web server: a Tornado application serving the JSON API, the templated pages, the
# Prometheus metrics endpoint and the static web assets.
# TODO SSE API responses
# TODO clean_up_sse_queues
class WebServer:

    # Constructor. Stores references to the shared spot and alert collections and the status data that are handed to
    # the request handlers, plus the TCP port to listen on. Nothing is bound or started here; call start() for that.
    def __init__(self, spots, alerts, status_data, port):
        self.spots = spots
        self.alerts = alerts
        # Per-client queues for Server-Sent Events. Currently unused until the SSE TODOs below are implemented.
        self.sse_spot_queues = []
        self.sse_alert_queues = []
        self.status_data = status_data
        self.port = port
        # Set by stop() to make start_inner() return.
        # NOTE(review): created outside a running event loop; this relies on asyncio primitives binding to the loop
        # lazily (Python 3.10+). Confirm the minimum supported Python version.
        self.shutdown_event = asyncio.Event()
        # Event loop the server is running on, or None when it is not running. Used by stop() so that it can be
        # called safely from a different thread than the one blocked in start().
        self._loop = None
        # Mutable metrics dict shared (by reference) with the request handlers.
        self.web_server_metrics = {
            "last_page_access_time": None,
            "last_api_access_time": None,
            "page_access_counter": 0,
            "api_access_counter": 0,
            "status": "Starting"
        }

    # Start the web server. Blocks the calling thread, running an asyncio event loop, until stop() is called.
    def start(self):
        asyncio.run(self.start_inner())

    # Stop the web server. Safe to call from any thread, including before the server has been started (in which case
    # a later start() will return straight away).
    def stop(self):
        loop = self._loop
        if loop is not None:
            try:
                # asyncio.Event is not thread-safe, and start() blocks its own thread, so this is normally called
                # from elsewhere. Hand the set() over to the loop's thread so the waiter is reliably woken.
                loop.call_soon_threadsafe(self.shutdown_event.set)
                return
            except RuntimeError:
                # The loop was closed between the check above and the call; nothing is waiting on it any more, so
                # fall through and just record the shutdown request directly.
                pass
        self.shutdown_event.set()

    # Start method (async). Sets up the Tornado application, listens on the configured port, and then waits until
    # the shutdown event is set, at which point the listening socket is closed again.
    async def start_inner(self):
        self._loop = asyncio.get_running_loop()
        base_dir = os.path.dirname(__file__)
        metrics = self.web_server_metrics

        app = tornado.web.Application([
            # Routes for API calls
            (r"/api/v1/spots", APISpotsHandler, {"spots": self.spots, "web_server_metrics": metrics}),
            (r"/api/v1/alerts", APIAlertsHandler, {"alerts": self.alerts, "web_server_metrics": metrics}),
            (r"/api/v1/spots/stream", APISpotsStreamHandler),  # todo provide queues?
            (r"/api/v1/alerts/stream", APIAlertsStreamHandler),  # todo provide queues?
            (r"/api/v1/options", APIOptionsHandler, {"status_data": self.status_data, "web_server_metrics": metrics}),
            (r"/api/v1/status", APIStatusHandler, {"status_data": self.status_data, "web_server_metrics": metrics}),
            (r"/api/v1/lookup/call", APILookupCallHandler, {"web_server_metrics": metrics}),
            (r"/api/v1/lookup/sigref", APILookupSIGRefHandler, {"web_server_metrics": metrics}),
            (r"/api/v1/spot", APISpotHandler, {"spots": self.spots, "web_server_metrics": metrics}),
            # Routes for templated pages
            (r"/", PageTemplateHandler, {"template_name": "spots", "web_server_metrics": metrics}),
            (r"/map", PageTemplateHandler, {"template_name": "map", "web_server_metrics": metrics}),
            (r"/bands", PageTemplateHandler, {"template_name": "bands", "web_server_metrics": metrics}),
            (r"/alerts", PageTemplateHandler, {"template_name": "alerts", "web_server_metrics": metrics}),
            (r"/add-spot", PageTemplateHandler, {"template_name": "add_spot", "web_server_metrics": metrics}),
            (r"/status", PageTemplateHandler, {"template_name": "status", "web_server_metrics": metrics}),
            (r"/about", PageTemplateHandler, {"template_name": "about", "web_server_metrics": metrics}),
            (r"/apidocs", PageTemplateHandler, {"template_name": "apidocs", "web_server_metrics": metrics}),
            # Route for Prometheus metrics
            (r"/metrics", PrometheusMetricsHandler),
            # Default route to serve from "webassets"
            (r"/(.*)", StaticFileHandler, {"path": os.path.join(base_dir, "../webassets")}),
        ],
            template_path=os.path.join(base_dir, "../templates"),
            debug=True)  # todo set false

        http_server = app.listen(self.port)
        try:
            await self.shutdown_event.wait()
        finally:
            # Release the listening socket so the port is free again once the server has been stopped, and forget the
            # loop so a late stop() does not try to schedule onto a loop that is about to be closed.
            http_server.stop()
            self._loop = None

    # Internal method called when a new spot is added to the system. This is used to ping any SSE clients that are
    # awaiting a server-sent message with new spots. Currently a no-op pending the SSE implementation.
    def notify_new_spot(self, spot):
        # todo
        # for queue in self.sse_spot_queues:
        #     try:
        #         queue.put(spot)
        #     except:
        #         # Cleanup thread was probably deleting the queue, that's fine
        #         pass
        pass

    # Internal method called when a new alert is added to the system. This is used to ping any SSE clients that are
    # awaiting a server-sent message with new alerts. Currently a no-op pending the SSE implementation.
    def notify_new_alert(self, alert):
        # todo
        # for queue in self.sse_alert_queues:
        #     try:
        #         queue.put(alert)
        #     except:
        #         # Cleanup thread was probably deleting the queue, that's fine
        #         pass
        pass

    # Clean up any SSE queues that are growing too large; probably their client disconnected. Currently a no-op
    # pending the SSE implementation.
    def clean_up_sse_queues(self):
        # todo
        # self.sse_spot_queues = [q for q in self.sse_spot_queues if not q.full()]
        # self.sse_alert_queues = [q for q in self.sse_alert_queues if not q.full()]
        pass