Files
spothole/server/webserver.py

490 lines
25 KiB
Python

import json
import logging
import re
from datetime import datetime, timedelta
from threading import Thread
import bottle
import pytz
from bottle import run, request, response, template
from core.config import MAX_SPOT_AGE, ALLOW_SPOTTING, WEB_UI_OPTIONS
from core.constants import BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS, SOFTWARE_VERSION, UNKNOWN_BAND
from core.lookup_helper import lookup_helper
from core.prometheus_metrics_handler import page_requests_counter, get_metrics, api_requests_counter
from core.sig_utils import get_ref_regex_for_sig, get_sig_ref_info
from data.sig_ref import SIGRef
from data.spot import Spot
# Provides the public-facing web server.
class WebServer:
# Constructor
def __init__(self, spots, alerts, status_data, port):
    """Store the data sources, reset access statistics and register all Bottle routes.

    :param spots: Spot store. Elsewhere in this class it is read with ``iterkeys()``/``get()`` and written
        with ``add()``.
    :param alerts: Alert store, read with ``iterkeys()``/``get()``.
    :param status_data: Status structure served as-is by ``/api/v1/status``; its "spot_providers" and
        "alert_providers" entries are also read by ``get_options()``.
    :param port: Port number passed to Bottle's ``run()``.
    """
    # Data sources and configuration handed in by the caller
    self.spots = spots
    self.alerts = alerts
    self.status_data = status_data
    self.port = port

    # Access statistics, updated by the request handlers
    self.last_page_access_time = None
    self.last_api_access_time = None
    self.page_access_counter = 0
    self.api_access_counter = 0

    # Bottle's run() blocks, so the server lives on its own daemon thread; start() launches it.
    self.thread = Thread(target=self.run, daemon=True)
    self.status = "Starting"

    # Values made available to every template
    bottle.BaseTemplate.defaults['software_version'] = SOFTWARE_VERSION
    bottle.BaseTemplate.defaults['allow_spotting'] = ALLOW_SPOTTING

    # JSON API, GET endpoints. The lambdas defer evaluation (e.g. of self.status_data) to request time.
    api_get_routes = (
        ("/api/v1/spots", lambda: self.serve_spots_api()),
        ("/api/v1/alerts", lambda: self.serve_alerts_api()),
        ("/api/v1/options", lambda: self.serve_api(self.get_options())),
        ("/api/v1/status", lambda: self.serve_api(self.status_data)),
        ("/api/v1/lookup/call", lambda: self.serve_call_lookup_api()),
        ("/api/v1/lookup/sigref", lambda: self.serve_sig_ref_lookup_api()),
    )
    for path, handler in api_get_routes:
        bottle.get(path)(handler)

    # JSON API, POST endpoint for submitting a spot
    bottle.post("/api/v1/spot")(lambda: self.accept_spot())

    # Templated pages. The factory gives each route its own zero-argument handler bound to one template.
    def page_handler(template_name):
        return lambda: self.serve_template(template_name)

    page_routes = (
        ("/", 'webpage_spots'),
        ("/map", 'webpage_map'),
        ("/bands", 'webpage_bands'),
        ("/alerts", 'webpage_alerts'),
        ("/add-spot", 'webpage_add_spot'),
        ("/status", 'webpage_status'),
        ("/about", 'webpage_about'),
        ("/apidocs", 'webpage_apidocs'),
    )
    for path, template_name in page_routes:
        bottle.get(path)(page_handler(template_name))

    # Prometheus metrics
    bottle.get("/metrics")(lambda: self.serve_prometheus_metrics())

    # Anything else is looked up as a static file in "webassets"
    bottle.get("/<filepath:path>")(self.serve_static_file)
# Start the web server
def start(self):
    """Launch the background thread that runs the web server (see run())."""
    server_thread = self.thread
    server_thread.start()
# Run the web server itself. This blocks until the server is shut down, so it runs in a separate thread.
def run(self):
    """Run the Bottle server on localhost at self.port.

    The call to Bottle's run() blocks until the server is shut down, which is why this method is used as
    the target of the thread created in the constructor.
    """
    logging.info(f"Starting web server on port {self.port}...")
    self.status = "Waiting"
    # Within the class this name resolves to the module-level run imported from bottle, not this method.
    run(port=self.port, host='localhost')
