import json import logging from datetime import datetime, timedelta from threading import Thread import bottle import pytz from bottle import run, response, template from core.config import MAX_SPOT_AGE from core.constants import BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS from core.utils import serialize_everything # Provides the public-facing web server. class WebServer: # Constructor def __init__(self, spot_list, status_data, port): self.last_page_access_time = None self.last_api_access_time = None self.spot_list = spot_list self.status_data = status_data self.port = port self.thread = Thread(target=self.run) self.thread.daemon = True self.status = "Starting" # Routes for API calls bottle.get("/api/spots")(lambda: self.serve_api(self.get_spot_list_with_filters())) bottle.get("/api/options")(lambda: self.serve_api(self.get_options())) bottle.get("/api/status")(lambda: self.serve_api(self.status_data)) bottle.post("/api/spot")(lambda: self.accept_spot()) # Routes for templated pages bottle.get("/")(lambda: self.serve_template('webpage_home')) bottle.get("/about")(lambda: self.serve_template('webpage_about')) bottle.get("/apidocs")(lambda: self.serve_template('webpage_apidocs')) # Default route to serve from "webassets" bottle.get("/")(self.serve_static_file) # Start the web server def start(self): self.thread.start() # Run the web server itself. This blocks until the server is shut down, so it runs in a separate thread. def run(self): logging.info("Starting web server on port " + str(self.port) + "...") self.status = "Waiting" run(host='localhost', port=self.port) # Serve a JSON API endpoint def serve_api(self, data): self.last_api_access_time = datetime.now(pytz.UTC) self.status = "OK" response.content_type = 'application/json' response.set_header('Cache-Control', 'no-store') return json.dumps(data, default=serialize_everything) # Accept a spot def accept_spot(self): self.last_api_access_time = datetime.now(pytz.UTC) self.status = "OK" print(bottle.request.forms.spot) response.content_type = 'application/json' response.set_header('Cache-Control', 'no-store') return json.dumps("OK", default=serialize_everything) # Serve a templated page def serve_template(self, template_name): self.last_page_access_time = datetime.now(pytz.UTC) self.status = "OK" return template(template_name) # Serve general static files from "webassets" directory. def serve_static_file(self, filepath): return bottle.static_file(filepath, root="webassets") # Utility method to apply filters to the overall spot list and return only a subset. Enables query parameters in # the main "spots" GET call. def get_spot_list_with_filters(self): # Get the query (and the right one, with Bottle magic. This is a MultiDict object query = bottle.request.query # Create a shallow copy of the spot list, ordered by spot time. We'll then filter it accordingly. # We can filter by spot time and received time with "since" and "received_since", which take a UNIX timestamp # in seconds UTC. # We can also filter by source, sig, band, mode, dx_continent and de_continent. Each of these accepts a single # value or a comma-separated list. # We can provide a "limit" number as well. Spots are always returned newest-first; "limit" limits to only the # most recent X spots. spots = sorted(self.spot_list, key=lambda spot: spot.time, reverse=True) for k in query.keys(): match k: case "since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC) spots = [s for s in spots if s.time > since] case "max_age": max_age = int(query.get(k)) since = datetime.now(pytz.UTC) - timedelta(seconds=max_age) spots = [s for s in spots if s.time > since] case "received_since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC) spots = [s for s in spots if s.received_time > since] case "source": sources = query.get(k).split(",") spots = [s for s in spots if s.source in sources] case "sig": sigs = query.get(k).split(",") spots = [s for s in spots if s.sig in sigs] case "band": bands = query.get(k).split(",") spots = [s for s in spots if s.band in bands] case "mode": modes = query.get(k).split(",") spots = [s for s in spots if s.mode in modes] case "mode_type": mode_families = query.get(k).split(",") spots = [s for s in spots if s.mode_type in mode_families] case "dx_continent": dxconts = query.get(k).split(",") spots = [s for s in spots if s.dx_continent in dxconts] case "de_continent": deconts = query.get(k).split(",") spots = [s for s in spots if s.de_continent in deconts] # If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys. if "limit" in query.keys(): spots = spots[:int(query.get("limit"))] return spots # Return all the "options" for various things that the server is aware of. This can be fetched with an API call. # The idea is that this will include most of the things that can be provided as queries to the main spots call, # and thus a client can use this data to configure its filter controls. def get_options(self): return {"bands": BANDS, "modes": ALL_MODES, "mode_types": MODE_TYPES, "sigs": SIGS, # Sources are filtered for only ones that are enabled in config, no point letting the user toggle things that aren't even available. "sources": list(map(lambda p: p["name"], filter(lambda p: p["enabled"], self.status_data["providers"]))), "continents": CONTINENTS, "max_spot_age": MAX_SPOT_AGE}