First attempt at SSE backend #3

This commit is contained in:
Ian Renton
2025-12-22 13:02:11 +00:00
parent befaceb2f5
commit fd2986f310
4 changed files with 37 additions and 30 deletions

View File

@@ -3,7 +3,6 @@ from datetime import datetime
import pytz import pytz
from core.config import MAX_ALERT_AGE from core.config import MAX_ALERT_AGE
from spothole import add_alert
# Generic alert provider class. Subclasses of this query the individual APIs for alerts. # Generic alert provider class. Subclasses of this query the individual APIs for alerts.
@@ -16,10 +15,12 @@ class AlertProvider:
self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC) self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
self.status = "Not Started" if self.enabled else "Disabled" self.status = "Not Started" if self.enabled else "Disabled"
self.alerts = None self.alerts = None
self.web_server = None
# Set up the provider, e.g. giving it the alert list to work from # Set up the provider, e.g. giving it the alert list to work from
def setup(self, alerts): def setup(self, alerts, web_server):
self.alerts = alerts self.alerts = alerts
self.web_server = web_server
# Start the provider. This should return immediately after spawning threads to access the remote resources # Start the provider. This should return immediately after spawning threads to access the remote resources
def start(self): def start(self):
@@ -32,7 +33,14 @@ class AlertProvider:
for alert in alerts: for alert in alerts:
# Fill in any blanks and add to the list # Fill in any blanks and add to the list
alert.infer_missing() alert.infer_missing()
add_alert(alert) self.add_alert(alert)
def add_alert(self, alert):
if not alert.expired():
self.alerts.add(alert.id, alert, expire=MAX_ALERT_AGE)
# Ping the web server in case we have any SSE connections that need to see this immediately
if self.web_server:
self.web_server.notify_new_alert(alert)
# Stop any threads and prepare for application shutdown # Stop any threads and prepare for application shutdown
def stop(self): def stop(self):

View File

@@ -6,6 +6,7 @@ from queue import Queue
from threading import Thread from threading import Thread
import bottle import bottle
import gevent
import pytz import pytz
from bottle import run, request, response, template from bottle import run, request, response, template
@@ -73,7 +74,7 @@ class WebServer:
def run(self): def run(self):
logging.info("Starting web server on port " + str(self.port) + "...") logging.info("Starting web server on port " + str(self.port) + "...")
self.status = "Waiting" self.status = "Waiting"
run(host='localhost', port=self.port) run(host='localhost', port=self.port, server="gevent")
# Serve the JSON API /spots endpoint # Serve the JSON API /spots endpoint
def serve_spots_api(self): def serve_spots_api(self):
@@ -116,8 +117,11 @@ class WebServer:
spot_queue = Queue(maxsize=100) spot_queue = Queue(maxsize=100)
self.sse_spot_queues.append(spot_queue) self.sse_spot_queues.append(spot_queue)
while True: while True:
spot = spot_queue.get() if spot_queue.empty():
yield 'data: ' + json.dumps(spot, default=serialize_everything) + '\n\n' gevent.sleep(1)
else:
spot = spot_queue.get()
yield 'data: ' + json.dumps(spot, default=serialize_everything) + '\n\n'
# Serve the SSE JSON API /alerts/stream endpoint # Serve the SSE JSON API /alerts/stream endpoint
@@ -129,8 +133,11 @@ class WebServer:
alert_queue = Queue(maxsize=100) alert_queue = Queue(maxsize=100)
self.sse_alert_queues.append(alert_queue) self.sse_alert_queues.append(alert_queue)
while True: while True:
alert = alert_queue.get() if alert_queue.empty():
yield 'data: ' + json.dumps(alert, default=serialize_everything) + '\n\n' gevent.sleep(1)
else:
alert = alert_queue.get()
yield 'data: ' + json.dumps(alert, default=serialize_everything) + '\n\n'
# Look up data for a callsign # Look up data for a callsign
def serve_call_lookup_api(self): def serve_call_lookup_api(self):

View File

