Files
spothole/server/handlers/api/addspot.py
2026-06-13 08:17:38 +01:00

232 lines
12 KiB
Python

import json
import logging
import re
import threading
from datetime import datetime
import pytz
import requests
import tornado
from core.config import ALLOW_SPOTTING, ALLOW_UPSTREAM_SPOTTING, MAX_SPOT_AGE, RECAPTCHA_SECRET_KEY
from core.constants import UNKNOWN_BAND
from core.lookup_helper import infer_band_from_freq
from core.prometheus_metrics_handler import api_requests_counter
from core.sig_utils import get_ref_regex_for_sig
from core.utils import serialize_everything
from data.sig_ref import SIGRef
from data.spot import Spot
# Google reCAPTCHA endpoint that APISpotHandler._verify_recaptcha POSTs the server secret and the client's
# CAPTCHA token to in order to check the token.
RECAPTCHA_VERIFY_URL = "https://www.google.com/recaptcha/api/siteverify"
class APISpotHandler(tornado.web.RequestHandler):
    """API request handler for /api/v1/spot (POST)

    Accepts a JSON object describing a spot, validates it, optionally forwards it to an upstream provider, and
    otherwise stores it in the local spot cache. Every response body is a JSON-encoded string: "OK", a
    "Warning - ..." message (the spot was accepted but upstream submission did not happen), or an "Error - ..."
    message accompanied by a 4xx/5xx status.
    """

    # Characters we accept in a callsign. Used with fullmatch() so a trailing newline is not let through.
    _CALLSIGN_REGEX = re.compile(r"^[A-Za-z0-9/\-]*$")
    # Maidenhead locators of 4, 6, 8 or 10 characters. Matched against the upper-cased input.
    _GRID_REGEX = re.compile(
        r"^([A-R]{2}[0-9]{2}[A-X]{2}[0-9]{2}[A-X]{2}|[A-R]{2}[0-9]{2}[A-X]{2}[0-9]{2}|[A-R]{2}[0-9]{2}[A-X]{2}|[A-R]{2}[0-9]{2})$")

    def initialize(self, spots, web_server_metrics, spot_providers=None):
        """Store the shared objects handed over by the web server.

        :param spots: spot cache; must offer add(key, value, expire=...)
        :param web_server_metrics: mutable dict of server metrics updated on every request
        :param spot_providers: optional list of providers that may support upstream submission
        """
        self._spots = spots
        self._web_server_metrics = web_server_metrics
        self._spot_providers = spot_providers or []

    def post(self):
        """Handle a spot submission. Any unexpected failure is logged and reported as HTTP 500."""
        try:
            self._process_submission()
        except Exception:
            # Top-level boundary: keep the traceback in the log but never leak details to the client.
            logging.exception("Unhandled error while processing a spot submission")
            self._respond(500, "Error - an internal server error occurred.")

    def _respond(self, status, message):
        """Write a JSON-encoded message with the given HTTP status and the standard no-cache JSON headers."""
        self.set_status(status)
        self.write(json.dumps(message, default=serialize_everything))
        self.set_header("Cache-Control", "no-store")
        self.set_header("Content-Type", "application/json")

    def _process_submission(self):
        """Run the full request pipeline: gate checks, parsing, CAPTCHA, validation, upstream submit, store."""
        # Metrics
        self._web_server_metrics["last_api_access_time"] = datetime.now(pytz.UTC)
        self._web_server_metrics["api_access_counter"] += 1
        self._web_server_metrics["status"] = "OK"
        api_requests_counter.inc()

        # Reject if not allowed
        if not ALLOW_SPOTTING:
            self._respond(401, "Error - this server does not allow new spots to be added via the API.")
            return

        # Reject if format not json
        if not self.request.headers.get('Content-Type', '').startswith("application/json"):
            self._respond(415, "Error - request Content-Type must be application/json")
            return

        # Reject if request body is empty
        post_data = self.request.body
        if not post_data:
            self._respond(422, "Error - request body is empty")
            return

        # Read in the request body as JSON. Malformed or wrongly-shaped input is the client's fault, so report it
        # as 422 rather than letting it surface as an internal server error.
        try:
            json_body = tornado.escape.json_decode(post_data)
        except ValueError:
            self._respond(422, "Error - request body is not valid JSON.")
            return
        if not isinstance(json_body, dict):
            self._respond(422, "Error - request body must be a JSON object.")
            return

        # Extract fields relating to how we handle the spot, such as CAPTCHA and upstream submission. Remove these
        # from the data so they don't accidentally end up in the spot object itself.
        # todo: Better way of separating these out. Possible without API change or not?
        submit_upstream = json_body.pop("submit_upstream", False)
        upstream_provider_name = json_body.pop("upstream_provider", None)
        upstream_credentials = json_body.pop("upstream_credentials", {})
        captcha_token = json_body.pop("captcha_token", None)

        # Verify CAPTCHA if required
        if RECAPTCHA_SECRET_KEY:
            if not captcha_token:
                self._respond(422, "Error - CAPTCHA token is required for spot submission.")
                return
            if not self._verify_recaptcha(captcha_token):
                self._respond(422, "Error - CAPTCHA verification failed.")
                return

        # Convert remaining fields to a Spot object. A TypeError here means the client sent fields the Spot or
        # SIGRef constructors do not accept (or sig_refs of the wrong shape).
        try:
            spot = self._build_spot(json_body)
        except TypeError as e:
            logging.warning("Rejected spot submission with unusable fields: " + str(e))
            self._respond(422, "Error - request body contains fields that could not be converted into a spot.")
            return

        # Reject anything that fails validation
        validation_error = self._validate_spot(spot)
        if validation_error:
            self._respond(422, validation_error)
            return

        # Reject upstream submission if not permitted
        if submit_upstream and not ALLOW_UPSTREAM_SPOTTING:
            self._respond(403, "Error - this server does not allow upstream spot submission.")
            return

        # Submit upstream if requested
        submitted_upstream, upstream_warning = self._submit_upstream(spot, submit_upstream, upstream_provider_name,
                                                                     upstream_credentials)

        # If we successfully submitted the spot upstream, don't add it direct to Spothole, otherwise it will be a
        # duplicate with what immediately comes back from the API. In every other case (not asked to send it
        # upstream, or asked but it could not be sent) add it to our database so the spot is never lost.
        if not submitted_upstream:
            spot.infer_missing()
            self._spots.add(spot.id, spot, expire=MAX_SPOT_AGE)

        if upstream_warning:
            self._respond(201, "Warning - " + upstream_warning)
        else:
            self._respond(201, "OK")

    def _build_spot(self, json_body):
        """Create a Spot from the decoded request fields, converting any sig_refs dicts into SIGRef objects.

        :raises TypeError: if the fields are not accepted by the Spot/SIGRef constructors or sig_refs entries are
            not JSON objects.
        """
        spot = Spot(**json_body)
        # Converting to a spot object this way won't have coped with sig_ref objects, so fix that. Each reference
        # inherits the spot's own SIG.
        if spot.sig and spot.sig_refs:
            spot.sig_refs = [
                json.loads(json.dumps({**ref_fields, "sig": spot.sig}), object_hook=lambda d: SIGRef(**d))
                for ref_fields in spot.sig_refs
            ]
        return spot

    def _validate_spot(self, spot):
        """Check the spot's content. Returns an "Error - ..." message for the first problem found, else None."""
        # Reject if no timestamp, frequency, dx_call or de_call
        if not spot.time or not spot.dx_call or not spot.freq or not spot.de_call:
            return "Error - 'time', 'dx_call', 'freq' and 'de_call' must be provided as a minimum."

        # Reject invalid-looking callsigns (including non-string values, which cannot be matched at all)
        for callsign in (spot.dx_call, spot.de_call):
            if not isinstance(callsign, str) or not self._CALLSIGN_REGEX.fullmatch(callsign):
                return "Error - '" + str(callsign) + "' does not look like a valid callsign."

        # Reject if frequency not in a known band
        if infer_band_from_freq(spot.freq) == UNKNOWN_BAND:
            return "Error - Frequency of " + str(spot.freq / 1000.0) + "kHz is not in a known band."

        # Reject if grid formatting incorrect
        if spot.dx_grid and (not isinstance(spot.dx_grid, str)
                             or not self._GRID_REGEX.fullmatch(spot.dx_grid.upper())):
            return "Error - '" + str(spot.dx_grid) + "' does not look like a valid Maidenhead grid."

        # Reject if sig_ref format incorrect for sig. Only the first reference is checked, and only when a
        # reference regex is available for this SIG.
        if spot.sig and spot.sig_refs:
            first_ref_id = spot.sig_refs[0].id
            ref_regex = get_ref_regex_for_sig(spot.sig)
            if first_ref_id and ref_regex and not re.match(ref_regex, first_ref_id):
                return "Error - '" + first_ref_id + "' does not look like a valid reference for " + spot.sig + "."

        return None

    def _submit_upstream(self, spot, submit_upstream, upstream_provider_name, upstream_credentials):
        """Try to send the spot to the requested upstream provider.

        :returns: a tuple (submitted, warning). 'submitted' is True only if the provider accepted the spot;
            'warning' is a human-readable explanation whenever submission was requested but did not happen,
            otherwise None.
        """
        if not submit_upstream:
            return False, None

        # Submission was requested but we lack what we need to route it. Tell the caller rather than silently
        # dropping the spot.
        if not upstream_provider_name or not spot.sig:
            return False, ("Spot was saved locally but not submitted upstream, because 'upstream_provider' and "
                           "'sig' must both be provided for upstream submission.")

        provider = self._find_provider(upstream_provider_name, spot.sig)
        if not provider:
            return False, ("No enabled provider named '" + str(upstream_provider_name)
                           + "' supports upstream submission for " + str(spot.sig) + " spots.")

        try:
            # Submit spot to the upstream provider
            provider.submit_spot(spot, upstream_credentials)
            # Trigger a re-poll after 1 second so the spot appears quickly
            threading.Timer(1.0, lambda: provider.force_poll()).start()
        except NotImplementedError as e:
            # Fall back to a generic message so an empty exception text cannot be mistaken for success
            return False, (str(e) or "Spot was saved locally because " + str(upstream_provider_name)
                           + " does not support upstream submission.")
        except Exception as e:
            logging.warning("Failed to submit spot upstream to " + str(upstream_provider_name) + ": " + str(e))
            return False, ("Spot was saved locally but upstream submission to " + str(upstream_provider_name)
                           + " failed: " + str(e))
        return True, None

    def _find_provider(self, provider_name, sig):
        """Find an enabled provider by name that can submit spots for the given SIG."""
        return next((p for p in self._spot_providers
                     if p.enabled and p.name == provider_name and p.can_submit_spot(sig)), None)

    def _verify_recaptcha(self, token):
        """Verify a Google reCAPTCHA v2 token. Returns True if valid."""
        try:
            response = requests.post(RECAPTCHA_VERIFY_URL,
                                     data={"secret": RECAPTCHA_SECRET_KEY, "response": token},
                                     timeout=(5, 10))
            return bool(response.ok and response.json().get("success", False))
        except Exception as e:
            # Fail closed: if we cannot reach or understand the verification service, treat the token as invalid.
            logging.warning("reCAPTCHA verification request failed: " + str(e))
            return False