Fix SSE connections not respecting filters #3

This commit is contained in:
Ian Renton
2025-12-23 22:24:30 +00:00
parent 7fe478e040
commit 70dc1b495c
6 changed files with 67 additions and 15 deletions

View File

@@ -58,6 +58,10 @@ class APIAlertsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
self.web_server_metrics["status"] = "OK"
api_requests_counter.inc()
# request.arguments contains lists for each param key because technically the client can supply multiple,
# reduce that to just the first entry, and convert bytes to string
self.query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()}
# Create a alert queue and add it to the web server's list. The web server will fill this when alerts arrive
self.alert_queue = Queue(maxsize=100)
self.sse_alert_queues.append(self.alert_queue)
@@ -86,7 +90,10 @@ class APIAlertsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
if self.alert_queue:
while not self.alert_queue.empty():
alert = self.alert_queue.get()
self.write_message(msg=json.dumps(alert, default=serialize_everything))
# If the new alert matches our param filters, send it to the client. If not, ignore it.
if alert_allowed_by_query(alert, self.query_params):
self.write_message(msg=json.dumps(alert, default=serialize_everything))
if self.alert_queue not in self.sse_alert_queues:
logging.error("Web server cleared up a queue of an active connection!")
self.close()
@@ -155,4 +162,10 @@ def alert_allowed_by_query(alert, query):
dx_call_includes = query.get(k).strip()
if not alert.dx_call or dx_call_includes.upper() not in alert.dx_call.upper():
return False
case "text_includes":
text_includes = query.get(k).strip()
if (not alert.dx_call or text_includes.upper() not in alert.dx_call.upper()) \
and (not alert.comment or text_includes.upper() not in alert.comment.upper()) \
and (not alert.freqs_modes or text_includes.upper() not in alert.freqs_modes.upper()):
return False
return True

View File

@@ -60,6 +60,10 @@ class APISpotsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
self.web_server_metrics["status"] = "OK"
api_requests_counter.inc()
# request.arguments contains lists for each param key because technically the client can supply multiple,
# reduce that to just the first entry, and convert bytes to string
self.query_params = {k: v[0].decode("utf-8") for k, v in self.request.arguments.items()}
# Create a spot queue and add it to the web server's list. The web server will fill this when spots arrive
self.spot_queue = Queue(maxsize=1000)
self.sse_spot_queues.append(self.spot_queue)
@@ -88,7 +92,10 @@ class APISpotsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
if self.spot_queue:
while not self.spot_queue.empty():
spot = self.spot_queue.get()
self.write_message(msg=json.dumps(spot, default=serialize_everything))
# If the new spot matches our param filters, send it to the client. If not, ignore it.
if spot_allowed_by_query(spot, self.query_params):
self.write_message(msg=json.dumps(spot, default=serialize_everything))
if self.spot_queue not in self.sse_spot_queues:
logging.error("Web server cleared up a queue of an active connection!")
self.close()
@@ -206,6 +213,11 @@ def spot_allowed_by_query(spot, query):
dx_call_includes = query.get(k).strip()
if not spot.dx_call or dx_call_includes.upper() not in spot.dx_call.upper():
return False
case "text_includes":
text_includes = query.get(k).strip()
if (not spot.dx_call or text_includes.upper() not in spot.dx_call.upper()) \
and (not spot.comment or text_includes.upper() not in spot.comment.upper()):
return False
case "allow_qrt":
# If false, spots that are flagged as QRT are not returned.
prevent_qrt = query.get(k).upper() == "FALSE"