Split up some code for sanity #3

This commit is contained in:
Ian Renton
2025-12-23 11:51:00 +00:00
parent fd246fc17b
commit 61784e8af6
12 changed files with 341 additions and 242 deletions

View File

@@ -1,20 +1,23 @@
import asyncio
import json
import os
from datetime import datetime, timedelta
import pytz
import tornado
from prometheus_client.openmetrics.exposition import CONTENT_TYPE_LATEST
from tornado.web import StaticFileHandler
from core.config import ALLOW_SPOTTING, MAX_SPOT_AGE, WEB_UI_OPTIONS
from core.constants import SOFTWARE_VERSION, BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS
from core.prometheus_metrics_handler import get_metrics, page_requests_counter
from server.handlers.api.addspot import APISpotHandler
from server.handlers.api.alerts import APIAlertsHandler, APIAlertsStreamHandler
from server.handlers.api.lookups import APILookupCallHandler, APILookupSIGRefHandler
from server.handlers.api.options import APIOptionsHandler
from server.handlers.api.spots import APISpotsHandler, APISpotsStreamHandler
from server.handlers.api.status import APIStatusHandler
from server.handlers.metrics import PrometheusMetricsHandler
from server.handlers.pagetemplate import PageTemplateHandler
# Provides the public-facing web server.
# TODO synchronous API responses
# TODO alerts API
# TODO lookup APIs
# TODO post spot API
# TODO SSE API responses
# TODO clean_up_sse_queues
# TODO page & API access counters - how to do from a subclass handler? e.g.
@@ -56,7 +59,7 @@ class WebServer:
async def start_inner(self):
app = tornado.web.Application([
# Routes for API calls
(r"/api/v1/spots", APISpotsHandler),
(r"/api/v1/spots", APISpotsHandler, {"spots": self.spots}),
(r"/api/v1/alerts", APIAlertsHandler),
(r"/api/v1/spots/stream", APISpotsStreamHandler),
(r"/api/v1/alerts/stream", APIAlertsStreamHandler),
@@ -88,235 +91,3 @@ class WebServer:
def clean_up_sse_queues(self):
# todo
pass
# API request handler for /api/v1/spots
class APISpotsHandler(tornado.web.RequestHandler):
def get(self):
# todo
self.write("Hello, world")
# API request handler for /api/v1/alerts
class APIAlertsHandler(tornado.web.RequestHandler):
def get(self):
# todo
self.write("Hello, world")
# API request handler for /api/v1/spots/stream
class APISpotsStreamHandler(tornado.web.RequestHandler):
def get(self):
# todo
self.write("Hello, world")
# API request handler for /api/v1/alerts/stream
class APIAlertsStreamHandler(tornado.web.RequestHandler):
def get(self):
# todo
self.write("Hello, world")
# API request handler for /api/v1/options
class APIOptionsHandler(tornado.web.RequestHandler):
def initialize(self, status_data):
self.status_data = status_data
def get(self):
options = {"bands": BANDS,
"modes": ALL_MODES,
"mode_types": MODE_TYPES,
"sigs": SIGS,
# Spot/alert sources are filtered for only ones that are enabled in config, no point letting the user toggle things that aren't even available.
"spot_sources": list(
map(lambda p: p["name"], filter(lambda p: p["enabled"], self.status_data["spot_providers"]))),
"alert_sources": list(
map(lambda p: p["name"], filter(lambda p: p["enabled"], self.status_data["alert_providers"]))),
"continents": CONTINENTS,
"max_spot_age": MAX_SPOT_AGE,
"spot_allowed": ALLOW_SPOTTING,
"web-ui-options": WEB_UI_OPTIONS}
# If spotting to this server is enabled, "API" is another valid spot source even though it does not come from
# one of our proviers.
if ALLOW_SPOTTING:
options["spot_sources"].append("API")
self.write(json.dumps(options, default=serialize_everything))
self.set_header("Cache-Control", "no-store")
self.set_header("Content-Type", "application/json")
# API request handler for /api/v1/status
class APIStatusHandler(tornado.web.RequestHandler):
def initialize(self, status_data):
self.status_data = status_data
def get(self):
self.write(json.dumps(self.status_data, default=serialize_everything))
self.set_header("Cache-Control", "no-store")
self.set_header("Content-Type", "application/json")
# API request handler for /api/v1/lookup/call
class APILookupCallHandler(tornado.web.RequestHandler):
def get(self):
# todo
self.write("Hello, world")
# API request handler for /api/v1/lookup/sigref
class APILookupSIGRefHandler(tornado.web.RequestHandler):
def get(self):
# todo
self.write("Hello, world")
# API request handler for /api/v1/spot (POST)
class APISpotHandler(tornado.web.RequestHandler):
def post(self):
# todo
self.write("Hello, world")
# Handler for all HTML pages generated from templates
class PageTemplateHandler(tornado.web.RequestHandler):
def initialize(self, template_name):
self.template_name = template_name
def get(self):
# Load named template, and provide variables used in templates
self.render(self.template_name + ".html", software_version=SOFTWARE_VERSION, allow_spotting=ALLOW_SPOTTING)
# Handler for Prometheus metrics endpoint
class PrometheusMetricsHandler(tornado.web.RequestHandler):
def get(self):
self.write(get_metrics())
self.set_status(200)
self.set_header('Content-Type', CONTENT_TYPE_LATEST)
# Given URL query params and a spot, figure out if the spot "passes" the requested filters or is rejected. The list
# of query parameters and their function is defined in the API docs.
def spot_allowed_by_query(spot, query):
for k in query.keys():
match k:
case "since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp()
if not spot.time or spot.time <= since:
return False
case "max_age":
max_age = int(query.get(k))
since = (datetime.now(pytz.UTC) - timedelta(seconds=max_age)).timestamp()
if not spot.time or spot.time <= since:
return False
case "received_since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp()
if not spot.received_time or spot.received_time <= since:
return False
case "source":
sources = query.get(k).split(",")
if not spot.source or spot.source not in sources:
return False
case "sig":
# If a list of sigs is provided, the spot must have a sig and it must match one of them.
# The special "sig" "NO_SIG", when supplied in the list, mathches spots with no sig.
sigs = query.get(k).split(",")
include_no_sig = "NO_SIG" in sigs
if not spot.sig and not include_no_sig:
return False
if spot.sig and spot.sig not in sigs:
return False
case "needs_sig":
# If true, a sig is required, regardless of what it is, it just can't be missing. Mutually
# exclusive with supplying the special "NO_SIG" parameter to the "sig" query param.
needs_sig = query.get(k).upper() == "TRUE"
if needs_sig and not spot.sig:
return False
case "needs_sig_ref":
# If true, at least one sig ref is required, regardless of what it is, it just can't be missing.
needs_sig_ref = query.get(k).upper() == "TRUE"
if needs_sig_ref and (not spot.sig_refs or len(spot.sig_refs) == 0):
return False
case "band":
bands = query.get(k).split(",")
if not spot.band or spot.band not in bands:
return False
case "mode":
modes = query.get(k).split(",")
if not spot.mode or spot.mode not in modes:
return False
case "mode_type":
mode_types = query.get(k).split(",")
if not spot.mode_type or spot.mode_type not in mode_types:
return False
case "dx_continent":
dxconts = query.get(k).split(",")
if not spot.dx_continent or spot.dx_continent not in dxconts:
return False
case "de_continent":
deconts = query.get(k).split(",")
if not spot.de_continent or spot.de_continent not in deconts:
return False
case "comment_includes":
comment_includes = query.get(k).strip()
if not spot.comment or comment_includes.upper() not in spot.comment.upper():
return False
case "dx_call_includes":
dx_call_includes = query.get(k).strip()
if not spot.dx_call or dx_call_includes.upper() not in spot.dx_call.upper():
return False
case "allow_qrt":
# If false, spots that are flagged as QRT are not returned.
prevent_qrt = query.get(k).upper() == "FALSE"
if prevent_qrt and spot.qrt and spot.qrt == True:
return False
case "needs_good_location":
# If true, spots require a "good" location to be returned
needs_good_location = query.get(k).upper() == "TRUE"
if needs_good_location and not spot.dx_location_good:
return False
return True
# Given URL query params and an alert, figure out if the alert "passes" the requested filters or is rejected. The list
# of query parameters and their function is defined in the API docs.
def alert_allowed_by_query(alert, query):
for k in query.keys():
match k:
case "received_since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC)
if not alert.received_time or alert.received_time <= since:
return False
case "max_duration":
max_duration = int(query.get(k))
# Check the duration if end_time is provided. If end_time is not provided, assume the activation is
# "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true and
# the alert is a dxpedition, it also always passes the check.
if alert.is_dxpedition and (bool(query.get(
"dxpeditions_skip_max_duration_check")) if "dxpeditions_skip_max_duration_check" in query.keys() else False):
continue
if alert.end_time and alert.start_time and alert.end_time - alert.start_time > max_duration:
return False
case "source":
sources = query.get(k).split(",")
if not alert.source or alert.source not in sources:
return False
case "sig":
# If a list of sigs is provided, the alert must have a sig and it must match one of them.
# The special "sig" "NO_SIG", when supplied in the list, mathches alerts with no sig.
sigs = query.get(k).split(",")
include_no_sig = "NO_SIG" in sigs
if not alert.sig and not include_no_sig:
return False
if alert.sig and alert.sig not in sigs:
return False
case "dx_continent":
dxconts = query.get(k).split(",")
if not alert.dx_continent or alert.dx_continent not in dxconts:
return False
case "dx_call_includes":
dx_call_includes = query.get(k).strip()
if not alert.dx_call or dx_call_includes.upper() not in alert.dx_call.upper():
return False
return True
# Convert objects to serialisable things. Used by JSON serialiser as a default when it encounters unserializable things.
# Just converts objects to dict. Try to avoid doing anything clever here when serialising spots, because we also need
# to receive spots without complex handling.
def serialize_everything(obj):
return obj.__dict__