diff --git a/alertproviders/alert_provider.py b/alertproviders/alert_provider.py index d961412..038ddbb 100644 --- a/alertproviders/alert_provider.py +++ b/alertproviders/alert_provider.py @@ -30,6 +30,9 @@ class AlertProvider: # because alerts could be created at any point for any time in the future. Rely on hashcode-based id matching # to deal with duplicates. def submit_batch(self, alerts): + # Sort the batch so that earliest ones go in first. This helps keep the ordering correct when alerts are fired + # off to SSE listeners. + alerts = sorted(alerts, key=lambda alert: (alert.start_time if alert and alert.start_time else 0)) for alert in alerts: # Fill in any blanks and add to the list alert.infer_missing() diff --git a/server/webserver.py b/server/webserver.py index ccefbd2..ba55a8e 100644 --- a/server/webserver.py +++ b/server/webserver.py @@ -1,4 +1,5 @@ import asyncio +import logging import os import tornado @@ -98,6 +99,22 @@ class WebServer: # Clean up any SSE queues that are growing too large; probably their client disconnected and we didn't catch it # properly for some reason. def clean_up_sse_queues(self): - self.sse_spot_queues = [q for q in self.sse_spot_queues if not q.full()] - self.sse_alert_queues = [q for q in self.sse_alert_queues if not q.full()] + for q in self.sse_spot_queues: + try: + if q.full(): + logging.warn("A full SSE spot queue was found, presumably because the client disconnected strangely. It has been removed.") + self.sse_spot_queues.remove(q) + q.empty() + except: + # Probably got deleted already on another thread + pass + for q in self.sse_alert_queues: + try: + if q.full(): + logging.warn("A full SSE alert queue was found, presumably because the client disconnected strangely. It has been removed.") + self.sse_alert_queues.remove(q) + q.empty() + except: + # Probably got deleted already on another thread + pass pass diff --git a/spotproviders/spot_provider.py b/spotproviders/spot_provider.py index e848e01..779acf8 100644 --- a/spotproviders/spot_provider.py +++ b/spotproviders/spot_provider.py @@ -32,6 +32,9 @@ class SpotProvider: # their infer_missing() method called to complete their data set. This is called by the API-querying # subclasses on receiving spots. def submit_batch(self, spots): + # Sort the batch so that earliest ones go in first. This helps keep the ordering correct when spots are fired + # off to SSE listeners. + spots = sorted(spots, key=lambda spot: (spot.time if spot and spot.time else 0)) for spot in spots: if datetime.fromtimestamp(spot.time, pytz.UTC) > self.last_spot_time: # Fill in any blanks and add to the list