diff --git a/requirements.txt b/requirements.txt index a57523f..b5da9a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,5 @@ pyproj~=3.7.2 prometheus_client~=0.23.1 beautifulsoup4~=4.14.2 websocket-client~=1.9.0 -tornado~=6.5.4 \ No newline at end of file +tornado~=6.5.4 +tornado_eventsource~=3.0.0 \ No newline at end of file diff --git a/server/handlers/api/alerts.py b/server/handlers/api/alerts.py index ed4c53b..1af3339 100644 --- a/server/handlers/api/alerts.py +++ b/server/handlers/api/alerts.py @@ -1,9 +1,11 @@ import json import logging from datetime import datetime +from queue import Queue import pytz import tornado +import tornado_eventsource.handler from core.prometheus_metrics_handler import api_requests_counter from core.utils import serialize_everything @@ -43,28 +45,54 @@ class APIAlertsHandler(tornado.web.RequestHandler): self.set_header("Content-Type", "application/json") # API request handler for /api/v1/alerts/stream -class APIAlertsStreamHandler(tornado.web.RequestHandler): - def get(self): - # todo - # try: - # # Metrics - # api_requests_counter.inc() - # - # response.content_type = 'text/event-stream' - # response.cache_control = 'no-cache' - # yield 'retry: 1000\n\n' - # - # alert_queue = Queue(maxsize=100) - # self.sse_alert_queues.append(alert_queue) - # while True: - # if alert_queue.empty(): - # gevent.sleep(1) - # else: - # alert = alert_queue.get() - # yield 'data: ' + json.dumps(alert, default=serialize_everything) + '\n\n' - # except Exception as e: - # logging.warn("Exception when serving SSE socket", e) - pass +class APIAlertsStreamHandler(tornado_eventsource.handler.EventSourceHandler): + def initialize(self, sse_alert_queues, web_server_metrics): + self.sse_alert_queues = sse_alert_queues + self.web_server_metrics = web_server_metrics + + def open(self): + try: + # Metrics + self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC) + self.web_server_metrics["api_access_counter"] += 1 + self.web_server_metrics["status"] = "OK" + api_requests_counter.inc() + + # Create a alert queue and add it to the web server's list. The web server will fill this when alerts arrive + self.alert_queue = Queue(maxsize=100) + self.sse_alert_queues.append(self.alert_queue) + + # Set up a timed callback to check if anything is in the queue + self.heartbeat = tornado.ioloop.PeriodicCallback(self._callback, 1000) + self.heartbeat.start() + + except Exception as e: + logging.warn("Exception when serving SSE socket", e) + + # When the user closes the socket, empty our queue and remove it from the list so the server no longer fills it + def close(self): + try: + if self.alert_queue in self.sse_alert_queues: + self.sse_alert_queues.remove(self.alert_queue) + self.alert_queue.empty() + except: + pass + self.alert_queue = None + super().close() + + # Callback to check if anything has arrived in the queue, and if so send it to the client + def _callback(self): + try: + if self.alert_queue: + while not self.alert_queue.empty(): + alert = self.alert_queue.get() + self.write_message(msg=json.dumps(alert, default=serialize_everything)) + if self.alert_queue not in self.sse_alert_queues: + logging.error("Web server cleared up a queue of an active connection!") + self.close() + except: + logging.warn("Exception in SSE callback, connection will be closed.") + self.close() diff --git a/server/handlers/api/spots.py b/server/handlers/api/spots.py index b02ead0..3c369f1 100644 --- a/server/handlers/api/spots.py +++ b/server/handlers/api/spots.py @@ -1,9 +1,11 @@ import json import logging from datetime import datetime, timedelta +from queue import Queue import pytz import tornado +import tornado_eventsource.handler from core.prometheus_metrics_handler import api_requests_counter from core.utils import serialize_everything @@ -44,28 +46,55 @@ class APISpotsHandler(tornado.web.RequestHandler): # API request handler for /api/v1/spots/stream -class APISpotsStreamHandler(tornado.web.RequestHandler): - def get(self): - # todo - # try: - # # Metrics - # api_requests_counter.inc() - # - # response.content_type = 'text/event-stream' - # response.cache_control = 'no-cache' - # yield 'retry: 1000\n\n' - # - # spot_queue = Queue(maxsize=100) - # self.sse_spot_queues.append(spot_queue) - # while True: - # if spot_queue.empty(): - # gevent.sleep(1) - # else: - # spot = spot_queue.get() - # yield 'data: ' + json.dumps(spot, default=serialize_everything) + '\n\n' - # except Exception as e: - # logging.warn("Exception when serving SSE socket", e) - pass +class APISpotsStreamHandler(tornado_eventsource.handler.EventSourceHandler): + def initialize(self, sse_spot_queues, web_server_metrics): + self.sse_spot_queues = sse_spot_queues + self.web_server_metrics = web_server_metrics + + # Called once on the client opening a connection, set things up + def open(self): + try: + # Metrics + self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC) + self.web_server_metrics["api_access_counter"] += 1 + self.web_server_metrics["status"] = "OK" + api_requests_counter.inc() + + # Create a spot queue and add it to the web server's list. The web server will fill this when spots arrive + self.spot_queue = Queue(maxsize=1000) + self.sse_spot_queues.append(self.spot_queue) + + # Set up a timed callback to check if anything is in the queue + self.heartbeat = tornado.ioloop.PeriodicCallback(self._callback, 1000) + self.heartbeat.start() + + except Exception as e: + logging.warn("Exception when serving SSE socket", e) + + # When the user closes the socket, empty our queue and remove it from the list so the server no longer fills it + def close(self): + try: + if self.spot_queue in self.sse_spot_queues: + self.sse_spot_queues.remove(self.spot_queue) + self.spot_queue.empty() + except: + pass + self.spot_queue = None + super().close() + + # Callback to check if anything has arrived in the queue, and if so send it to the client + def _callback(self): + try: + if self.spot_queue: + while not self.spot_queue.empty(): + spot = self.spot_queue.get() + self.write_message(msg=json.dumps(spot, default=serialize_everything)) + if self.spot_queue not in self.sse_spot_queues: + logging.error("Web server cleared up a queue of an active connection!") + self.close() + except: + logging.warn("Exception in SSE callback, connection will be closed.") + self.close() diff --git a/server/webserver.py b/server/webserver.py index 73e8a81..ccefbd2 100644 --- a/server/webserver.py +++ b/server/webserver.py @@ -15,8 +15,6 @@ from server.handlers.pagetemplate import PageTemplateHandler # Provides the public-facing web server. -# TODO SSE API responses -# TODO clean_up_sse_queues class WebServer: # Constructor def __init__(self, spots, alerts, status_data, port): @@ -49,8 +47,8 @@ class WebServer: # Routes for API calls (r"/api/v1/spots", APISpotsHandler, {"spots": self.spots, "web_server_metrics": self.web_server_metrics}), (r"/api/v1/alerts", APIAlertsHandler, {"alerts": self.alerts, "web_server_metrics": self.web_server_metrics}), - (r"/api/v1/spots/stream", APISpotsStreamHandler), # todo provide queues? - (r"/api/v1/alerts/stream", APIAlertsStreamHandler), # todo provide queues? + (r"/api/v1/spots/stream", APISpotsStreamHandler, {"sse_spot_queues": self.sse_spot_queues, "web_server_metrics": self.web_server_metrics}), + (r"/api/v1/alerts/stream", APIAlertsStreamHandler, {"sse_alert_queues": self.sse_alert_queues, "web_server_metrics": self.web_server_metrics}), (r"/api/v1/options", APIOptionsHandler, {"status_data": self.status_data, "web_server_metrics": self.web_server_metrics}), (r"/api/v1/status", APIStatusHandler, {"status_data": self.status_data, "web_server_metrics": self.web_server_metrics}), (r"/api/v1/lookup/call", APILookupCallHandler, {"web_server_metrics": self.web_server_metrics}), @@ -71,37 +69,35 @@ class WebServer: (r"/(.*)", StaticFileHandler, {"path": os.path.join(os.path.dirname(__file__), "../webassets")}), ], template_path=os.path.join(os.path.dirname(__file__), "../templates"), - debug=True) # todo set false + debug=False) app.listen(self.port) await self.shutdown_event.wait() # Internal method called when a new spot is added to the system. This is used to ping any SSE clients that are # awaiting a server-sent message with new spots. def notify_new_spot(self, spot): - # todo - # for queue in self.sse_spot_queues: - # try: - # queue.put(spot) - # except: - # # Cleanup thread was probably deleting the queue, that's fine - # pass + for queue in self.sse_spot_queues: + try: + queue.put(spot) + except: + # Cleanup thread was probably deleting the queue, that's fine + pass pass # Internal method called when a new alert is added to the system. This is used to ping any SSE clients that are # awaiting a server-sent message with new spots. def notify_new_alert(self, alert): - # todo - # for queue in self.sse_alert_queues: - # try: - # queue.put(alert) - # except: - # # Cleanup thread was probably deleting the queue, that's fine - # pass + for queue in self.sse_alert_queues: + try: + queue.put(alert) + except: + # Cleanup thread was probably deleting the queue, that's fine + pass pass - # Clean up any SSE queues that are growing too large; probably their client disconnected. + # Clean up any SSE queues that are growing too large; probably their client disconnected and we didn't catch it + # properly for some reason. def clean_up_sse_queues(self): - # todo - # self.sse_spot_queues = [q for q in self.sse_spot_queues if not q.full()] - # self.sse_alert_queues = [q for q in self.sse_alert_queues if not q.full()] + self.sse_spot_queues = [q for q in self.sse_spot_queues if not q.full()] + self.sse_alert_queues = [q for q in self.sse_alert_queues if not q.full()] pass diff --git a/templates/base.html b/templates/base.html index af9c42b..3d0a5e7 100644 --- a/templates/base.html +++ b/templates/base.html @@ -63,7 +63,7 @@