mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-06-24 05:35:10 +00:00
SSE server reliability improvements
This commit is contained in:
@@ -100,6 +100,9 @@ class APISpotsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
|
||||
self._heartbeat = tornado.ioloop.PeriodicCallback(self._callback, SSE_HANDLER_QUEUE_CHECK_INTERVAL)
|
||||
self._heartbeat.start()
|
||||
|
||||
# Flush headers immediately so nginx doesn't time out waiting for a response
|
||||
self.write_message(msg="", event="keepalive")
|
||||
|
||||
except Exception as e:
|
||||
logging.warning("Exception when serving SSE socket", e)
|
||||
|
||||
@@ -124,14 +127,19 @@ class APISpotsStreamHandler(tornado_eventsource.handler.EventSourceHandler):
|
||||
|
||||
try:
|
||||
if self._spot_queue:
|
||||
while not self._spot_queue.empty():
|
||||
spot = self._spot_queue.get()
|
||||
# If the new spot matches our param filters, send it to the client. If not, ignore it.
|
||||
if spot_allowed_by_query(spot, self._query_params):
|
||||
if self._credentials:
|
||||
spot = copy.deepcopy(spot)
|
||||
spot.infer_missing(self._credentials)
|
||||
self.write_message(msg=json.dumps(spot, default=serialize_everything))
|
||||
if not self._spot_queue.empty():
|
||||
while not self._spot_queue.empty():
|
||||
spot = self._spot_queue.get()
|
||||
# If the new spot matches our param filters, send it to the client. If not, ignore it.
|
||||
if spot_allowed_by_query(spot, self._query_params):
|
||||
if self._credentials:
|
||||
spot = copy.deepcopy(spot)
|
||||
spot.infer_missing(self._credentials)
|
||||
self.write_message(msg=json.dumps(spot, default=serialize_everything))
|
||||
|
||||
else:
|
||||
# Send a keepalive comment if the queue was empty
|
||||
self.write_message(msg="", event="keepalive")
|
||||
|
||||
if self._spot_queue not in self._sse_spot_queues:
|
||||
logging.error("Web server cleared up a queue of an active connection!")
|
||||
|
||||
Reference in New Issue
Block a user