Implement SSE endpoints in Tornado #3

This commit is contained in:
Ian Renton
2025-12-23 21:01:41 +00:00
parent d463403018
commit 86beb27ebf
7 changed files with 129 additions and 73 deletions

View File

@@ -14,3 +14,4 @@ prometheus_client~=0.23.1
beautifulsoup4~=4.14.2
websocket-client~=1.9.0
tornado~=6.5.4
tornado_eventsource~=3.0.0

View File

@@ -1,9 +1,11 @@
import json
import logging
from datetime import datetime
from queue import Queue
import pytz
import tornado
import tornado_eventsource.handler
from core.prometheus_metrics_handler import api_requests_counter
from core.utils import serialize_everything
@@ -43,28 +45,54 @@ class APIAlertsHandler(tornado.web.RequestHandler):
self.set_header("Content-Type", "application/json")
# API request handler for /api/v1/alerts/stream
class APIAlertsStreamHandler(tornado.web.RequestHandler):
def get(self):
# todo
# try:
# # Metrics
# api_requests_counter.inc()
#
# response.content_type = 'text/event-stream'
# response.cache_control = 'no-cache'
# yield 'retry: 1000\n\n'
#
# alert_queue = Queue(maxsize=100)
# self.sse_alert_queues.append(alert_queue)
# while True:
# if alert_queue.empty():
# gevent.sleep(1)
# else:
# alert = alert_queue.get()
# yield 'data: ' + json.dumps(alert, default=serialize_everything) + '\n\n'
# except Exception as e:
# logging.warn("Exception when serving SSE socket", e)
pass
class APIAlertsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
def initialize(self, sse_alert_queues, web_server_metrics):
self.sse_alert_queues = sse_alert_queues
self.web_server_metrics = web_server_metrics
def open(self):
try:
# Metrics
self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC)
self.web_server_metrics["api_access_counter"] += 1
self.web_server_metrics["status"] = "OK"
api_requests_counter.inc()
# Create a alert queue and add it to the web server's list. The web server will fill this when alerts arrive
self.alert_queue = Queue(maxsize=100)
self.sse_alert_queues.append(self.alert_queue)
# Set up a timed callback to check if anything is in the queue
self.heartbeat = tornado.ioloop.PeriodicCallback(self._callback, 1000)
self.heartbeat.start()
except Exception as e:
logging.warn("Exception when serving SSE socket", e)
# When the user closes the socket, empty our queue and remove it from the list so the server no longer fills it
def close(self):
try:
if self.alert_queue in self.sse_alert_queues:
self.sse_alert_queues.remove(self.alert_queue)
self.alert_queue.empty()
except:
pass
self.alert_queue = None
super().close()
# Callback to check if anything has arrived in the queue, and if so send it to the client
def _callback(self):
try:
if self.alert_queue:
while not self.alert_queue.empty():
alert = self.alert_queue.get()
self.write_message(msg=json.dumps(alert, default=serialize_everything))
if self.alert_queue not in self.sse_alert_queues:
logging.error("Web server cleared up a queue of an active connection!")
self.close()
except:
logging.warn("Exception in SSE callback, connection will be closed.")
self.close()

View File

