First stab at submitting spots upstream. POTA is working, all other providers still to do. #95

This commit is contained in:
Ian Renton
2026-06-12 09:14:21 +01:00
parent 930d5357fe
commit 1afb407ca5
29 changed files with 640 additions and 92 deletions

View File

@@ -89,3 +89,11 @@ class GMA(HTTPSpotProvider):
logging.warning("Exception when looking up " + self.REF_INFO_URL_ROOT + source_spot[
"REF"] + ", ignoring this spot for now")
return new_spots
def can_submit_spot(self, sig):
return sig == "GMA"
def submit_spot(self, spot, credentials):
# TODO: Implement.
# Spotting to GMA is documented: https://www.cqgma.org/api/doc/apigma_spot.pdf We (or the user) need a GMA account, and to send the password in plaintext(!!)
raise NotImplementedError("GMA upstream spot submission is not yet implemented")

View File

@@ -64,3 +64,10 @@ class HEMA(HTTPSpotProvider):
# that for us.
new_spots.append(spot)
return new_spots
def can_submit_spot(self, sig):
return sig == "HEMA"
def submit_spot(self, spot, credentials):
# TODO: Implement. Spotting to HEMA is covered in the original email from the team.
raise NotImplementedError("HEMA upstream spot submission is not yet implemented")

View File

@@ -19,6 +19,7 @@ class HTTPSpotProvider(SpotProvider):
self._poll_interval = poll_interval
self._thread = None
self._stop_event = Event()
self._wakeup_event = Event()
def start(self):
# Fire off the polling thread. It will poll immediately on startup, then sleep for poll_interval between
@@ -29,11 +30,19 @@ class HTTPSpotProvider(SpotProvider):
def stop(self):
self._stop_event.set()
self._wakeup_event.set()
def force_poll(self):
"""Trigger an immediate poll without waiting for the normal interval."""
self._wakeup_event.set()
def _run(self):
while True:
self._wakeup_event.clear()
self._poll()
if self._stop_event.wait(timeout=self._poll_interval):
self._wakeup_event.wait(timeout=self._poll_interval)
if self._stop_event.is_set():
break
def _poll(self):

View File