# Serve the JSON API /spots endpoint
def serve_spots_api(self):
    """Handle the /api/v1/spots endpoint.

    :return: JSON text. On success, the filtered spot list. If filtering raises ValueError the response
        status is 400 and the body is a "Bad request - ..." string; any other exception gives 500 and an
        "Error - ..." string.
    """
    try:
        return self.serve_api(self.get_spot_list_with_filters())
    except ValueError as bad_request:
        # Raised, for example, by int() on a non-numeric query parameter
        logging.error(bad_request)
        response.status = 400
        response.content_type = 'application/json'
        return json.dumps(f"Bad request - {bad_request}", default=serialize_everything)
    except Exception as failure:
        logging.error(failure)
        response.status = 500
        response.content_type = 'application/json'
        return json.dumps(f"Error - {failure}", default=serialize_everything)
# Serve the JSON API /alerts endpoint
def serve_alerts_api(self):
    """Handle the /api/v1/alerts endpoint.

    :return: JSON text. On success, the filtered alert list. If filtering raises ValueError the response
        status is 400 and the body is a "Bad request - ..." string; any other exception gives 500 and an
        "Error - ..." string.
    """
    try:
        return self.serve_api(self.get_alert_list_with_filters())
    except ValueError as bad_request:
        # Raised, for example, by int() on a non-numeric query parameter
        logging.error(bad_request)
        response.status = 400
        response.content_type = 'application/json'
        return json.dumps(f"Bad request - {bad_request}", default=serialize_everything)
    except Exception as failure:
        logging.error(failure)
        response.status = 500
        response.content_type = 'application/json'
        return json.dumps(f"Error - {failure}", default=serialize_everything)
# Look up data for a callsign
def serve_call_lookup_api(self):
    """Handle /api/v1/lookup/call: return whatever can be inferred about a callsign.

    Reads the "call" query parameter. Responds with status 422 if it is missing, empty, or contains
    characters other than letters, digits, "/" and "-"; with 500 if the lookup raises.

    :return: JSON text, either the lookup result object or an error string.
    """
    try:
        # An empty value ("?call=") is treated the same as a missing parameter. The format check below
        # uses "*", so on its own it would let an empty callsign through to the lookup.
        raw_call = bottle.request.query.get("call")
        if not raw_call:
            response.content_type = 'application/json'
            response.status = 422
            return json.dumps("Error - call must be provided", default=serialize_everything)
        call = raw_call.upper()

        # Reject badly formatted callsigns
        if not re.match(r"^[A-Za-z0-9/\-]*$", call):
            response.content_type = 'application/json'
            response.status = 422
            return json.dumps("Error - '" + call + "' does not look like a valid callsign.",
                              default=serialize_everything)

        # Take the callsign, make a "fake spot" so we can run infer_missing() on it, then repack the
        # resulting data in the correct way for the API response.
        fake_spot = Spot(dx_call=call)
        fake_spot.infer_missing()
        return self.serve_api({
            "call": call,
            "name": fake_spot.dx_name,
            "qth": fake_spot.dx_qth,
            "country": fake_spot.dx_country,
            "flag": fake_spot.dx_flag,
            "continent": fake_spot.dx_continent,
            "dxcc_id": fake_spot.dx_dxcc_id,
            "cq_zone": fake_spot.dx_cq_zone,
            "itu_zone": fake_spot.dx_itu_zone,
            "grid": fake_spot.dx_grid,
            "latitude": fake_spot.dx_latitude,
            "longitude": fake_spot.dx_longitude,
            "location_source": fake_spot.dx_location_source,
        })
    except Exception as e:
        logging.error(e)
        response.content_type = 'application/json'
        response.status = 500
        return json.dumps("Error - " + str(e), default=serialize_everything)