@@ -1,9 +1,11 @@
import json
import logging
from datetime import datetime, timedelta
from queue import Queue
import pytz
import tornado
import tornado_eventsource.handler
from core.prometheus_metrics_handler import api_requests_counter
from core.utils import serialize_everything
@@ -44,28 +46,55 @@ class APISpotsHandler(tornado.web.RequestHandler):
# API request handler for /api/v1/spots/stream
class APISpotsStreamHandler(tornado.web.RequestHandler):
def get(self):
# todo
# try:
# # Metrics
# api_requests_counter.inc()
#
# response.content_type = 'text/event-stream'
# response.cache_control = 'no-cache'
# yield 'retry: 1000\n\n'
#
# spot_queue = Queue(maxsize=100)
# self.sse_spot_queues.append(spot_queue)
# while True:
# if spot_queue.empty():
# gevent.sleep(1)
# else:
# spot = spot_queue.get()
# yield 'data: ' + json.dumps(spot, default=serialize_everything) + '\n\n'
# except Exception as e:
# logging.warn("Exception when serving SSE socket", e)
pass
class APISpotsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
def initialize(self, sse_spot_queues, web_server_metrics):
self.sse_spot_queues = sse_spot_queues
self.web_server_metrics = web_server_metrics
# Called once on the client opening a connection, set things up
def open(self):
try:
# Metrics
self.web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC)
self.web_server_metrics["api_access_counter"] += 1
self.web_server_metrics["status"] = "OK"
api_requests_counter.inc()
# Create a spot queue and add it to the web server's list. The web server will fill this when spots arrive
self.spot_queue = Queue(maxsize=1000)
self.sse_spot_queues.append(self.spot_queue)
# Set up a timed callback to check if anything is in the queue
self.heartbeat = tornado.ioloop.PeriodicCallback(self._callback, 1000)
self.heartbeat.start()
except Exception as e:
logging.warn("Exception when serving SSE socket", e)
# When the user closes the socket, empty our queue and remove it from the list so the server no longer fills it
def close(self):
try:
if self.spot_queue in self.sse_spot_queues:
self.sse_spot_queues.remove(self.spot_queue)
self.spot_queue.empty()
except:
pass
self.spot_queue = None
super().close()
# Callback to check if anything has arrived in the queue, and if so send it to the client
def _callback(self):
try:
if self.spot_queue:
while not self.spot_queue.empty():
spot = self.spot_queue.get()
self.write_message(msg=json.dumps(spot, default=serialize_everything))
if self.spot_queue not in self.sse_spot_queues:
logging.error("Web server cleared up a queue of an active connection!")
self.close()
except:
logging.warn("Exception in SSE callback, connection will be closed.")
self.close()

View File

@@ -15,8 +15,6 @@ from server.handlers.pagetemplate import PageTemplateHandler
# Provides the public-facing web server.
# TODO SSE API responses
# TODO clean_up_sse_queues
class WebServer:
# Constructor
def __init__(self, spots, alerts, status_data, port):
@@ -49,8 +47,8 @@ class WebServer:
# Routes for API calls
(r"/api/v1/spots", APISpotsHandler, {"spots": self.spots, "web_server_metrics": self.web_server_metrics}),
(r"/api/v1/alerts", APIAlertsHandler, {"alerts": self.alerts, "web_server_metrics": self.web_server_metrics}),
(r"/api/v1/spots/stream", APISpotsStreamHandler), # todo provide queues?
(r"/api/v1/alerts/stream", APIAlertsStreamHandler), # todo provide queues?
(r"/api/v1/spots/stream", APISpotsStreamHandler, {"sse_spot_queues": self.sse_spot_queues, "web_server_metrics": self.web_server_metrics}),
(r"/api/v1/alerts/stream", APIAlertsStreamHandler, {"sse_alert_queues": self.sse_alert_queues, "web_server_metrics": self.web_server_metrics}),
(r"/api/v1/options", APIOptionsHandler, {"status_data": self.status_data, "web_server_metrics": self.web_server_metrics}),
(r"/api/v1/status", APIStatusHandler, {"status_data": self.status_data, "web_server_metrics": self.web_server_metrics}),
(r"/api/v1/lookup/call", APILookupCallHandler, {"web_server_metrics": self.web_server_metrics}),
@@ -71,37 +69,35 @@ class WebServer:
(r"/(.*)", StaticFileHandler, {"path": os.path.join(os.path.dirname(__file__), "../webassets")}),
],
template_path=os.path.join(os.path.dirname(__file__), "../templates"),
debug=True) # todo set false
debug=False)
app.listen(self.port)
await self.shutdown_event.wait()
# Internal method called when a new spot is added to the system. This is used to ping any SSE clients that are
# awaiting a server-sent message with new spots.
def notify_new_spot(self, spot):
# todo
# for queue in self.sse_spot_queues:
# try:
# queue.put(spot)
# except:
# # Cleanup thread was probably deleting the queue, that's fine
# pass
for queue in self.sse_spot_queues:
try:
queue.put(spot)
except:
# Cleanup thread was probably deleting the queue, that's fine
pass
pass
# Internal method called when a new alert is added to the system. This is used to ping any SSE clients that are
# awaiting a server-sent message with new spots.
def notify_new_alert(self, alert):
# todo
# for queue in self.sse_alert_queues:
# try:
# queue.put(alert)
# except:
# # Cleanup thread was probably deleting the queue, that's fine
# pass
for queue in self.sse_alert_queues:
try:
queue.put(alert)
except:
# Cleanup thread was probably deleting the queue, that's fine
pass
pass
# Clean up any SSE queues that are growing too large; probably their client disconnected.
# Clean up any SSE queues that are growing too large; probably their client disconnected and we didn't catch it
# properly for some reason.
def clean_up_sse_queues(self):
# todo
# self.sse_spot_queues = [q for q in self.sse_spot_queues if not q.full()]
# self.sse_alert_queues = [q for q in self.sse_alert_queues if not q.full()]
self.sse_spot_queues = [q for q in self.sse_spot_queues if not q.full()]
self.sse_alert_queues = [q for q in self.sse_alert_queues if not q.full()]
pass