@@ -52,22 +52,6 @@ def get_alert_provider_from_config(config_providers_entry):
provider_class = getattr(module, config_providers_entry["class"]) provider_class = getattr(module, config_providers_entry["class"])
return provider_class(config_providers_entry) return provider_class(config_providers_entry)
# Utility method to add a spot, notifying the web server in case any Server-Sent Event connections need to have data
# sent immediately. If the spot has already expired due to loading old data, it will be ignored.
def add_spot(spot):
if not spot.expired():
spots.add(spot.id, spot, expire=MAX_SPOT_AGE)
if web_server:
web_server.notify_new_spot(spot)
# Utility method to add an alert, notifying the web server in case any Server-Sent Event connections need to have data
# sent immediately. If the alert has already expired due to loading old data, it will be ignored.
def add_alert(alert):
if not alert.expired():
alerts.add(alert.id, alert, expire=MAX_SPOT_AGE)
if web_server:
web_server.notify_new_alert(alert)
# Main function # Main function
if __name__ == '__main__': if __name__ == '__main__':
@@ -98,7 +82,7 @@ if __name__ == '__main__':
for entry in config["spot-providers"]: for entry in config["spot-providers"]:
spot_providers.append(get_spot_provider_from_config(entry)) spot_providers.append(get_spot_provider_from_config(entry))
for p in spot_providers: for p in spot_providers:
p.setup(spots=spots) p.setup(spots=spots, web_server=web_server)
if p.enabled: if p.enabled:
p.start() p.start()
@@ -106,7 +90,7 @@ if __name__ == '__main__':
for entry in config["alert-providers"]: for entry in config["alert-providers"]:
alert_providers.append(get_alert_provider_from_config(entry)) alert_providers.append(get_alert_provider_from_config(entry))
for p in alert_providers: for p in alert_providers:
p.setup(alerts=alerts) p.setup(alerts=alerts, web_server=web_server)
if p.enabled: if p.enabled:
p.start() p.start()

View File

@@ -3,7 +3,6 @@ from datetime import datetime
import pytz import pytz
from core.config import MAX_SPOT_AGE from core.config import MAX_SPOT_AGE
from spothole import add_spot
# Generic spot provider class. Subclasses of this query the individual APIs for data. # Generic spot provider class. Subclasses of this query the individual APIs for data.
@@ -17,10 +16,12 @@ class SpotProvider:
self.last_spot_time = datetime.min.replace(tzinfo=pytz.UTC) self.last_spot_time = datetime.min.replace(tzinfo=pytz.UTC)
self.status = "Not Started" if self.enabled else "Disabled" self.status = "Not Started" if self.enabled else "Disabled"
self.spots = None self.spots = None
self.web_server = None
# Set up the provider, e.g. giving it the spot list to work from # Set up the provider, e.g. giving it the spot list to work from
def setup(self, spots): def setup(self, spots, web_server):
self.spots = spots self.spots = spots
self.web_server = web_server
# Start the provider. This should return immediately after spawning threads to access the remote resources # Start the provider. This should return immediately after spawning threads to access the remote resources
def start(self): def start(self):
@@ -35,7 +36,7 @@ class SpotProvider:
if datetime.fromtimestamp(spot.time, pytz.UTC) > self.last_spot_time: if datetime.fromtimestamp(spot.time, pytz.UTC) > self.last_spot_time:
# Fill in any blanks and add to the list # Fill in any blanks and add to the list
spot.infer_missing() spot.infer_missing()
add_spot(spot) self.add_spot(spot)
self.last_spot_time = datetime.fromtimestamp(max(map(lambda s: s.time, spots)), pytz.UTC) self.last_spot_time = datetime.fromtimestamp(max(map(lambda s: s.time, spots)), pytz.UTC)
# Submit a single spot retrieved from the provider. This will be added to the list regardless of its age. Spots # Submit a single spot retrieved from the provider. This will be added to the list regardless of its age. Spots
@@ -44,9 +45,16 @@ class SpotProvider:
def submit(self, spot): def submit(self, spot):
# Fill in any blanks and add to the list # Fill in any blanks and add to the list
spot.infer_missing() spot.infer_missing()
add_spot(spot) self.add_spot(spot)
self.last_spot_time = datetime.fromtimestamp(spot.time, pytz.UTC) self.last_spot_time = datetime.fromtimestamp(spot.time, pytz.UTC)
def add_spot(self, spot):
if not spot.expired():
self.spots.add(spot.id, spot, expire=MAX_SPOT_AGE)
# Ping the web server in case we have any SSE connections that need to see this immediately
if self.web_server:
self.web_server.notify_new_spot(spot)
# Stop any threads and prepare for application shutdown # Stop any threads and prepare for application shutdown
def stop(self): def stop(self):
raise NotImplementedError("Subclasses must implement this method") raise NotImplementedError("Subclasses must implement this method")