import json import logging from datetime import datetime, timedelta from threading import Thread import bottle import pytz from bottle import run, request, response, template from core.config import MAX_SPOT_AGE, ALLOW_SPOTTING from core.constants import BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS, SOFTWARE_VERSION from data.spot import Spot # Provides the public-facing web server. class WebServer: # Constructor def __init__(self, spots, alerts, status_data, port): self.last_page_access_time = None self.last_api_access_time = None self.spots = spots self.alerts = alerts self.status_data = status_data self.port = port self.thread = Thread(target=self.run) self.thread.daemon = True self.status = "Starting" # Base template data bottle.BaseTemplate.defaults['software_version'] = SOFTWARE_VERSION # Routes for API calls bottle.get("/api/v1/spots")(lambda: self.serve_api(self.get_spot_list_with_filters())) bottle.get("/api/v1/alerts")(lambda: self.serve_api(self.get_alert_list_with_filters())) bottle.get("/api/v1/options")(lambda: self.serve_api(self.get_options())) bottle.get("/api/v1/status")(lambda: self.serve_api(self.status_data)) bottle.post("/api/v1/spot")(lambda: self.accept_spot()) # Routes for templated pages bottle.get("/")(lambda: self.serve_template('webpage_spots')) bottle.get("/map")(lambda: self.serve_template('webpage_map')) bottle.get("/alerts")(lambda: self.serve_template('webpage_alerts')) bottle.get("/status")(lambda: self.serve_template('webpage_status')) bottle.get("/about")(lambda: self.serve_template('webpage_about')) bottle.get("/apidocs")(lambda: self.serve_template('webpage_apidocs')) # Default route to serve from "webassets" bottle.get("/")(self.serve_static_file) # Start the web server def start(self): self.thread.start() # Run the web server itself. This blocks until the server is shut down, so it runs in a separate thread. def run(self): logging.info("Starting web server on port " + str(self.port) + "...") self.status = "Waiting" run(host='localhost', port=self.port) # Serve a JSON API endpoint def serve_api(self, data): self.last_api_access_time = datetime.now(pytz.UTC) self.status = "OK" response.content_type = 'application/json' response.set_header('Cache-Control', 'no-store') return json.dumps(data, default=serialize_everything) # Accept a spot def accept_spot(self): self.last_api_access_time = datetime.now(pytz.UTC) self.status = "OK" try: # Reject if not allowed if not ALLOW_SPOTTING: response.content_type = 'application/json' response.status = 401 return json.dumps("Error - this server does not allow new spots to be added via the API.", default=serialize_everything) # Reject if format not json if not request.get_header('Content-Type') or request.get_header('Content-Type') != "application/json": response.content_type = 'application/json' response.status = 415 return json.dumps("Error - request Content-Type must be application/json", default=serialize_everything) # Reject if request body is empty post_data = request.body.read() if not post_data: response.content_type = 'application/json' response.status = 422 return json.dumps("Error - request body is empty", default=serialize_everything) # Read in the request body as JSON then convert to a Spot object json_spot = json.loads(post_data) spot = Spot(**json_spot) # Reject if no timestamp or dx_call if not spot.time or not spot.dx_call: response.content_type = 'application/json' response.status = 422 return json.dumps("Error - 'time' and 'dx_call' must be provided as a minimum.", default=serialize_everything) # infer missing data, and add it to our database. spot.source = "API" spot.icon = "desktop" spot.infer_missing() self.spots.add(spot.id, spot, expire=MAX_SPOT_AGE) response.content_type = 'application/json' response.set_header('Cache-Control', 'no-store') return json.dumps("OK", default=serialize_everything) except Exception as e: logging.error(e) response.content_type = 'application/json' response.status = 500 return json.dumps("Error - " + str(e), default=serialize_everything) # Serve a templated page def serve_template(self, template_name): self.last_page_access_time = datetime.now(pytz.UTC) self.status = "OK" return template(template_name) # Serve general static files from "webassets" directory. def serve_static_file(self, filepath): return bottle.static_file(filepath, root="webassets") # Utility method to apply filters to the overall spot list and return only a subset. Enables query parameters in # the main "spots" GET call. def get_spot_list_with_filters(self): # Get the query (and the right one, with Bottle magic. This is a MultiDict object) query = bottle.request.query # Create a shallow copy of the spot list, ordered by spot time. We'll then filter it accordingly. # We can filter by spot time and received time with "since" and "received_since", which take a UNIX timestamp # in seconds UTC. # We can also filter by source, sig, band, mode, dx_continent and de_continent. Each of these accepts a single # value or a comma-separated list. # We can filter by comments, accepting a single string, where the API will only return spots where the comment # contains the provided value (case-insensitive). # We can "de-dupe" spots, so only the latest spot will be sent for each callsign. # We can provide a "limit" number as well. Spots are always returned newest-first; "limit" limits to only the # most recent X spots. spot_ids = list(self.spots.iterkeys()) spots = [] for k in spot_ids: s = self.spots.get(k) if s is not None: spots.append(s) spots = sorted(spots, key=lambda spot: (spot.time if spot and spot.time else 0), reverse=True) for k in query.keys(): match k: case "since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp() spots = [s for s in spots if s.time and s.time > since] case "max_age": max_age = int(query.get(k)) since = (datetime.now(pytz.UTC) - timedelta(seconds=max_age)).timestamp() spots = [s for s in spots if s.time and s.time > since] case "received_since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC).timestamp() spots = [s for s in spots if s.received_time and s.received_time > since] case "source": sources = query.get(k).split(",") spots = [s for s in spots if s.source and s.source in sources] case "sig": sigs = query.get(k).split(",") spots = [s for s in spots if s.sig and s.sig in sigs] case "band": bands = query.get(k).split(",") spots = [s for s in spots if s.band and s.band in bands] case "mode": modes = query.get(k).split(",") spots = [s for s in spots if s.mode in modes] case "mode_type": mode_families = query.get(k).split(",") spots = [s for s in spots if s.mode_type and s.mode_type in mode_families] case "dx_continent": dxconts = query.get(k).split(",") spots = [s for s in spots if s.dx_continent and s.dx_continent in dxconts] case "de_continent": deconts = query.get(k).split(",") spots = [s for s in spots if s.de_continent and s.de_continent in deconts] case "comment_includes": comment_includes = query.get(k).strip() spots = [s for s in spots if s.comment and comment_includes.upper() in s.comment.upper()] case "dedupe": # Ensure only the latest spot of each callsign is present in the list. This relies on the list being # in reverse time order, so if any future change allows re-ordering the list, that should be done # *after* this. dedupe = query.get(k).upper() == "TRUE" if dedupe: spots_temp = [] already_seen = [] for s in spots: if s.dx_call not in already_seen: spots_temp.append(s) already_seen.append(s.dx_call) spots = spots_temp # If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys. if "limit" in query.keys(): spots = spots[:int(query.get("limit"))] return spots # Utility method to apply filters to the overall alert list and return only a subset. Enables query parameters in # the main "alerts" GET call. def get_alert_list_with_filters(self): # Get the query (and the right one, with Bottle magic. This is a MultiDict object) query = bottle.request.query # Create a shallow copy of the alert list, ordered by start time. We'll then filter it accordingly. # We can filter by received time with "received_since", which take a UNIX timestamp in seconds UTC. # We can also filter by source, sig, and dx_continent. Each of these accepts a single # value or a comma-separated list. # We can provide a "limit" number as well. Alerts are always returned newest-first; "limit" limits to only the # most recent X alerts. alert_ids = list(self.alerts.iterkeys()) alerts = [] for k in alert_ids: a = self.alerts.get(k) if a is not None: alerts.append(a) # We never want alerts that seem to be in the past alerts = list(filter(lambda alert: not alert.expired(), alerts)) alerts = sorted(alerts, key=lambda alert: (alert.start_time if alert and alert.start_time else 0)) for k in query.keys(): match k: case "received_since": since = datetime.fromtimestamp(int(query.get(k)), pytz.UTC) alerts = [a for a in alerts if a.received_time and a.received_time > since] case "max_duration": max_duration = int(query.get(k)) # Check the duration if end_time is provided. If end_time is not provided, assume the activation is # "short", i.e. it always passes this check. If dxpeditions_skip_max_duration_check is true and # the alert is a dxpedition, it also always passes the check. dxpeditions_skip_check = bool(query.get( "dxpeditions_skip_max_duration_check")) if "dxpeditions_skip_max_duration_check" in query.keys() else False alerts = [a for a in alerts if (a.end_time and a.end_time - a.start_time <= max_duration) or not a.end_time or (dxpeditions_skip_check and a.is_dxpedition)] case "source": sources = query.get(k).split(",") alerts = [a for a in alerts if a.source and a.source in sources] case "sig": sigs = query.get(k).split(",") alerts = [a for a in alerts if a.sig and a.sig in sigs] case "dx_continent": dxconts = query.get(k).split(",") alerts = [a for a in alerts if a.dx_continent and a.dx_continent in dxconts] # If we have a "limit" parameter, we apply that last, regardless of where it appeared in the list of keys. if "limit" in query.keys(): alerts = alerts[:int(query.get("limit"))] return alerts # Return all the "options" for various things that the server is aware of. This can be fetched with an API call. # The idea is that this will include most of the things that can be provided as queries to the main spots call, # and thus a client can use this data to configure its filter controls. def get_options(self): options = {"bands": BANDS, "modes": ALL_MODES, "mode_types": MODE_TYPES, "sigs": SIGS, # Spot/alert sources are filtered for only ones that are enabled in config, no point letting the user toggle things that aren't even available. "spot_sources": list( map(lambda p: p["name"], filter(lambda p: p["enabled"], self.status_data["spot_providers"]))), "alert_sources": list( map(lambda p: p["name"], filter(lambda p: p["enabled"], self.status_data["alert_providers"]))), "continents": CONTINENTS, "max_spot_age": MAX_SPOT_AGE, "spot_allowed": ALLOW_SPOTTING} # If spotting to this server is enabled, "API" is another valid spot source even though it does not come from # one of our proviers. if ALLOW_SPOTTING: options["spot_sources"].append("API") return options # Convert objects to serialisable things. Used by JSON serialiser as a default when it encounters unserializable things. # Just converts objects to dict. Try to avoid doing anything clever here when serialising spots, because we also need # to receive spots without complex handling. def serialize_everything(obj): return obj.__dict__