Files
spothole/server/webserver.py

233 lines
11 KiB
Python

import json
import logging
from datetime import datetime, timedelta
from threading import Thread
import bottle
import pytz
from bottle import run, response, template
from core.config import MAX_SPOT_AGE, ALLOW_SPOTTING
from core.constants import BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS
from core.utils import serialize_everything
from data.spot import Spot
# Provides the public-facing web server.
class WebServer:
    # Query parameters that filter the spot list on the attribute of the same name. Each accepts a single value or a
    # comma-separated list of accepted values.
    _SPOT_CSV_FILTERS = ("source", "sig", "band", "mode", "mode_type", "dx_continent", "de_continent")
    # As above, but for the alert list.
    _ALERT_CSV_FILTERS = ("source", "sig", "dx_continent")

    # Constructor. "spots" and "alerts" are the stores the API reads from (new spots are also added to "spots"),
    # "status_data" is the dictionary served by the status API call, and "port" is the port to listen on. Routes
    # are registered on Bottle's default application here; the server itself does not start until start() is called.
    def __init__(self, spots, alerts, status_data, port):
        self.last_page_access_time = None
        self.last_api_access_time = None
        self.spots = spots
        self.alerts = alerts
        self.status_data = status_data
        self.port = port
        # Daemon thread, so a running web server never prevents the process from exiting.
        self.thread = Thread(target=self.run, daemon=True)
        self.status = "Starting"

        # Routes for API calls
        bottle.get("/api/spots")(lambda: self.serve_api(self.get_spot_list_with_filters()))
        bottle.get("/api/alerts")(lambda: self.serve_api(self.get_alert_list_with_filters()))
        bottle.get("/api/options")(lambda: self.serve_api(self.get_options()))
        bottle.get("/api/status")(lambda: self.serve_api(self.status_data))
        bottle.post("/api/spot")(self.accept_spot)
        # Routes for templated pages
        bottle.get("/")(lambda: self.serve_template('webpage_spots'))
        bottle.get("/alerts")(lambda: self.serve_template('webpage_alerts'))
        bottle.get("/about")(lambda: self.serve_template('webpage_about'))
        bottle.get("/apidocs")(lambda: self.serve_template('webpage_apidocs'))
        # Default route to serve from "webassets". Registered last so the specific routes above take priority.
        bottle.get("/<filepath:path>")(self.serve_static_file)

    # Start the web server
    def start(self):
        self.thread.start()

    # Run the web server itself. This blocks until the server is shut down, so it runs in a separate thread.
    def run(self):
        logging.info("Starting web server on port %s...", self.port)
        self.status = "Waiting"
        run(host='localhost', port=self.port)

    # Serve a JSON API endpoint. "data" is serialised to JSON and returned as the response body; the response is
    # marked as non-cacheable. Also records the API access time and sets the server status to "OK".
    def serve_api(self, data):
        self.last_api_access_time = datetime.now(pytz.UTC)
        self.status = "OK"
        response.content_type = 'application/json'
        response.set_header('Cache-Control', 'no-store')
        return json.dumps(data, default=serialize_everything)

    # Accept a spot. The spot is read as JSON from the "spot" query parameter and must contain at least "time" and
    # "dx_call". Returns a JSON string body: "OK" on success, or an "Error - ..." message with status 401 (spotting
    # disabled in config) or 422 (missing or invalid spot).
    def accept_spot(self):
        self.last_api_access_time = datetime.now(pytz.UTC)
        self.status = "OK"

        try:
            # Reject if not allowed
            if not ALLOW_SPOTTING:
                return self._json_error(401, "Error - this server does not allow new spots to be added via the API.")

            # Reject if no spot
            raw_spot = bottle.request.query.spot
            if not raw_spot:
                return self._json_error(422, "Error - no 'spot' parameter provided")

            # Read in the spot as JSON then convert to a Spot object. The JSON must be an object, because its members
            # are passed to the Spot constructor as keyword arguments.
            json_spot = json.loads(raw_spot)
            if not isinstance(json_spot, dict):
                return self._json_error(422, "Error - 'spot' must be a JSON object.")
            spot = Spot(**json_spot)

            # Reject if no timestamp or dx_call
            if not spot.time or not spot.dx_call:
                return self._json_error(422, "Error - 'time' and 'dx_call' must be provided as a minimum.")

            # Mark the spot as coming from the API (overriding anything the client supplied for these fields), infer
            # missing data, and add it to our database.
            spot.source = "API"
            spot.icon = "desktop"
            spot.infer_missing()
            self.spots.add(spot.id, spot, expire=MAX_SPOT_AGE)

            response.content_type = 'application/json'
            response.set_header('Cache-Control', 'no-store')
            return json.dumps("OK", default=serialize_everything)
        except Exception as e:
            # Top-level boundary for the request: anything that went wrong (bad JSON, unknown Spot fields, etc.) is
            # logged with its traceback and reported back to the client rather than becoming a server error.
            logging.exception(e)
            return self._json_error(422, "Error - " + str(e))

    # Serve a templated page. Also records the page access time and sets the server status to "OK".
    def serve_template(self, template_name):
        self.last_page_access_time = datetime.now(pytz.UTC)
        self.status = "OK"
        return template(template_name)

    # Serve general static files from "webassets" directory.
    def serve_static_file(self, filepath):
        return bottle.static_file(filepath, root="webassets")

    # Utility method to apply filters to the overall spot list and return only a subset. Enables query parameters in
    # the main "spots" GET call.
    #
    # We can filter by spot time with "since" (a UNIX timestamp in seconds UTC) or "max_age" (a number of seconds
    # before now), and by received time with "received_since" (a UNIX timestamp in seconds UTC).
    # We can also filter by source, sig, band, mode, mode_type, dx_continent and de_continent. Each of these accepts
    # a single value or a comma-separated list.
    # We can provide a "limit" number as well. Spots are always returned newest-first; "limit" limits to only the
    # most recent X spots.
    def get_spot_list_with_filters(self):
        # Get the query (and the right one, with Bottle magic. This is a MultiDict object)
        query = bottle.request.query

        # Create a shallow copy of the spot list, ordered by spot time. We'll then filter it accordingly. Entries
        # are stored with an expiry, so presumably a key can disappear between listing the keys and fetching the
        # value; any lookup that comes back as None is dropped so it can't break the filters below.
        spot_ids = list(self.spots.iterkeys())
        spots = [s for s in (self.spots.get(k) for k in spot_ids) if s is not None]
        spots = self._sort_by_time(spots, "time", newest_first=True)

        for k in query.keys():
            value = query.get(k)
            if k == "since":
                since = datetime.fromtimestamp(int(value), pytz.UTC)
                spots = self._filter_after(spots, "time", since)
            elif k == "max_age":
                since = datetime.now(pytz.UTC) - timedelta(seconds=int(value))
                spots = self._filter_after(spots, "time", since)
            elif k == "received_since":
                since = datetime.fromtimestamp(int(value), pytz.UTC)
                spots = self._filter_after(spots, "received_time", since)
            elif k in self._SPOT_CSV_FILTERS:
                spots = self._filter_by_csv(spots, k, value)

        # If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys.
        if "limit" in query.keys():
            spots = self._apply_limit(spots, query.get("limit"))
        return spots

    # Utility method to apply filters to the overall alert list and return only a subset. Enables query parameters in
    # the main "alerts" GET call.
    #
    # We can filter by received time with "received_since", which takes a UNIX timestamp in seconds UTC, and by
    # "max_duration", which excludes alerts whose end time is more than that long after their start time.
    # We can also filter by source, sig, and dx_continent. Each of these accepts a single value or a comma-separated
    # list.
    # We can provide a "limit" number as well. Alerts are always returned soonest-first; "limit" limits to only the
    # first X alerts.
    def get_alert_list_with_filters(self):
        # Get the query (and the right one, with Bottle magic. This is a MultiDict object)
        query = bottle.request.query

        # Create a shallow copy of the alert list, ordered by start time. We'll then filter it accordingly. Lookups
        # that come back as None are dropped, and we never want alerts that seem to be in the past.
        alert_ids = list(self.alerts.iterkeys())
        alerts = [a for a in (self.alerts.get(k) for k in alert_ids) if a is not None and not a.expired()]
        alerts = self._sort_by_time(alerts, "start_time")

        for k in query.keys():
            value = query.get(k)
            if k == "received_since":
                since = datetime.fromtimestamp(int(value), pytz.UTC)
                alerts = self._filter_after(alerts, "received_time", since)
            elif k == "max_duration":
                max_duration = int(value)
                alerts = [a for a in alerts if self._within_max_duration(a, max_duration)]
            elif k in self._ALERT_CSV_FILTERS:
                alerts = self._filter_by_csv(alerts, k, value)

        # If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys.
        if "limit" in query.keys():
            alerts = self._apply_limit(alerts, query.get("limit"))
        return alerts

    # Return all the "options" for various things that the server is aware of. This can be fetched with an API call.
    # The idea is that this will include most of the things that can be provided as queries to the main spots call,
    # and thus a client can use this data to configure its filter controls.
    def get_options(self):
        return {"bands": BANDS,
                "modes": ALL_MODES,
                "mode_types": MODE_TYPES,
                "sigs": SIGS,
                # Spot/alert sources are filtered for only ones that are enabled in config, no point letting the
                # user toggle things that aren't even available.
                "spot_sources": self._enabled_provider_names("spot_providers"),
                "alert_sources": self._enabled_provider_names("alert_providers"),
                "continents": CONTINENTS,
                "max_spot_age": MAX_SPOT_AGE}

    # Set up a JSON error response with the given HTTP status code, and return the message serialised as the body.
    @staticmethod
    def _json_error(status, message):
        response.content_type = 'application/json'
        response.status = status
        return json.dumps(message, default=serialize_everything)

    # Return the names of the providers listed under "key" in the status data that are marked as enabled.
    def _enabled_provider_names(self, key):
        return [p["name"] for p in self.status_data[key] if p["enabled"]]

    # Return "items" sorted by the attribute named "attr", oldest first unless "newest_first" is set. Items where the
    # attribute is missing (None or otherwise falsy) sort as if older than everything else. The key is a tuple so
    # that a missing value is never compared directly against a real time, which would raise a TypeError.
    @staticmethod
    def _sort_by_time(items, attr, newest_first=False):
        def sort_key(item):
            value = getattr(item, attr)
            return (True, value) if value else (False, 0)

        return sorted(items, key=sort_key, reverse=newest_first)

    # Return only those items whose attribute "attr" is strictly later than "threshold". Items where the attribute
    # is missing cannot match, so they are excluded rather than causing an error.
    @staticmethod
    def _filter_after(items, attr, threshold):
        return [i for i in items if (value := getattr(i, attr)) and value > threshold]

    # Return only those items whose attribute "attr" is one of the values in "raw", which is either a single value or
    # a comma-separated list.
    @staticmethod
    def _filter_by_csv(items, attr, raw):
        accepted = raw.split(",")
        return [i for i in items if getattr(i, attr) in accepted]

    # Return whether an alert passes the "max_duration" filter. If the start or end time is not provided, assume the
    # activation is "short", i.e. it always passes this check. The difference between two datetimes is a timedelta,
    # which can't be compared with a plain number, so it is converted to seconds first; numeric times are compared
    # as they are. NOTE(review): this treats max_duration as a number of seconds - confirm against the API docs.
    @staticmethod
    def _within_max_duration(alert, max_duration):
        if not alert.end_time or not alert.start_time:
            return True
        duration = alert.end_time - alert.start_time
        if isinstance(duration, timedelta):
            duration = duration.total_seconds()
        return duration <= max_duration

    # Return at most the first N items, where N is the integer in "raw_limit". A negative limit is treated as zero;
    # passed straight to a slice it would instead drop items from the end of the list.
    @staticmethod
    def _apply_limit(items, raw_limit):
        return items[:max(0, int(raw_limit))]