mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-02-04 09:14:30 +00:00
Fix issue with SSE queues getting lost #3
This commit is contained in:
@@ -30,6 +30,9 @@ class AlertProvider:
|
|||||||
# because alerts could be created at any point for any time in the future. Rely on hashcode-based id matching
|
# because alerts could be created at any point for any time in the future. Rely on hashcode-based id matching
|
||||||
# to deal with duplicates.
|
# to deal with duplicates.
|
||||||
def submit_batch(self, alerts):
|
def submit_batch(self, alerts):
|
||||||
|
# Sort the batch so that earliest ones go in first. This helps keep the ordering correct when alerts are fired
|
||||||
|
# off to SSE listeners.
|
||||||
|
alerts = sorted(alerts, key=lambda alert: (alert.start_time if alert and alert.start_time else 0))
|
||||||
for alert in alerts:
|
for alert in alerts:
|
||||||
# Fill in any blanks and add to the list
|
# Fill in any blanks and add to the list
|
||||||
alert.infer_missing()
|
alert.infer_missing()
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import tornado
|
import tornado
|
||||||
@@ -98,6 +99,22 @@ class WebServer:
|
|||||||
# Clean up any SSE queues that are growing too large; probably their client disconnected and we didn't catch it
|
# Clean up any SSE queues that are growing too large; probably their client disconnected and we didn't catch it
|
||||||
# properly for some reason.
|
# properly for some reason.
|
||||||
def clean_up_sse_queues(self):
|
def clean_up_sse_queues(self):
|
||||||
self.sse_spot_queues = [q for q in self.sse_spot_queues if not q.full()]
|
for q in self.sse_spot_queues:
|
||||||
self.sse_alert_queues = [q for q in self.sse_alert_queues if not q.full()]
|
try:
|
||||||
|
if q.full():
|
||||||
|
logging.warn("A full SSE spot queue was found, presumably because the client disconnected strangely. It has been removed.")
|
||||||
|
self.sse_spot_queues.remove(q)
|
||||||
|
q.empty()
|
||||||
|
except:
|
||||||
|
# Probably got deleted already on another thread
|
||||||
|
pass
|
||||||
|
for q in self.sse_alert_queues:
|
||||||
|
try:
|
||||||
|
if q.full():
|
||||||
|
logging.warn("A full SSE alert queue was found, presumably because the client disconnected strangely. It has been removed.")
|
||||||
|
self.sse_alert_queues.remove(q)
|
||||||
|
q.empty()
|
||||||
|
except:
|
||||||
|
# Probably got deleted already on another thread
|
||||||
|
pass
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -32,6 +32,9 @@ class SpotProvider:
|
|||||||
# their infer_missing() method called to complete their data set. This is called by the API-querying
|
# their infer_missing() method called to complete their data set. This is called by the API-querying
|
||||||
# subclasses on receiving spots.
|
# subclasses on receiving spots.
|
||||||
def submit_batch(self, spots):
|
def submit_batch(self, spots):
|
||||||
|
# Sort the batch so that earliest ones go in first. This helps keep the ordering correct when spots are fired
|
||||||
|
# off to SSE listeners.
|
||||||
|
spots = sorted(spots, key=lambda spot: (spot.time if spot and spot.time else 0))
|
||||||
for spot in spots:
|
for spot in spots:
|
||||||
if datetime.fromtimestamp(spot.time, pytz.UTC) > self.last_spot_time:
|
if datetime.fromtimestamp(spot.time, pytz.UTC) > self.last_spot_time:
|
||||||
# Fill in any blanks and add to the list
|
# Fill in any blanks and add to the list
|
||||||
|
|||||||
Reference in New Issue
Block a user