@@ -3,7 +3,9 @@ import re
from datetime import datetime
import pytz
import requests
from core.constants import HTTP_HEADERS
from data.sig_ref import SIGRef
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
@@ -14,7 +16,9 @@ class ParksNPeaks(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://www.parksnpeaks.org/api/ALL"
SUBMIT_URL = "https://www.parksnpeaks.org/api/SPOT/"
SIOTA_LIST_URL = "https://www.silosontheair.com/data/silos.csv"
SUBMITTABLE_SIGS = ["POTA", "SOTA", "WWFF", "HEMA", "WOTA", "ZLOTA", "SIOTA", "KRMNPA"]
def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
@@ -62,3 +66,27 @@ class ParksNPeaks(HTTPSpotProvider):
# Add new spot to the list
new_spots.append(spot)
return new_spots
def can_submit_spot(self, sig):
return sig in self.SUBMITTABLE_SIGS
def submit_spot(self, spot, credentials):
# TODO test this works
user_id = credentials.get("user_id", "")
api_key = credentials.get("api_key", "")
if not user_id or not api_key:
raise ValueError("Parks N Peaks user ID and API key are required. Get yours from your Parks N Peaks account.")
sig_ref = spot.sig_refs[0].id if spot.sig_refs else ""
body = {
"actClass": spot.sig or "",
"actCallsign": spot.dx_call,
"actSite": sig_ref,
"mode": spot.mode or "",
"freq": str(spot.freq / 1000000.0),
"comments": spot.comment or "",
"userID": user_id,
"APIKey": api_key,
}
response = requests.post(self.SUBMIT_URL, json=body, headers=HTTP_HEADERS, timeout=(5, 30))
if not response.ok:
raise RuntimeError("Parks N Peaks API returned " + str(response.status_code) + ": " + response.text)

View File

@@ -1,7 +1,9 @@
from datetime import datetime
import pytz
import requests
from core.constants import HTTP_HEADERS
from data.sig_ref import SIGRef
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
@@ -12,6 +14,7 @@ class POTA(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://api.pota.app/spot/activator"
SUBMIT_URL = "https://api.pota.app/spot"
def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
@@ -40,3 +43,25 @@ class POTA(HTTPSpotProvider):
# that for us.
new_spots.append(spot)
return new_spots
def can_submit_spot(self, sig):
return sig == "POTA"
def submit_spot(self, spot, credentials):
sig_ref = spot.sig_refs[0].id if spot.sig_refs else None
if sig_ref:
body = {
"activator": spot.dx_call,
"spotter": spot.de_call,
"frequency": str(spot.freq / 1000.0),
"mode": spot.mode or "",
"reference": sig_ref,
"comments": spot.comment or "",
"source": "Spothole",
}
headers = {**HTTP_HEADERS, "Content-Type": "application/json"}
response = requests.post(self.SUBMIT_URL, json=body, headers=headers, timeout=(5, 30))
if not response.ok:
raise RuntimeError("POTA API returned " + str(response.status_code) + ": " + response.text)
else:
raise RuntimeError("Park reference is required for submitting POTA spots.")

View File

@@ -2,7 +2,7 @@ from datetime import datetime
import requests
from core.constants import HTTP_HEADERS
from core.constants import HTTP_HEADERS, SSB_SUB_MODES, DV_SUB_MODES
from data.sig_ref import SIGRef
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
@@ -20,6 +20,10 @@ class SOTA(HTTPSpotProvider):
# SOTA spots don't contain lat/lon, we need a separate lookup for that
SUMMIT_URL_ROOT = "https://api-db2.sota.org.uk/api/summits/"
SUBMIT_URL = "https://api-db2.sota.org.uk/api/spots"
VALID_MODES = ["AM", "CW", "Data", "DV", "FM", "SSB"]
def __init__(self, provider_config):
super().__init__(provider_config, self.EPOCH_URL, self.POLL_INTERVAL_SEC)
self._api_epoch = ""
@@ -56,3 +60,44 @@ class SOTA(HTTPSpotProvider):
# that for us.
new_spots.append(spot)
return new_spots
def can_submit_spot(self, sig):
return sig == "SOTA"
def submit_spot(self, spot, credentials):
# TODO test this method works
access_token = credentials.get("access_token", "")
id_token = credentials.get("id_token", "")
if not access_token or not id_token:
raise ValueError("SOTA API tokens are required. Please log into SOTA in order to spot to it.")
sig_ref = spot.sig_refs[0].id if spot.sig_refs else ""
if sig_ref:
# Split reference into association and summit codes
ref_split = sig_ref.split("/")
# Figure out a valid mode. Borrowed this from PoLo :)
# https://github.com/ham2k/app-polo/blob/main/src/extensions/activities/sota/SOTAPostSelfSpot.js
if spot.mode and spot.mode not in self.VALID_MODES:
if spot.mode in SSB_SUB_MODES:
spot.mode = "SSB"
elif spot.mode in DV_SUB_MODES:
spot.mode = "DV"
else:
spot.mode = "Data"
body = {
"activatorCallsign": spot.dx_call,
"associationCode": ref_split[0],
"summitCode": ref_split[1],
"frequency": str(spot.freq / 1000000.0),
"mode": spot.mode or "",
"posterCallsign": spot.de_call,
"comments": spot.comment or "",
"type": "TEST" # todo remove once testing complete
}
headers = {**HTTP_HEADERS, "Authorization": "bearer " + access_token, "id_token": id_token, "Content-Type": "application/json"}
response = requests.post(self.SUBMIT_URL, json=body, headers=headers, timeout=(5, 30))
if not response.ok:
raise RuntimeError("SOTA API returned " + str(response.status_code) + ": " + response.text)
else:
raise RuntimeError("Summit reference is required for submitting SOTA spots.")

View File

@@ -68,3 +68,20 @@ class SpotProvider:
"""Stop any threads and prepare for application shutdown"""
raise NotImplementedError("Subclasses must implement this method")
def can_submit_spot(self, sig):
"""Return True if this provider supports submitting spots upstream for the given SIG."""
return False
def submit_spot(self, spot, credentials):
"""Submit a spot upstream to this provider's API. credentials is a dict with provider-specific keys.
Raises an exception with a descriptive message on failure."""
raise NotImplementedError("This provider does not support spot submission")
def force_poll(self):
"""Trigger an immediate poll without waiting for the normal interval. Default implementation here does nothing
because not all spot providers have a polling mechanism. Providers that do should override this method."""
return

View File

@@ -77,3 +77,10 @@ class WOTA(HTTPSpotProvider):
except Exception as e:
logging.error("Exception parsing WOTA spot", e)
return new_spots
def can_submit_spot(self, sig):
return sig == "WOTA"
def submit_spot(self, spot, credentials):
# TODO Ask M5TEA if he's happy to share how this is done from his app
raise NotImplementedError("WOTA upstream spot submission is not yet implemented")

View File

@@ -41,3 +41,10 @@ class WWBOTA(SSESpotProvider):
# WWBOTA does support a special "Test" spot type, we need to avoid adding that.
return spot if source_spot["type"] != "Test" else None
def can_submit_spot(self, sig):
return sig == "WWBOTA"
def submit_spot(self, spot, credentials):
# TODO: Implement. WWBOTA API docs cover this: https://api.wwbota.org/#tag/Spots/operation/create_spot_spots__post
raise NotImplementedError("WWBOTA upstream spot submission is not yet implemented")

View File

@@ -38,3 +38,10 @@ class WWFF(HTTPSpotProvider):
# that for us.
new_spots.append(spot)
return new_spots
def can_submit_spot(self, sig):
return sig == "WWFF"
def submit_spot(self, spot, credentials):
# TODO: Implement. Spotting to WWFF should be possible, need to look up the Spotline docs or copy approach from PoLo. Either way I think we need an API key for the app (but maybe not for the user?)
raise NotImplementedError("WWFF upstream spot submission is not yet implemented")

View File

@@ -41,3 +41,10 @@ class ZLOTA(HTTPSpotProvider):
new_spots.append(spot)
return new_spots
def can_submit_spot(self, sig):
return sig == "ZLOTA"
def submit_spot(self, spot, credentials):
# TODO: Implement. Spotting to ZLOTA is supported via POST, see https://ontheair.nz/api
raise NotImplementedError("ZLOTA upstream spot submission is not yet implemented")