First attempt at SSE backend #3

This commit is contained in:
Ian Renton
2025-12-22 12:04:35 +00:00
parent 968576f74c
commit c95c6bb347
7 changed files with 427 additions and 148 deletions

View File

@@ -2,6 +2,7 @@ import json
import logging
import re
from datetime import datetime, timedelta
from queue import Queue
from threading import Thread
import bottle
@@ -28,6 +29,8 @@ class WebServer:
self.api_access_counter = 0
self.spots = spots
self.alerts = alerts
self.sse_spot_queues = []
self.sse_alert_queues = []
self.status_data = status_data
self.port = port
self.thread = Thread(target=self.run)
@@ -41,6 +44,8 @@ class WebServer:
# Routes for API calls
bottle.get("/api/v1/spots")(lambda: self.serve_spots_api())
bottle.get("/api/v1/alerts")(lambda: self.serve_alerts_api())
bottle.get("/api/v1/spots/stream")(lambda: self.serve_sse_spots_api())
bottle.get("/api/v1/alerts/stream")(lambda: self.serve_sse_alerts_api())
bottle.get("/api/v1/options")(lambda: self.serve_api(self.get_options()))
bottle.get("/api/v1/status")(lambda: self.serve_api(self.status_data))
bottle.get("/api/v1/lookup/call")(lambda: self.serve_call_lookup_api())
@@ -102,6 +107,31 @@ class WebServer:
response.status = 500
return json.dumps("Error - " + str(e), default=serialize_everything)
# Serve the SSE JSON API /spots/stream endpoint
def serve_sse_spots_api(self):
response.content_type = 'text/event-stream'
response.cache_control = 'no-cache'
yield 'retry: 1000\n\n'
spot_queue = Queue(maxsize=100)
self.sse_spot_queues.append(spot_queue)
while True:
spot = spot_queue.get()
yield 'data: ' + json.dumps(spot, default=serialize_everything) + '\n\n'
# Serve the SSE JSON API /alerts/stream endpoint
def serve_sse_alerts_api(self):
response.content_type = 'text/event-stream'
response.cache_control = 'no-cache'
yield 'retry: 1000\n\n'
alert_queue = Queue(maxsize=100)
self.sse_alert_queues.append(alert_queue)
while True:
alert = alert_queue.get()
yield 'data: ' + json.dumps(alert, default=serialize_everything) + '\n\n'
# Look up data for a callsign
def serve_call_lookup_api(self):
try:
@@ -303,16 +333,9 @@ class WebServer:
# Get the query (and the right one, with Bottle magic. This is a MultiDict object)
query = bottle.request.query
# Create a shallow copy of the spot list, ordered by spot time. We'll then filter it accordingly.
# We can filter by spot time and received time with "since" and "received_since", which take a UNIX timestamp
# in seconds UTC.
# We can also filter by source, sig, band, mode, dx_continent and de_continent. Each of these accepts a single
# value or a comma-separated list.
# We can filter by comments, accepting a single string, where the API will only return spots where the comment
# contains the provided value (case-insensitive).
# We can "de-dupe" spots, so only the latest spot will be sent for each callsign.
# We can provide a "limit" number as well. Spots are always returned newest-first; "limit" limits to only the
# most recent X spots.
# Create a shallow copy of the spot list, ordered by spot time, then filter the list to reduce it only to spots
# that match the filter parameters in the query string. Finally, apply a limit to the number of spots returned.
# The list of query string filters is defined in the API docs.
spot_ids = list(self.spots.iterkeys())
spots = []
for k in spot_ids:
@@ -320,87 +343,29 @@ class WebServer:
if s is not None:
spots.append(s)
spots = sorted(spots, key=lambda spot: (spot.time if spot and spot.time else 0), reverse=True)
for k in query.keys():
match k:
case "since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp()
spots = [s for s in spots if s.time and s.time > since]
case "max_age":
max_age = int(query.get(k))
since = (datetime.now(pytz.UTC) - timedelta(seconds=max_age)).timestamp()
spots = [s for s in spots if s.time and s.time > since]
case "received_since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp()
spots = [s for s in spots if s.received_time and s.received_time > since]
case "source":
sources = query.get(k).split(",")
spots = [s for s in spots if s.source and s.source in sources]
case "sig":
# If a list of sigs is provided, the spot must have a sig and it must match one of them.
# The special "sig" "NO_SIG", when supplied in the list, mathches spots with no sig.
sigs = query.get(k).split(",")
include_no_sig = "NO_SIG" in sigs
spots = [s for s in spots if (s.sig and s.sig in sigs) or (include_no_sig and not s.sig)]
case "needs_sig":
# If true, a sig is required, regardless of what it is, it just can't be missing. Mutually
# exclusive with supplying the special "NO_SIG" parameter to the "sig" query param.
needs_sig = query.get(k).upper() == "TRUE"
if needs_sig:
spots = [s for s in spots if s.sig]
case "needs_sig_ref":
# If true, at least one sig ref is required, regardless of what it is, it just can't be missing.
needs_sig_ref = query.get(k).upper() == "TRUE"
if needs_sig_ref:
spots = [s for s in spots if s.sig_refs and len(s.sig_refs) > 0]
case "band":
bands = query.get(k).split(",")
spots = [s for s in spots if s.band and s.band in bands]
case "mode":
modes = query.get(k).split(",")
spots = [s for s in spots if s.mode in modes]
case "mode_type":
mode_families = query.get(k).split(",")
spots = [s for s in spots if s.mode_type and s.mode_type in mode_families]
case "dx_continent":
dxconts = query.get(k).split(",")
spots = [s for s in spots if s.dx_continent and s.dx_continent in dxconts]
case "de_continent":
deconts = query.get(k).split(",")
spots = [s for s in spots if s.de_continent and s.de_continent in deconts]
case "comment_includes":
comment_includes = query.get(k).strip()
spots = [s for s in spots if s.comment and comment_includes.upper() in s.comment.upper()]
case "dx_call_includes":
dx_call_includes = query.get(k).strip()
spots = [s for s in spots if s.dx_call and dx_call_includes.upper() in s.dx_call.upper()]
case "allow_qrt":
# If false, spots that are flagged as QRT are not returned.
prevent_qrt = query.get(k).upper() == "FALSE"
if prevent_qrt:
spots = [s for s in spots if not s.qrt or s.qrt == False]
case "needs_good_location":
# If true, spots require a "good" location to be returned
needs_good_location = query.get(k).upper() == "TRUE"
if needs_good_location:
spots = [s for s in spots if s.dx_location_good]
case "dedupe":
# Ensure only the latest spot of each callsign-SSID combo is present in the list. This relies on the
# list being in reverse time order, so if any future change allows re-ordering the list, that should
# be done *after* this. SSIDs are deliberately included here (see issue #68) because e.g. M0TRT-7
# and M0TRT-9 APRS transponders could well be in different locations, on different frequencies etc.
dedupe = query.get(k).upper() == "TRUE"
if dedupe:
spots_temp = []
already_seen = []
for s in spots:
call_plus_ssid = s.dx_call + (s.dx_ssid if s.dx_ssid else "")
if call_plus_ssid not in already_seen:
spots_temp.append(s)
already_seen.append(call_plus_ssid)
spots = spots_temp
# If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys.
spots = list(filter(lambda spot: spot_allowed_by_query(spot, query), spots))
if "limit" in query.keys():
spots = spots[:int(query.get("limit"))]
# Ensure only the latest spot of each callsign-SSID combo is present in the list. This relies on the
# list being in reverse time order, so if any future change allows re-ordering the list, that should
# be done *after* this. SSIDs are deliberately included here (see issue #68) because e.g. M0TRT-7
# and M0TRT-9 APRS transponders could well be in different locations, on different frequencies etc.
# This is a special consideration for the geo map and band map views (and Field Spotter) because while
# duplicates are fine in the main spot list (e.g. different cluster spots of the same DX) this doesn't
# work well for the other views.
if "dedupe" in query.keys():
dedupe = query.get("dedupe").upper() == "TRUE"
if dedupe:
spots_temp = []
already_seen = []
for s in spots:
call_plus_ssid = s.dx_call + (s.dx_ssid if s.dx_ssid else "")
if call_plus_ssid not in already_seen:
spots_temp.append(s)
already_seen.append(call_plus_ssid)
spots = spots_temp
return spots
# Utility method to apply filters to the overall alert list and return only a subset. Enables query parameters in
@@ -409,50 +374,17 @@ class WebServer:
# Get the query (and the right one, with Bottle magic. This is a MultiDict object)
query = bottle.request.query
# Create a shallow copy of the alert list, ordered by start time. We'll then filter it accordingly.
# We can filter by received time with "received_since", which take a UNIX timestamp in seconds UTC.
# We can also filter by source, sig, and dx_continent. Each of these accepts a single
# value or a comma-separated list.
# We can provide a "limit" number as well. Alerts are always returned newest-first; "limit" limits to only the
# most recent X alerts.
# Create a shallow copy of the alert list ordered by start time, then filter the list to reduce it only to alerts
# that match the filter parameters in the query string. Finally, apply a limit to the number of alerts returned.
# The list of query string filters is defined in the API docs.
alert_ids = list(self.alerts.iterkeys())
alerts = []
for k in alert_ids:
a = self.alerts.get(k)
if a is not None:
alerts.append(a)
# We never want alerts that seem to be in the past
alerts = sorted(alerts, key=lambda alert: (alert.start_time if alert and alert.start_time else 0))
for k in query.keys():
match k:
case "received_since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC)
alerts = [a for a in alerts if a.received_time and a.received_time > since]
case "max_duration":
max_duration = int(query.get(k))
# Check the duration if end_time is provided. If end_time is not provided, assume the activation is
# "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true and
# the alert is a dxpedition, it also always passes the check.
dxpeditions_skip_check = bool(query.get(
"dxpeditions_skip_max_duration_check")) if "dxpeditions_skip_max_duration_check" in query.keys() else False
alerts = [a for a in alerts if (a.end_time and a.end_time - a.start_time <= max_duration) or
not a.end_time or (dxpeditions_skip_check and a.is_dxpedition)]
case "source":
sources = query.get(k).split(",")
alerts = [a for a in alerts if a.source and a.source in sources]
case "sig":
# If a list of sigs is provided, the alert must have a sig and it must match one of them.
# The special "sig" "NO_SIG", when supplied in the list, mathches alerts with no sig.
sigs = query.get(k).split(",")
include_no_sig = "NO_SIG" in sigs
spots = [a for a in alerts if (a.sig and a.sig in sigs) or (include_no_sig and not a.sig)]
case "dx_continent":
dxconts = query.get(k).split(",")
alerts = [a for a in alerts if a.dx_continent and a.dx_continent in dxconts]
case "dx_call_includes":
dx_call_includes = query.get(k).strip()
spots = [a for a in alerts if a.dx_call and dx_call_includes.upper() in a.dx_call.upper()]
# If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys.
alerts = list(filter(lambda alert: alert_allowed_by_query(alert, query), alerts))
if "limit" in query.keys():
alerts = alerts[:int(query.get("limit"))]
return alerts
@@ -481,6 +413,155 @@ class WebServer:
return options
# Internal method called when a new spot is added to the system. This is used to ping any SSE clients that are
# awaiting a server-sent message with new spots.
def notify_new_spot(self, spot):
for queue in self.sse_spot_queues:
try:
queue.put(spot)
except:
# Cleanup thread was probably deleting the queue, that's fine
pass
# Internal method called when a new alert is added to the system. This is used to ping any SSE clients that are
# awaiting a server-sent message with new spots.
def notify_new_alert(self, alert):
for queue in self.sse_alert_queues:
try:
queue.put(alert)
except:
# Cleanup thread was probably deleting the queue, that's fine
pass
# Clean up any SSE queues that are growing too large; probably their client disconnected.
def clean_up_sse_queues(self):
self.sse_spot_queues = [q for q in self.sse_spot_queues if not q.full()]
self.sse_alert_queues = [q for q in self.sse_alert_queues if not q.full()]
# Given URL query params and a spot, figure out if the spot "passes" the requested filters or is rejected. The list
# of query parameters and their function is defined in the API docs.
def spot_allowed_by_query(spot, query):
for k in query.keys():
match k:
case "since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp()
if not spot.time or spot.time <= since:
return False
case "max_age":
max_age = int(query.get(k))
since = (datetime.now(pytz.UTC) - timedelta(seconds=max_age)).timestamp()
if not spot.time or spot.time <= since:
return False
case "received_since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp()
if not spot.received_time or spot.received_time <= since:
return False
case "source":
sources = query.get(k).split(",")
if not spot.source or spot.source not in sources:
return False
case "sig":
# If a list of sigs is provided, the spot must have a sig and it must match one of them.
# The special "sig" "NO_SIG", when supplied in the list, mathches spots with no sig.
sigs = query.get(k).split(",")
include_no_sig = "NO_SIG" in sigs
if not spot.sig and not include_no_sig:
return False
if spot.sig and spot.sig not in sigs:
return False
case "needs_sig":
# If true, a sig is required, regardless of what it is, it just can't be missing. Mutually
# exclusive with supplying the special "NO_SIG" parameter to the "sig" query param.
needs_sig = query.get(k).upper() == "TRUE"
if needs_sig and not spot.sig:
return False
case "needs_sig_ref":
# If true, at least one sig ref is required, regardless of what it is, it just can't be missing.
needs_sig_ref = query.get(k).upper() == "TRUE"
if needs_sig_ref and (not spot.sig_refs or len(spot.sig_refs) == 0):
return False
case "band":
bands = query.get(k).split(",")
if not spot.band or spot.band not in bands:
return False
case "mode":
modes = query.get(k).split(",")
if not spot.mode or spot.mode not in modes:
return False
case "mode_type":
mode_types = query.get(k).split(",")
if not spot.mode_type or spot.mode_type not in mode_types:
return False
case "dx_continent":
dxconts = query.get(k).split(",")
if not spot.dx_continent or spot.dx_continent not in dxconts:
return False
case "de_continent":
deconts = query.get(k).split(",")
if not spot.de_continent or spot.de_continent not in deconts:
return False
case "comment_includes":
comment_includes = query.get(k).strip()
if not spot.comment or comment_includes.upper() not in spot.comment.upper():
return False
case "dx_call_includes":
dx_call_includes = query.get(k).strip()
if not spot.dx_call or dx_call_includes.upper() not in spot.dx_call.upper():
return False
case "allow_qrt":
# If false, spots that are flagged as QRT are not returned.
prevent_qrt = query.get(k).upper() == "FALSE"
if prevent_qrt and spot.qrt and spot.qrt == True:
return False
case "needs_good_location":
# If true, spots require a "good" location to be returned
needs_good_location = query.get(k).upper() == "TRUE"
if needs_good_location and not spot.dx_location_good:
return False
return True
# Given URL query params and an alert, figure out if the alert "passes" the requested filters or is rejected. The list
# of query parameters and their function is defined in the API docs.
def alert_allowed_by_query(alert, query):
for k in query.keys():
match k:
case "received_since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC)
if not alert.received_time or alert.received_time <= since:
return False
case "max_duration":
max_duration = int(query.get(k))
# Check the duration if end_time is provided. If end_time is not provided, assume the activation is
# "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true and
# the alert is a dxpedition, it also always passes the check.
if alert.is_dxpedition and (bool(query.get(
"dxpeditions_skip_max_duration_check")) if "dxpeditions_skip_max_duration_check" in query.keys() else False):
continue
if alert.end_time and alert.start_time and alert.end_time - alert.start_time > max_duration:
return False
case "source":
sources = query.get(k).split(",")
if not alert.source or alert.source not in sources:
return False
case "sig":
# If a list of sigs is provided, the alert must have a sig and it must match one of them.
# The special "sig" "NO_SIG", when supplied in the list, mathches alerts with no sig.
sigs = query.get(k).split(",")
include_no_sig = "NO_SIG" in sigs
if not alert.sig and not include_no_sig:
return False
if alert.sig and alert.sig not in sigs:
return False
case "dx_continent":
dxconts = query.get(k).split(",")
if not alert.dx_continent or alert.dx_continent not in dxconts:
return False
case "dx_call_includes":
dx_call_includes = query.get(k).strip()
if not alert.dx_call or dx_call_includes.upper() not in alert.dx_call.upper():
return False
return True
# Convert objects to serialisable things. Used by JSON serialiser as a default when it encounters unserializable things.
# Just converts objects to dict. Try to avoid doing anything clever here when serialising spots, because we also need