# Look up data for a SIG reference
def serve_sig_ref_lookup_api(self):
    """Handle /api/v1/lookup/sigref: return information about one SIG reference.

    Reads the "sig" and "id" query parameters (both upper-cased before use). Responds with status 422 if
    either is missing, if the sig is not one of the names in SIGS, or if the sig has a reference regex
    and the id does not match it; with 500 if the lookup raises.

    :return: JSON text, either the result of get_sig_ref_info() or an error string.
    """
    try:
        query = bottle.request.query

        # Both parameters are mandatory
        if "sig" not in query.keys() or "id" not in query.keys():
            response.status = 422
            response.content_type = 'application/json'
            return json.dumps("Error - sig and id must be provided", default=serialize_everything)
        sig = query.get("sig").upper()
        ref_id = query.get("id").upper()

        # The sig must be one we know about
        known_sig_names = [known.name for known in SIGS]
        if sig not in known_sig_names:
            response.status = 422
            response.content_type = 'application/json'
            return json.dumps(f"Error - sig '{sig}' is not known.", default=serialize_everything)

        # If this sig defines a reference format, the ID has to match it
        ref_regex = get_ref_regex_for_sig(sig)
        if ref_regex and not re.match(ref_regex, ref_id):
            response.status = 422
            response.content_type = 'application/json'
            return json.dumps(f"Error - '{ref_id}' does not look like a valid reference ID for {sig}.",
                              default=serialize_everything)

        return self.serve_api(get_sig_ref_info(sig, ref_id))
    except Exception as failure:
        logging.error(failure)
        response.status = 500
        response.content_type = 'application/json'
        return json.dumps(f"Error - {failure}", default=serialize_everything)
# Serve a JSON API endpoint
def serve_api(self, data):
    """Record an API access and serialise data as the JSON response body.

    :param data: Anything json.dumps() can encode, using serialize_everything for other objects.
    :return: The JSON text. The response is marked as application/json with Cache-Control: no-store.
    """
    # Book-keeping for the status page and metrics
    self.last_api_access_time = datetime.now(pytz.UTC)
    self.api_access_counter = self.api_access_counter + 1
    api_requests_counter.inc()
    self.status = "OK"

    # Response headers, then the body
    response.content_type = 'application/json'
    response.set_header('Cache-Control', 'no-store')
    body = json.dumps(data, default=serialize_everything)
    return body
# Accept a spot
def accept_spot(self):
    """Handle POST /api/v1/spot: validate a JSON-encoded spot and add it to the spot store.

    Response statuses (the body is always a JSON string):
      201  spot accepted and stored
      400  request body is not valid JSON, or is not a JSON object
      401  ALLOW_SPOTTING is disabled
      415  request Content-Type is not application/json (parameters such as charset are ignored)
      422  body empty, required fields missing, or a field failed format validation
      500  any unexpected exception

    :return: JSON text describing the outcome.
    """
    self.last_api_access_time = datetime.now(pytz.UTC)
    self.api_access_counter += 1
    api_requests_counter.inc()
    self.status = "OK"

    # Shared shape of every rejection: set the status, reply with a JSON string.
    def reject(status, message):
        response.content_type = 'application/json'
        response.status = status
        return json.dumps(message, default=serialize_everything)

    callsign_pattern = r"^[A-Za-z0-9/\-]*$"
    grid_pattern = r"^([A-R]{2}[0-9]{2}[A-X]{2}[0-9]{2}[A-X]{2}|[A-R]{2}[0-9]{2}[A-X]{2}[0-9]{2}|[A-R]{2}[0-9]{2}[A-X]{2}|[A-R]{2}[0-9]{2})$"

    try:
        # Reject if not allowed
        if not ALLOW_SPOTTING:
            return reject(401, "Error - this server does not allow new spots to be added via the API.")

        # Reject if format not json. Only the media type is compared, so that a header such as
        # "application/json; charset=utf-8" is accepted.
        content_type = request.get_header('Content-Type') or ""
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type != "application/json":
            return reject(415, "Error - request Content-Type must be application/json")

        # Reject if request body is empty
        post_data = request.body.read()
        if not post_data:
            return reject(422, "Error - request body is empty")

        # Read in the request body as JSON. A body that cannot be parsed is the client's mistake, so
        # report it as such rather than letting it fall through to the generic 500 handler.
        # (json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.)
        try:
            json_spot = json.loads(post_data)
        except ValueError:
            return reject(400, "Error - request body is not valid JSON")
        if not isinstance(json_spot, dict):
            return reject(400, "Error - request body must be a JSON object")

        # Convert to a Spot object. Unknown field names raise here and end up in the 500 handler below.
        spot = Spot(**json_spot)

        # Converting to a spot object this way won't have coped with sig_ref objects, so rebuild each
        # one as a SIGRef.
        if spot.sig_refs:
            spot.sig_refs = [json.loads(json.dumps(ref), object_hook=lambda d: SIGRef(**d))
                             for ref in spot.sig_refs]

        # Reject if no timestamp, frequency, dx_call or de_call
        if not spot.time or not spot.dx_call or not spot.freq or not spot.de_call:
            return reject(422, "Error - 'time', 'dx_call', 'freq' and 'de_call' must be provided as a minimum.")

        # Reject invalid-looking callsigns (DX first, then spotter)
        for callsign in (spot.dx_call, spot.de_call):
            if not re.match(callsign_pattern, callsign):
                return reject(422, "Error - '" + callsign + "' does not look like a valid callsign.")

        # Reject if frequency not in a known band. The message divides by 1000 to report kHz, so freq
        # is presumably in Hz.
        if lookup_helper.infer_band_from_freq(spot.freq) == UNKNOWN_BAND:
            return reject(422, "Error - Frequency of " + str(spot.freq / 1000.0) + "kHz is not in a known band.")

        # Reject if grid formatting incorrect (4, 6, 8 or 10 character Maidenhead locators)
        if spot.dx_grid and not re.match(grid_pattern, spot.dx_grid.upper()):
            return reject(422, "Error - '" + spot.dx_grid + "' does not look like a valid Maidenhead grid.")

        # Reject if sig_ref format incorrect for sig. Only the first reference is checked.
        if spot.sig and spot.sig_refs and spot.sig_refs[0].id:
            ref_regex = get_ref_regex_for_sig(spot.sig)
            if ref_regex and not re.match(ref_regex, spot.sig_refs[0].id):
                return reject(422, "Error - '" + spot.sig_refs[0].id + "' does not look like a valid reference for "
                              + spot.sig + ".")

        # Infer missing data, and add it to our database.
        spot.source = "API"
        spot.infer_missing()
        self.spots.add(spot.id, spot, expire=MAX_SPOT_AGE)

        response.content_type = 'application/json'
        response.set_header('Cache-Control', 'no-store')
        response.status = 201
        return json.dumps("OK", default=serialize_everything)
    except Exception as e:
        logging.error(e)
        return reject(500, "Error - " + str(e))