View File

@@ -63,7 +63,7 @@
<li class="nav-item ms-4"><a href="/bands" class="nav-link" id="nav-link-bands"><i class="fa-solid fa-ruler-vertical"></i> Bands</a></li>
<li class="nav-item ms-4"><a href="/alerts" class="nav-link" id="nav-link-alerts"><i class="fa-solid fa-bell"></i> Alerts</a></li>
{% if allow_spotting %}
<li class="nav-item ms-4"><a href="/add-spot" class="nav-link" id="nav-link-add-spot"><i class="fa-solid fa-comment"></i> Add Spot</a></li>
<li class="nav-item ms-4"><a href="/add-spot" class="nav-link" id="nav-link-add-spot"><i class="fa-solid fa-comment"></i> Add&nbsp;Spot</a></li>
{% end %}
<li class="nav-item ms-4"><a href="/status" class="nav-link" id="nav-link-status"><i class="fa-solid fa-chart-simple"></i> Status</a></li>
<li class="nav-item ms-4"><a href="/about" class="nav-link" id="nav-link-about"><i class="fa-solid fa-circle-info"></i> About</a></li>

View File

@@ -132,7 +132,7 @@ function updateRefreshDisplay() {
}
$("#timing-container").html("Last updated at " + lastUpdateTime.format('HH:mm') + " UTC. " + updatingString);
} else {
$("#timing-container").html("Connected to live spot server. Last spot at " + lastUpdateTime.format('HH:mm') + " UTC.");
$("#timing-container").html("Connected to live spot server. Last spot received at " + lastUpdateTime.format('HH:mm') + " UTC.");
}
}
}

View File

@@ -45,6 +45,9 @@ function restartSSEConnection() {
spots = spots.slice(0, -1);
// Add spot to table
addSpotToTopOfTable(newSpot, true);
// Drop oldest spot off the end of the table. This is two rows because of the mobile view extra rows
$("#table tbody tr").last().remove();
$("#table tbody tr").last().remove();
};
evtSource.onerror = function(err) {
@@ -216,9 +219,8 @@ function createNewTableRowsForSpot(s, highlightNew) {
// Format the mode
mode_string = s["mode"];
if (s["mode"] == null) {
mode_string = "???";
}
if (s["mode_source"] == "BANDPLAN") {
mode_string = "";
} else if (s["mode_source"] == "BANDPLAN") {
mode_string = mode_string + "<span class='mode-q hideonmobile'><i class='fa-solid fa-circle-question' title='The mode was not reported via the spotting service. This is a guess based on the frequency.'></i></span>";
}