diff --git a/alertproviders/alert_provider.py b/alertproviders/alert_provider.py index 943558e..d961412 100644 --- a/alertproviders/alert_provider.py +++ b/alertproviders/alert_provider.py @@ -3,7 +3,6 @@ from datetime import datetime import pytz from core.config import MAX_ALERT_AGE -from spothole import add_alert # Generic alert provider class. Subclasses of this query the individual APIs for alerts. @@ -16,10 +15,12 @@ class AlertProvider: self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC) self.status = "Not Started" if self.enabled else "Disabled" self.alerts = None + self.web_server = None # Set up the provider, e.g. giving it the alert list to work from - def setup(self, alerts): + def setup(self, alerts, web_server): self.alerts = alerts + self.web_server = web_server # Start the provider. This should return immediately after spawning threads to access the remote resources def start(self): @@ -32,7 +33,14 @@ class AlertProvider: for alert in alerts: # Fill in any blanks and add to the list alert.infer_missing() - add_alert(alert) + self.add_alert(alert) + + def add_alert(self, alert): + if not alert.expired(): + self.alerts.add(alert.id, alert, expire=MAX_ALERT_AGE) + # Ping the web server in case we have any SSE connections that need to see this immediately + if self.web_server: + self.web_server.notify_new_alert(alert) # Stop any threads and prepare for application shutdown def stop(self): diff --git a/server/webserver.py b/server/webserver.py index e220c6c..b807372 100644 --- a/server/webserver.py +++ b/server/webserver.py @@ -6,6 +6,7 @@ from queue import Queue from threading import Thread import bottle +import gevent import pytz from bottle import run, request, response, template @@ -73,7 +74,7 @@ class WebServer: def run(self): logging.info("Starting web server on port " + str(self.port) + "...") self.status = "Waiting" - run(host='localhost', port=self.port) + run(host='localhost', port=self.port, server="gevent") # Serve the JSON API /spots endpoint def serve_spots_api(self): @@ -116,8 +117,11 @@ class WebServer: spot_queue = Queue(maxsize=100) self.sse_spot_queues.append(spot_queue) while True: - spot = spot_queue.get() - yield 'data: ' + json.dumps(spot, default=serialize_everything) + '\n\n' + if spot_queue.empty(): + gevent.sleep(1) + else: + spot = spot_queue.get() + yield 'data: ' + json.dumps(spot, default=serialize_everything) + '\n\n' # Serve the SSE JSON API /alerts/stream endpoint @@ -129,8 +133,11 @@ class WebServer: alert_queue = Queue(maxsize=100) self.sse_alert_queues.append(alert_queue) while True: - alert = alert_queue.get() - yield 'data: ' + json.dumps(alert, default=serialize_everything) + '\n\n' + if alert_queue.empty(): + gevent.sleep(1) + else: + alert = alert_queue.get() + yield 'data: ' + json.dumps(alert, default=serialize_everything) + '\n\n' # Look up data for a callsign def serve_call_lookup_api(self): diff --git a/spothole.py b/spothole.py index ba6b587..cc4f81b 100644 --- a/spothole.py +++ b/spothole.py @@ -52,22 +52,6 @@ def get_alert_provider_from_config(config_providers_entry): provider_class = getattr(module, config_providers_entry["class"]) return provider_class(config_providers_entry) -# Utility method to add a spot, notifying the web server in case any Server-Sent Event connections need to have data -# sent immediately. If the spot has already expired due to loading old data, it will be ignored. -def add_spot(spot): - if not spot.expired(): - spots.add(spot.id, spot, expire=MAX_SPOT_AGE) - if web_server: - web_server.notify_new_spot(spot) - -# Utility method to add an alert, notifying the web server in case any Server-Sent Event connections need to have data -# sent immediately. If the alert has already expired due to loading old data, it will be ignored. -def add_alert(alert): - if not alert.expired(): - alerts.add(alert.id, alert, expire=MAX_SPOT_AGE) - if web_server: - web_server.notify_new_alert(alert) - # Main function if __name__ == '__main__': @@ -98,7 +82,7 @@ if __name__ == '__main__': for entry in config["spot-providers"]: spot_providers.append(get_spot_provider_from_config(entry)) for p in spot_providers: - p.setup(spots=spots) + p.setup(spots=spots, web_server=web_server) if p.enabled: p.start() @@ -106,7 +90,7 @@ if __name__ == '__main__': for entry in config["alert-providers"]: alert_providers.append(get_alert_provider_from_config(entry)) for p in alert_providers: - p.setup(alerts=alerts) + p.setup(alerts=alerts, web_server=web_server) if p.enabled: p.start() diff --git a/spotproviders/spot_provider.py b/spotproviders/spot_provider.py index 465e4b7..e848e01 100644 --- a/spotproviders/spot_provider.py +++ b/spotproviders/spot_provider.py @@ -3,7 +3,6 @@ from datetime import datetime import pytz from core.config import MAX_SPOT_AGE -from spothole import add_spot # Generic spot provider class. Subclasses of this query the individual APIs for data. @@ -17,10 +16,12 @@ class SpotProvider: self.last_spot_time = datetime.min.replace(tzinfo=pytz.UTC) self.status = "Not Started" if self.enabled else "Disabled" self.spots = None + self.web_server = None # Set up the provider, e.g. giving it the spot list to work from - def setup(self, spots): + def setup(self, spots, web_server): self.spots = spots + self.web_server = web_server # Start the provider. This should return immediately after spawning threads to access the remote resources def start(self): @@ -35,7 +36,7 @@ class SpotProvider: if datetime.fromtimestamp(spot.time, pytz.UTC) > self.last_spot_time: # Fill in any blanks and add to the list spot.infer_missing() - add_spot(spot) + self.add_spot(spot) self.last_spot_time = datetime.fromtimestamp(max(map(lambda s: s.time, spots)), pytz.UTC) # Submit a single spot retrieved from the provider. This will be added to the list regardless of its age. Spots @@ -44,9 +45,16 @@ class SpotProvider: def submit(self, spot): # Fill in any blanks and add to the list spot.infer_missing() - add_spot(spot) + self.add_spot(spot) self.last_spot_time = datetime.fromtimestamp(spot.time, pytz.UTC) + def add_spot(self, spot): + if not spot.expired(): + self.spots.add(spot.id, spot, expire=MAX_SPOT_AGE) + # Ping the web server in case we have any SSE connections that need to see this immediately + if self.web_server: + self.web_server.notify_new_spot(spot) + # Stop any threads and prepare for application shutdown def stop(self): raise NotImplementedError("Subclasses must implement this method") \ No newline at end of file