# Serve a templated page
def serve_template(self, template_name):
    """Record a page access and render the named template.

    :param template_name: Name of the template to pass to Bottle's template().
    :return: Whatever template() returns for that template.
    """
    # Book-keeping for the status page and metrics
    self.last_page_access_time = datetime.now(pytz.UTC)
    self.page_access_counter = self.page_access_counter + 1
    page_requests_counter.inc()
    self.status = "OK"

    rendered_page = template(template_name)
    return rendered_page
# Serve general static files from "webassets" directory.
def serve_static_file(self, filepath):
    """Serve a file from the "webassets" directory.

    :param filepath: Path captured by the catch-all route, passed to bottle.static_file().
    :return: The result of bottle.static_file().
    """
    assets_root = "webassets"
    return bottle.static_file(filepath, root=assets_root)
# Serve Prometheus metrics
def serve_prometheus_metrics(self):
    """Return the output of get_metrics() as the body of the /metrics response."""
    metrics_output = get_metrics()
    return metrics_output
# Utility method to apply filters to the overall spot list and return only a subset. Enables query parameters in
# the main "spots" GET call.
def get_spot_list_with_filters(self):
# Get the query (and the right one, with Bottle magic. This is a MultiDict object)
query = bottle.request.query
# Create a shallow copy of the spot list, ordered by spot time. We'll then filter it accordingly.
# We can filter by spot time and received time with "since" and "received_since", which take a UNIX timestamp
# in seconds UTC.
# We can also filter by source, sig, band, mode, dx_continent and de_continent. Each of these accepts a single
# value or a comma-separated list.
# We can filter by comments, accepting a single string, where the API will only return spots where the comment
# contains the provided value (case-insensitive).
# We can "de-dupe" spots, so only the latest spot will be sent for each callsign.
# We can provide a "limit" number as well. Spots are always returned newest-first; "limit" limits to only the
# most recent X spots.
spot_ids = list(self.spots.iterkeys())
spots = []
for k in spot_ids:
s = self.spots.get(k)
if s is not None:
spots.append(s)
spots = sorted(spots, key=lambda spot: (spot.time if spot and spot.time else 0), reverse=True)
for k in query.keys():
match k:
case "since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp()
spots = [s for s in spots if s.time and s.time > since]
case "max_age":
max_age = int(query.get(k))
since = (datetime.now(pytz.UTC) - timedelta(seconds=max_age)).timestamp()
spots = [s for s in spots if s.time and s.time > since]
case "received_since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp()
spots = [s for s in spots if s.received_time and s.received_time > since]
case "source":
sources = query.get(k).split(",")
spots = [s for s in spots if s.source and s.source in sources]
case "sig":
# If a list of sigs is provided, the spot must have a sig and it must match one of them.
# The special "sig" "NO_SIG", when supplied in the list, mathches spots with no sig.
sigs = query.get(k).split(",")
include_no_sig = "NO_SIG" in sigs
spots = [s for s in spots if (s.sig and s.sig in sigs) or (include_no_sig and not s.sig)]
case "needs_sig":
# If true, a sig is required, regardless of what it is, it just can't be missing. Mutually
# exclusive with supplying the special "NO_SIG" parameter to the "sig" query param.
needs_sig = query.get(k).upper() == "TRUE"
if needs_sig:
spots = [s for s in spots if s.sig]
case "needs_sig_ref":
# If true, at least one sig ref is required, regardless of what it is, it just can't be missing.
needs_sig_ref = query.get(k).upper() == "TRUE"
if needs_sig_ref:
spots = [s for s in spots if s.sig_refs and len(s.sig_refs) > 0]
case "band":
bands = query.get(k).split(",")
spots = [s for s in spots if s.band and s.band in bands]
case "mode":
modes = query.get(k).split(",")
spots = [s for s in spots if s.mode in modes]
case "mode_type":
mode_families = query.get(k).split(",")
spots = [s for s in spots if s.mode_type and s.mode_type in mode_families]
case "dx_continent":
dxconts = query.get(k).split(",")
spots = [s for s in spots if s.dx_continent and s.dx_continent in dxconts]
case "de_continent":
deconts = query.get(k).split(",")
spots = [s for s in spots if s.de_continent and s.de_continent in deconts]
case "comment_includes":
comment_includes = query.get(k).strip()
spots = [s for s in spots if s.comment and comment_includes.upper() in s.comment.upper()]
case "dx_call_includes":
dx_call_includes = query.get(k).strip()
spots = [s for s in spots if s.dx_call and dx_call_includes.upper() in s.dx_call.upper()]
case "allow_qrt":
# If false, spots that are flagged as QRT are not returned.
prevent_qrt = query.get(k).upper() == "FALSE"
if prevent_qrt:
spots = [s for s in spots if not s.qrt or s.qrt == False]
case "needs_good_location":
# If true, spots require a "good" location to be returned
needs_good_location = query.get(k).upper() == "TRUE"
if needs_good_location:
spots = [s for s in spots if s.dx_location_good]
case "dedupe":
# Ensure only the latest spot of each callsign-SSID combo is present in the list. This relies on the
# list being in reverse time order, so if any future change allows re-ordering the list, that should
# be done *after* this. SSIDs are deliberately included here (see issue #68) because e.g. M0TRT-7
# and M0TRT-9 APRS transponders could well be in different locations, on different frequencies etc.
dedupe = query.get(k).upper() == "TRUE"
if dedupe:
spots_temp = []
already_seen = []
for s in spots:
call_plus_ssid = s.dx_call + (s.dx_ssid if s.dx_ssid else "")
if call_plus_ssid not in already_seen:
spots_temp.append(s)
already_seen.append(call_plus_ssid)
spots = spots_temp
# If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys.
if "limit" in query.keys():
spots = spots[:int(query.get("limit"))]
return spots
# Utility method to apply filters to the overall alert list and return only a subset. Enables query parameters in
# the main "alerts" GET call.
def get_alert_list_with_filters(self):
# Get the query (and the right one, with Bottle magic. This is a MultiDict object)
query = bottle.request.query
# Create a shallow copy of the alert list, ordered by start time. We'll then filter it accordingly.
# We can filter by received time with "received_since", which take a UNIX timestamp in seconds UTC.
# We can also filter by source, sig, and dx_continent. Each of these accepts a single
# value or a comma-separated list.
# We can provide a "limit" number as well. Alerts are always returned newest-first; "limit" limits to only the
# most recent X alerts.
alert_ids = list(self.alerts.iterkeys())
alerts = []
for k in alert_ids:
a = self.alerts.get(k)
if a is not None:
alerts.append(a)
# We never want alerts that seem to be in the past
alerts = list(filter(lambda alert: not alert.expired(), alerts))
alerts = sorted(alerts, key=lambda alert: (alert.start_time if alert and alert.start_time else 0))
for k in query.keys():
match k:
case "received_since":
since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC)
alerts = [a for a in alerts if a.received_time and a.received_time > since]
case "max_duration":
max_duration = int(query.get(k))
# Check the duration if end_time is provided. If end_time is not provided, assume the activation is
# "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true and
# the alert is a dxpedition, it also always passes the check.
dxpeditions_skip_check = bool(query.get(
"dxpeditions_skip_max_duration_check")) if "dxpeditions_skip_max_duration_check" in query.keys() else False
alerts = [a for a in alerts if (a.end_time and a.end_time - a.start_time <= max_duration) or
not a.end_time or (dxpeditions_skip_check and a.is_dxpedition)]
case "source":
sources = query.get(k).split(",")
alerts = [a for a in alerts if a.source and a.source in sources]
case "sig":
# If a list of sigs is provided, the alert must have a sig and it must match one of them.
# The special "sig" "NO_SIG", when supplied in the list, mathches alerts with no sig.
sigs = query.get(k).split(",")
include_no_sig = "NO_SIG" in sigs
spots = [a for a in alerts if (a.sig and a.sig in sigs) or (include_no_sig and not a.sig)]
case "dx_continent":
dxconts = query.get(k).split(",")
alerts = [a for a in alerts if a.dx_continent and a.dx_continent in dxconts]
case "dx_call_includes":
dx_call_includes = query.get(k).strip()
spots = [a for a in alerts if a.dx_call and dx_call_includes.upper() in a.dx_call.upper()]
# If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys.
if "limit" in query.keys():
alerts = alerts[:int(query.get("limit"))]
return alerts
# Return all the "options" for various things that the server is aware of. This can be fetched with an API call.
# The idea is that this will include most of the things that can be provided as queries to the main spots call,
# and thus a client can use this data to configure its filter controls.
def get_options(self):
    """Build the dictionary served by /api/v1/options.

    It lists the values a client can use to configure its filter controls: bands, modes, mode types,
    sigs, continents, the names of enabled spot and alert providers, and some server configuration.

    :return: dict of option name to value.
    """
    # Spot/alert sources are filtered for only ones that are enabled in config, no point letting the user
    # toggle things that aren't even available.
    spot_sources = [provider["name"] for provider in self.status_data["spot_providers"] if provider["enabled"]]
    alert_sources = [provider["name"] for provider in self.status_data["alert_providers"] if provider["enabled"]]

    # If spotting to this server is enabled, "API" is another valid spot source even though it does not
    # come from one of our providers.
    if ALLOW_SPOTTING:
        spot_sources.append("API")

    return {
        "bands": BANDS,
        "modes": ALL_MODES,
        "mode_types": MODE_TYPES,
        "sigs": SIGS,
        "spot_sources": spot_sources,
        "alert_sources": alert_sources,
        "continents": CONTINENTS,
        "max_spot_age": MAX_SPOT_AGE,
        "spot_allowed": ALLOW_SPOTTING,
        "web-ui-options": WEB_UI_OPTIONS,
    }
# Convert objects to serialisable things. Used by JSON serialiser as a default when it encounters unserializable things.
# Just converts objects to dict. Try to avoid doing anything clever here when serialising spots, because we also need
# to receive spots without complex handling.
def serialize_everything(obj):
    """Fallback for json.dumps(default=...): represent an object by its attribute dictionary.

    Deliberately simple, because spots also need to be received without complex handling.

    :param obj: The object the JSON encoder could not serialise on its own.
    :return: The object's ``__dict__``.
    :raises TypeError: if the object has no ``__dict__``. TypeError is what the json module expects a
        ``default`` hook to raise for values it cannot handle.
    """
    try:
        return vars(obj)
    except TypeError:
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable") from None