mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-06-23 21:25:12 +00:00
95 lines
4.4 KiB
Python
95 lines
4.4 KiB
Python
import logging
|
|
import re
|
|
from datetime import datetime
|
|
|
|
import pytz
|
|
import requests
|
|
|
|
from core.constants import HTTP_HEADERS
|
|
from data.sig_ref import SIGRef
|
|
from data.spot import Spot
|
|
from spotproviders.http_spot_provider import HTTPSpotProvider
|
|
|
|
|
|
class ParksNPeaks(HTTPSpotProvider):
    """Spot provider for Parks n Peaks.

    Polls the Parks n Peaks spots API and converts each returned entry into a Spot, and can submit
    new spots to the Parks n Peaks spot API using a user's ID and API key.
    """

    POLL_INTERVAL_SEC = 120
    SPOTS_URL = "https://www.parksnpeaks.org/api/ALL"
    SUBMIT_URL = "https://www.parksnpeaks.org/api/SPOT/"
    SIOTA_LIST_URL = "https://www.silosontheair.com/data/silos.csv"
    SUBMITTABLE_SIGS = ["POTA", "SOTA", "WWFF", "HEMA", "WOTA", "ZLOTA", "SIOTA", "KRMNPA"]

    # Programmes we already expect to see in PnP data. Anything else gets a warning logged so the
    # developer knows support needs adding.
    _KNOWN_SIGS = ("POTA", "SOTA", "WWFF", "SIOTA", "ZLOTA", "KRMNPA", "LLOTA")

    # Matches a spotter callsign embedded in a comment, e.g. "(de VK3ABC)". At least one character is
    # required so that "(de )" does not produce an empty callsign.
    _DE_CALL_PATTERN = re.compile(r"\(de ([A-Za-z0-9]+)\)")

    def __init__(self, provider_config):
        """Set up the provider to poll SPOTS_URL every POLL_INTERVAL_SEC seconds."""
        super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)

    def _http_response_to_spots(self, http_response):
        """Convert the response from the PnP spots API into a list of Spot objects.

        :param http_response: The HTTP response, whose JSON body is a list of spot dicts. A falsy
            response yields no spots. (NOTE(review): presumably a requests.Response, whose truthiness
            reflects a successful status code - confirm against the base class.)
        :return: List of Spots. Entries that cannot be parsed are logged and skipped so that one
            malformed spot does not cause the rest of the poll to be discarded.
        """
        new_spots = []
        if not http_response:
            return new_spots

        # Iterate through source data
        for source_spot in http_response.json():
            try:
                new_spots.append(self._source_spot_to_spot(source_spot))
            except (AttributeError, KeyError, TypeError, ValueError):
                logging.warning("Skipping PNP spot that could not be parsed: %s", source_spot, exc_info=True)
        return new_spots

    def _source_spot_to_spot(self, source_spot):
        """Convert a single spot dict from the PnP API into our Spot format.

        :param source_spot: One entry of the JSON list returned by the API.
        :return: The populated Spot.
        :raises KeyError: If a required field is missing.
        :raises ValueError: If the frequency or time cannot be parsed.
        :raises AttributeError, TypeError: If a field has an unexpected type (e.g. null).
        """
        # "actSpoter" is not our typo, that's how the API spells it
        spotter = source_spot["actSpoter"]
        # Seen PNP spots with empty frequency, and with comma-separated thousands digits
        raw_freq = source_spot["actFreq"]
        # The API time has no zone information in it; it is treated as UTC.
        spot_time = datetime.strptime(source_spot["actTime"], "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC)

        spot = Spot(source=self.name,
                    source_id=source_spot["actID"],
                    dx_call=source_spot["actCallsign"].upper(),
                    de_call=spotter.upper() if spotter else None,
                    freq=float(raw_freq.replace(",", "")) * 1000000 if raw_freq else None,
                    mode=source_spot["actMode"].upper(),
                    comment=source_spot["actComments"],
                    time=spot_time.timestamp())

        # Extract a de_call if it's in the comment but not in the "actSpoter" field. Upper-cased for
        # consistency with callsigns taken directly from the API fields.
        if not spot.de_call:
            de_match = self._DE_CALL_PATTERN.search(spot.comment or "")
            if de_match:
                spot.de_call = de_match.group(1).upper()

        # Record SIG information. Sometimes we get a "SIG" of "QRP", which we ignore as it's not a
        # programme with a defined set of references
        sig = (source_spot["actClass"] or "").upper()
        sig_ref = source_spot["actSiteID"]
        if sig and sig != "QRP" and sig_ref:
            spot.sig = sig
            sig_refs = [SIGRef(id=sig_ref, sig=sig)]
            spot.sig_refs = sig_refs

            # Free text location is not present in all spots, so only add it if it's set
            if source_spot.get("actLocation"):
                sig_refs[0].name = source_spot["actLocation"]

            # Log a warning for the developer if PnP gives us an unknown programme we've never seen before
            if sig not in self._KNOWN_SIGS:
                logging.warning("PNP spot found with sig %s, developer needs to add support for this!", sig)

        return spot

    def can_submit_spot(self, sig):
        """Return True if spots for the given SIG can be submitted to Parks n Peaks."""
        return sig in self.SUBMITTABLE_SIGS

    def submit_spot(self, spot, credentials):
        """Submit a spot to Parks n Peaks.

        :param spot: The Spot to submit. Its frequency must be set; the first SIG reference (if any)
            is sent as the site.
        :param credentials: Dict containing the user's "user_id" and "api_key".
        :raises ValueError: If the credentials are incomplete or the spot has no frequency.
        :raises RuntimeError: If the API responds with an unsuccessful status code.
        """
        # TODO test this works
        user_id = credentials.get("user_id", "")
        api_key = credentials.get("api_key", "")
        if not user_id or not api_key:
            raise ValueError(
                "Parks N Peaks user ID and API key are required. Get yours from your Parks N Peaks account.")
        # Spots parsed from PnP itself can have no frequency, so fail with a clear message rather than
        # a TypeError from the division below.
        if spot.freq is None:
            raise ValueError("A frequency is required to submit a spot to Parks N Peaks.")

        sig_ref = spot.sig_refs[0].id if spot.sig_refs else ""
        body = {
            "actClass": spot.sig or "",
            "actCallsign": spot.dx_call,
            "actSite": sig_ref,
            "mode": spot.mode or "",
            # Inverse of the scaling applied when parsing spots from the API
            "freq": str(spot.freq / 1000000.0),
            "comments": spot.comment or "",
            "userID": user_id,
            "APIKey": api_key,
        }
        response = requests.post(self.SUBMIT_URL, json=body, headers=HTTP_HEADERS, timeout=(5, 30))
        if not response.ok:
            raise RuntimeError("Parks N Peaks API returned " + str(response.status_code) + ": " + response.text)
|