Refactor looking up SIG reference details into a common location, taking it out of the individual spot providers. This means we can now look up references properly from Cluster spot comments, etc. Closes #74 as there is no longer any duplication of these lookups. Works towards #54 as sig_refs now specify their sig internally.

This commit is contained in:
Ian Renton
2025-11-02 15:45:19 +00:00
parent 28010a68ae
commit 286ff66721
22 changed files with 192 additions and 233 deletions

View File

@@ -38,9 +38,7 @@ class BOTA(HTTPAlertProvider):
# Convert to our alert format # Convert to our alert format
alert = Alert(source=self.name, alert = Alert(source=self.name,
dx_calls=[dx_call], dx_calls=[dx_call],
sig="BOTA", sig_refs=[SIGRef(id=ref_name, sig="BOTA", name=ref_name, url="https://www.beachesontheair.com/beaches/" + ref_name.lower().replace(" ", "-"))],
sig_refs=[SIGRef(id=ref_name, name=ref_name, url="https://www.beachesontheair.com/beaches/" + ref_name.lower().replace(" ", "-"))],
icon=get_icon_for_sig("BOTA"),
start_time=date_time.timestamp(), start_time=date_time.timestamp(),
is_dxpedition=False) is_dxpedition=False)

View File

@@ -38,9 +38,7 @@ class ParksNPeaks(HTTPAlertProvider):
dx_calls=[source_alert["CallSign"].upper()], dx_calls=[source_alert["CallSign"].upper()],
freqs_modes=source_alert["Freq"] + " " + source_alert["MODE"], freqs_modes=source_alert["Freq"] + " " + source_alert["MODE"],
comment=source_alert["Comments"], comment=source_alert["Comments"],
sig=source_alert["Class"], sig_refs=[SIGRef(id=sig_ref, sig=source_alert["Class"], name=sig_ref_name)],
sig_refs=[SIGRef(id=sig_ref, name=sig_ref_name)],
icon=get_icon_for_sig(source_alert["Class"]),
start_time=start_time, start_time=start_time,
is_dxpedition=False) is_dxpedition=False)

View File

@@ -26,9 +26,7 @@ class POTA(HTTPAlertProvider):
dx_calls=[source_alert["activator"].upper()], dx_calls=[source_alert["activator"].upper()],
freqs_modes=source_alert["frequencies"], freqs_modes=source_alert["frequencies"],
comment=source_alert["comments"], comment=source_alert["comments"],
sig="POTA", sig_refs=[SIGRef(id=source_alert["reference"], sig="POTA", name=source_alert["name"], url="https://pota.app/#/park/" + source_alert["reference"])],
sig_refs=[SIGRef(id=source_alert["reference"], name=source_alert["name"], url="https://pota.app/#/park/" + source_alert["reference"])],
icon=get_icon_for_sig("POTA"),
start_time=datetime.strptime(source_alert["startDate"] + source_alert["startTime"], start_time=datetime.strptime(source_alert["startDate"] + source_alert["startTime"],
"%Y-%m-%d%H:%M").replace(tzinfo=pytz.UTC).timestamp(), "%Y-%m-%d%H:%M").replace(tzinfo=pytz.UTC).timestamp(),
end_time=datetime.strptime(source_alert["endDate"] + source_alert["endTime"], end_time=datetime.strptime(source_alert["endDate"] + source_alert["endTime"],

View File

@@ -27,9 +27,7 @@ class SOTA(HTTPAlertProvider):
dx_names=[source_alert["activatorName"].upper()], dx_names=[source_alert["activatorName"].upper()],
freqs_modes=source_alert["frequency"], freqs_modes=source_alert["frequency"],
comment=source_alert["comments"], comment=source_alert["comments"],
sig="SOTA", sig_refs=[SIGRef(id=source_alert["associationCode"] + "/" + source_alert["summitCode"], sig="SOTA", name=source_alert["summitDetails"])],
sig_refs=[SIGRef(id=source_alert["associationCode"] + "/" + source_alert["summitCode"], name=source_alert["summitDetails"], url="https://www.sotadata.org.uk/en/summit/" + source_alert["summitCode"])],
icon=get_icon_for_sig("SOTA"),
start_time=datetime.strptime(source_alert["dateActivated"], start_time=datetime.strptime(source_alert["dateActivated"],
"%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.UTC).timestamp(), "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.UTC).timestamp(),
is_dxpedition=False) is_dxpedition=False)

View File

@@ -54,9 +54,7 @@ class WOTA(HTTPAlertProvider):
dx_calls=[dx_call], dx_calls=[dx_call],
freqs_modes=freqs_modes, freqs_modes=freqs_modes,
comment=comment, comment=comment,
sig="WOTA", sig_refs=[SIGRef(id=ref, sig="WOTA", name=ref_name)] if ref else [],
sig_refs=[SIGRef(id=ref, name=ref_name, url="https://www.wota.org.uk/MM_" + ref)] if ref else [],
icon=get_icon_for_sig("WOTA"),
start_time=time.timestamp()) start_time=time.timestamp())
# Add to our list. # Add to our list.

View File

@@ -26,9 +26,7 @@ class WWFF(HTTPAlertProvider):
dx_calls=[source_alert["activator_call"].upper()], dx_calls=[source_alert["activator_call"].upper()],
freqs_modes=source_alert["band"] + " " + source_alert["mode"], freqs_modes=source_alert["band"] + " " + source_alert["mode"],
comment=source_alert["remarks"], comment=source_alert["remarks"],
sig="WWFF", sig_refs=[SIGRef(id=source_alert["reference"], sig="WWFF")],
sig_refs=[SIGRef(id=source_alert["reference"], url="https://wwff.co/directory/?showRef=" + source_alert["reference"])],
icon=get_icon_for_sig("WWFF"),
start_time=datetime.strptime(source_alert["utc_start"], start_time=datetime.strptime(source_alert["utc_start"],
"%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC).timestamp(), "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC).timestamp(),
end_time=datetime.strptime(source_alert["utc_end"], end_time=datetime.strptime(source_alert["utc_end"],

View File

@@ -5,6 +5,7 @@ from pyhamtools.locator import latlong_to_locator
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.constants import SIGS, HTTP_HEADERS from core.constants import SIGS, HTTP_HEADERS
from core.geo_utils import wab_wai_square_to_lat_lon from core.geo_utils import wab_wai_square_to_lat_lon
from data.sig_ref import SIGRef
# Utility function to get the icon for a named SIG. If no match is found, the "circle-question" icon will be returned. # Utility function to get the icon for a named SIG. If no match is found, the "circle-question" icon will be returned.
@@ -14,6 +15,7 @@ def get_icon_for_sig(sig):
return s.icon return s.icon
return "circle-question" return "circle-question"
# Utility function to get the regex string for a SIG reference for a named SIG. If no match is found, None will be returned. # Utility function to get the regex string for a SIG reference for a named SIG. If no match is found, None will be returned.
def get_ref_regex_for_sig(sig): def get_ref_regex_for_sig(sig):
for s in SIGS: for s in SIGS:
@@ -21,72 +23,92 @@ def get_ref_regex_for_sig(sig):
return s.ref_regex return s.ref_regex
return None return None
# Look up details of a SIG reference (e.g. POTA park) such as name, lat/lon, and grid. # Look up details of a SIG reference (e.g. POTA park) such as name, lat/lon, and grid.
# Note there is currently no support for KRMNPA location lookup, see issue #61.
def get_sig_ref_info(sig, sig_ref_id): def get_sig_ref_info(sig, sig_ref_id):
sig_ref = SIGRef(id=sig_ref_id, sig=sig)
if sig.upper() == "POTA": if sig.upper() == "POTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.pota.app/park/" + sig_ref_id, headers=HTTP_HEADERS).json() data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.pota.app/park/" + sig_ref_id, headers=HTTP_HEADERS).json()
if data: if data:
return {"name": data["name"] if "name" in data else None, fullname = data["name"] if "name" in data else None
"grid": data["grid6"] if "grid6" in data else None, if fullname and "parktypeDesc" in data and data["parktypeDesc"] != "":
"latitude": data["latitude"] if "latitude" in data else None, fullname = fullname + " " + data["parktypeDesc"]
"longitude": data["longitude"] if "longitude" in data else None} sig_ref.name = fullname
sig_ref.url = "https://pota.app/#/park/" + sig_ref_id
sig_ref.grid = data["grid6"] if "grid6" in data else None
sig_ref.latitude = data["latitude"] if "latitude" in data else None
sig_ref.longitude = data["longitude"] if "longitude" in data else None
elif sig.upper() == "SOTA": elif sig.upper() == "SOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api-db2.sota.org.uk/api/summits/" + sig_ref_id, headers=HTTP_HEADERS).json() data = SEMI_STATIC_URL_DATA_CACHE.get("https://api-db2.sota.org.uk/api/summits/" + sig_ref_id,
headers=HTTP_HEADERS).json()
if data: if data:
return {"name": data["name"] if "name" in data else None, sig_ref.name = data["name"] if "name" in data else None
"grid": data["locator"] if "locator" in data else None, sig_ref.url = "https://www.sotadata.org.uk/en/summit/" + sig_ref_id
"latitude": data["latitude"] if "latitude" in data else None, sig_ref.grid = data["locator"] if "locator" in data else None
"longitude": data["longitude"] if "longitude" in data else None} sig_ref.latitude = data["latitude"] if "latitude" in data else None
sig_ref.longitude = data["longitude"] if "longitude" in data else None
elif sig.upper() == "WWBOTA": elif sig.upper() == "WWBOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.wwbota.org/bunkers/" + sig_ref_id, headers=HTTP_HEADERS).json() data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.wwbota.org/bunkers/" + sig_ref_id,
headers=HTTP_HEADERS).json()
if data: if data:
return {"name": data["name"] if "name" in data else None, sig_ref.name = data["name"] if "name" in data else None
"grid": data["locator"] if "locator" in data else None, sig_ref.url = "https://bunkerwiki.org/?s=" + sig_ref_id if sig_ref_id.startswith("B/G") else None
"latitude": data["lat"] if "lat" in data else None, sig_ref.grid = data["locator"] if "locator" in data else None
"longitude": data["long"] if "long" in data else None} sig_ref.latitude = data["lat"] if "lat" in data else None
sig_ref.longitude = data["long"] if "long" in data else None
elif sig.upper() == "GMA" or sig.upper() == "ARLHS" or sig.upper() == "ILLW" or sig.upper() == "WCA" or sig.upper() == "MOTA" or sig.upper() == "IOTA": elif sig.upper() == "GMA" or sig.upper() == "ARLHS" or sig.upper() == "ILLW" or sig.upper() == "WCA" or sig.upper() == "MOTA" or sig.upper() == "IOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.cqgma.org/api/ref/?" + sig_ref_id, headers=HTTP_HEADERS).json() data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.cqgma.org/api/ref/?" + sig_ref_id,
headers=HTTP_HEADERS).json()
if data: if data:
return {"name": data["name"] if "name" in data else None, sig_ref.name = data["name"] if "name" in data else None
"grid": data["locator"] if "locator" in data else None, sig_ref.url = "https://www.cqgma.org/zinfo.php?ref=" + sig_ref_id
"latitude": data["latitude"] if "latitude" in data else None, sig_ref.grid = data["locator"] if "locator" in data else None
"longitude": data["longitude"] if "longitude" in data else None} sig_ref.latitude = data["latitude"] if "latitude" in data else None
sig_ref.longitude = data["longitude"] if "longitude" in data else None
elif sig.upper() == "WWFF":
sig_ref.url = "https://wwff.co/directory/?showRef=" + sig_ref_id
elif sig.upper() == "SIOTA": elif sig.upper() == "SIOTA":
siota_csv_data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.silosontheair.com/data/silos.csv", headers=HTTP_HEADERS) siota_csv_data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.silosontheair.com/data/silos.csv",
headers=HTTP_HEADERS)
siota_dr = csv.DictReader(siota_csv_data.content.decode().splitlines()) siota_dr = csv.DictReader(siota_csv_data.content.decode().splitlines())
for row in siota_dr: for row in siota_dr:
if row["SILO_CODE"] == sig_ref_id: if row["SILO_CODE"] == sig_ref_id:
return {"name": row["NAME"] if "NAME" in row else None, sig_ref.name = row["NAME"] if "NAME" in row else None
"grid": row["LOCATOR"] if "LOCATOR" in row else None, sig_ref.grid = row["LOCATOR"] if "LOCATOR" in row else None
"latitude": float(row["LAT"]) if "LAT" in row else None, sig_ref.latitude = float(row["LAT"]) if "LAT" in row else None
"longitude": float(row["LNG"]) if "LNG" in row else None} sig_ref.longitude = float(row["LNG"]) if "LNG" in row else None
elif sig.upper() == "WOTA": elif sig.upper() == "WOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.wota.org.uk/mapping/data/summits.json", headers=HTTP_HEADERS).json() data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.wota.org.uk/mapping/data/summits.json",
headers=HTTP_HEADERS).json()
if data: if data:
for feature in data["features"]: for feature in data["features"]:
if feature["properties"]["wotaId"] == sig_ref_id: if feature["properties"]["wotaId"] == sig_ref_id:
return {"name": feature["properties"]["title"], sig_ref.name = feature["properties"]["title"]
"grid": feature["properties"]["qthLocator"], sig_ref.url = "https://www.wota.org.uk/MM_" + sig_ref_id
"latitude": feature["geometry"]["coordinates"][1], sig_ref.grid = feature["properties"]["qthLocator"]
"longitude": feature["geometry"]["coordinates"][0]} sig_ref.latitude = feature["geometry"]["coordinates"][1]
sig_ref.longitude = feature["geometry"]["coordinates"][0]
elif sig.upper() == "ZLOTA": elif sig.upper() == "ZLOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://ontheair.nz/assets/assets.json", headers=HTTP_HEADERS).json() data = SEMI_STATIC_URL_DATA_CACHE.get("https://ontheair.nz/assets/assets.json", headers=HTTP_HEADERS).json()
if data: if data:
for asset in data: for asset in data:
if asset["code"] == sig_ref_id: if asset["code"] == sig_ref_id:
return {"name": asset["name"], sig_ref.name = asset["name"]
"grid": latlong_to_locator(asset["y"], asset["x"], 6), sig_ref.url = "https://ontheair.nz/assets/ZLI_OT-030" + sig_ref_id.replace("/", "_")
"latitude": asset["y"], sig_ref.grid = latlong_to_locator(asset["y"], asset["x"], 6)
"longitude": asset["x"]} sig_ref.latitude = asset["y"]
sig_ref.longitude = asset["x"]
elif sig.upper() == "WAB" or sig.upper() == "WAI": elif sig.upper() == "WAB" or sig.upper() == "WAI":
ll = wab_wai_square_to_lat_lon(sig_ref_id) ll = wab_wai_square_to_lat_lon(sig_ref_id)
if ll: if ll:
return {"name": sig_ref_id, sig_ref.name = sig_ref_id
"grid": latlong_to_locator(ll[0], ll[1], 6), sig_ref.grid = latlong_to_locator(ll[0], ll[1], 6)
"latitude": ll[0], sig_ref.latitude = ll[0]
"longitude": ll[1]} sig_ref.longitude = ll[1]
return None return sig_ref
# Regex matching any SIG # Regex matching any SIG

View File

@@ -99,6 +99,11 @@ class Alert:
if self.dx_dxcc_id and self.dx_dxcc_id in DXCC_FLAGS and not self.dx_flag: if self.dx_dxcc_id and self.dx_dxcc_id in DXCC_FLAGS and not self.dx_flag:
self.dx_flag = DXCC_FLAGS[self.dx_dxcc_id] self.dx_flag = DXCC_FLAGS[self.dx_dxcc_id]
# If the spot itself doesn't have a SIG yet, but we have at least one SIG reference, take that reference's SIG
# and apply it to the whole spot.
if self.sig_refs and len(self.sig_refs) > 0 and not self.sig:
self.sig = self.sig_refs[0].sig
# Icon from SIG # Icon from SIG
if self.sig and not self.icon: if self.sig and not self.icon:
self.icon = get_icon_for_sig(self.sig) self.icon = get_icon_for_sig(self.sig)

View File

@@ -6,7 +6,15 @@ from dataclasses import dataclass
class SIGRef: class SIGRef:
# Reference ID, e.g. "GB-0001". # Reference ID, e.g. "GB-0001".
id: str id: str
# SIG that this reference is in, e.g. "POTA".
sig: str
# Name of the reference, e.g. "Null Country Park", if known. # Name of the reference, e.g. "Null Country Park", if known.
name: str = None name: str = None
# URL to look up more information about the reference, if known. # URL to look up more information about the reference, if known.
url: str = None url: str = None
# Latitude of the reference, if known.
latitude: float = None
# Longitude of the reference, if known.
longitude: float = None
# Maidenhead grid reference of the reference, if known.
grid: str = None

View File

@@ -10,9 +10,8 @@ import pytz
from pyhamtools.locator import locator_to_latlong, latlong_to_locator from pyhamtools.locator import locator_to_latlong, latlong_to_locator
from core.constants import DXCC_FLAGS from core.constants import DXCC_FLAGS
from core.geo_utils import wab_wai_square_to_lat_lon
from core.lookup_helper import lookup_helper from core.lookup_helper import lookup_helper
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig, get_sig_ref_info
# Data class that defines a spot. # Data class that defines a spot.
@@ -232,11 +231,40 @@ class Spot:
if self.mode and not self.mode_type: if self.mode and not self.mode_type:
self.mode_type = lookup_helper.infer_mode_type_from_mode(self.mode) self.mode_type = lookup_helper.infer_mode_type_from_mode(self.mode)
# If we have a latitude at this point, it can only have been provided by the spot itself
if self.dx_latitude:
self.dx_location_source = "SPOT"
# Fetch SIG data. In case a particular API doesn't provide a full set of name, lat, lon & grid for a reference
# in its initial call, we use this code to populate the rest of the data. This includes working out grid refs
# from WAB and WAI, which count as a SIG even though there's no real lookup, just maths
if self.sig_refs and len(self.sig_refs) > 0:
for sig_ref in self.sig_refs:
lookup_data = get_sig_ref_info(sig_ref.sig, sig_ref.id)
if lookup_data:
# Update the sig_ref data from the lookup
sig_ref.__dict__.update(lookup_data.__dict__)
# If the spot itself doesn't have location yet, but the SIG ref does, extract it
if lookup_data.grid and not self.dx_grid:
self.dx_grid = lookup_data.grid
if lookup_data.latitude and not self.dx_latitude:
self.dx_latitude = lookup_data.latitude
self.dx_longitude = lookup_data.longitude
if self.sig == "WAB" or self.sig == "WAI":
self.dx_location_source = "WAB/WAI GRID"
else:
self.dx_location_source = "SIG REF LOOKUP"
# If the spot itself doesn't have a SIG yet, but we have at least one SIG reference, take that reference's SIG
# and apply it to the whole spot.
if self.sig_refs and len(self.sig_refs) > 0 and not self.sig:
self.sig = self.sig_refs[0].sig
# Icon from SIG # Icon from SIG
if self.sig and not self.icon: if self.sig and not self.icon:
self.icon = get_icon_for_sig(self.sig) self.icon = get_icon_for_sig(self.sig)
# DX Grid to lat/lon and vice versa # DX Grid to lat/lon and vice versa in case one is missing
if self.dx_grid and not self.dx_latitude: if self.dx_grid and not self.dx_latitude:
ll = locator_to_latlong(self.dx_grid) ll = locator_to_latlong(self.dx_grid)
self.dx_latitude = ll[0] self.dx_latitude = ll[0]
@@ -246,21 +274,6 @@ class Spot:
self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8) self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8)
except: except:
logging.debug("Invalid lat/lon received for spot") logging.debug("Invalid lat/lon received for spot")
if self.dx_latitude:
self.dx_location_source = "SPOT"
# WAB/WAI grid to lat/lon
if not self.dx_latitude and self.sig and self.sig_refs and len(self.sig_refs) > 0 and (
self.sig == "WAB" or self.sig == "WAI"):
ll = wab_wai_square_to_lat_lon(self.sig_refs[0])
if ll:
self.dx_latitude = ll[0]
self.dx_longitude = ll[1]
try:
self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8)
except:
logging.debug("Invalid lat/lon received from WAB/WAI grid")
self.dx_location_source = "WAB/WAI GRID"
# QRT comment detection # QRT comment detection
if self.comment and not self.qrt: if self.comment and not self.qrt:
@@ -290,8 +303,9 @@ class Spot:
# DX Location is "good" if it is from a spot, or from QRZ if the callsign doesn't contain a slash, so the operator # DX Location is "good" if it is from a spot, or from QRZ if the callsign doesn't contain a slash, so the operator
# is likely at home. # is likely at home.
self.dx_location_good = self.dx_location_source == "SPOT" or self.dx_location_source == "WAB/WAI GRID" or ( self.dx_location_good = (self.dx_location_source == "SPOT" or self.dx_location_source == "SIG REF LOOKUP"
self.dx_location_source == "QRZ" and not "/" in self.dx_call) or self.dx_location_source == "WAB/WAI GRID"
or (self.dx_location_source == "QRZ" and not "/" in self.dx_call))
# DE with no digits and APRS servers starting "T2" are not things we can look up location for # DE with no digits and APRS servers starting "T2" are not things we can look up location for
if any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"): if any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"):

View File

@@ -106,29 +106,28 @@ class WebServer:
try: try:
# Reject if no sig or sig_ref # Reject if no sig or sig_ref
query = bottle.request.query query = bottle.request.query
if not "sig" in query.keys() or not "sig_ref_id" in query.keys(): if not "sig" in query.keys() or not "id" in query.keys():
response.content_type = 'application/json' response.content_type = 'application/json'
response.status = 422 response.status = 422
return json.dumps("Error - sig and sig_ref_id must be provided", default=serialize_everything) return json.dumps("Error - sig and id must be provided", default=serialize_everything)
sig = query.get("sig").upper()
id = query.get("id").upper()
# Reject if sig unknown
if not sig in list(map(lambda p: p.name, SIGS)):
response.content_type = 'application/json'
response.status = 422
return json.dumps("Error - sig '" + sig + "' is not known.", default=serialize_everything)
# Reject if sig_ref format incorrect for sig # Reject if sig_ref format incorrect for sig
sig = query.get("sig") if get_ref_regex_for_sig(sig) and not re.match(get_ref_regex_for_sig(sig), id):
sig_ref_id = query.get("sig_ref_id")
if get_ref_regex_for_sig(sig) and not re.match(get_ref_regex_for_sig(sig), sig_ref_id):
response.content_type = 'application/json' response.content_type = 'application/json'
response.status = 422 response.status = 422
return json.dumps("Error - '" + sig_ref_id + "' does not look like a valid reference for " + sig + ".", default=serialize_everything) return json.dumps("Error - '" + id + "' does not look like a valid reference ID for " + sig + ".", default=serialize_everything)
data = get_sig_ref_info(sig, sig_ref_id) data = get_sig_ref_info(sig, id)
# 404 if we don't have any data
if not data:
response.content_type = 'application/json'
response.status = 404
return json.dumps("Error - could not find any data for this reference.", default=serialize_everything)
# Return success
return self.serve_api(data) return self.serve_api(data)
except Exception as e: except Exception as e:
logging.error(e) logging.error(e)
response.content_type = 'application/json' response.content_type = 'application/json'

View File

@@ -8,7 +8,7 @@ import pytz
import telnetlib3 import telnetlib3
from core.config import SERVER_OWNER_CALLSIGN from core.config import SERVER_OWNER_CALLSIGN
from core.sig_utils import ANY_SIG_REGEX, get_icon_for_sig, get_ref_regex_for_sig from core.sig_utils import ANY_SIG_REGEX, get_ref_regex_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
from spotproviders.spot_provider import SpotProvider from spotproviders.spot_provider import SpotProvider
@@ -85,12 +85,11 @@ class DXCluster(SpotProvider):
sig_match = re.search(r"(^|\W)" + ANY_SIG_REGEX + r"($|\W)", spot.comment, re.IGNORECASE) sig_match = re.search(r"(^|\W)" + ANY_SIG_REGEX + r"($|\W)", spot.comment, re.IGNORECASE)
if sig_match: if sig_match:
spot.sig = sig_match.group(2).upper() spot.sig = sig_match.group(2).upper()
spot.icon = get_icon_for_sig(spot.sig)
ref_regex = get_ref_regex_for_sig(spot.sig) ref_regex = get_ref_regex_for_sig(spot.sig)
if ref_regex: if ref_regex:
sig_ref_match = re.search(r"(^|\W)" + spot.sig + r"($|\W)(" + ref_regex + r")($|\W)", spot.comment, re.IGNORECASE) sig_ref_match = re.search(r"(^|\W)" + spot.sig + r"($|\W)(" + ref_regex + r")($|\W)", spot.comment, re.IGNORECASE)
if sig_ref_match: if sig_ref_match:
spot.sig_refs = [SIGRef(id=sig_ref_match.group(3).upper())] spot.sig_refs = [SIGRef(id=sig_ref_match.group(3).upper(), sig=spot.sig)]
# Add to our list # Add to our list
self.submit(spot) self.submit(spot)

View File

@@ -34,7 +34,7 @@ class GMA(HTTPSpotProvider):
mode=source_spot["MODE"].upper() if "<>" not in source_spot["MODE"] else None, mode=source_spot["MODE"].upper() if "<>" not in source_spot["MODE"] else None,
# Filter out some weird mode strings # Filter out some weird mode strings
comment=source_spot["TEXT"], comment=source_spot["TEXT"],
sig_refs=[SIGRef(id=source_spot["REF"], name=source_spot["NAME"], url="https://www.cqgma.org/zinfo.php?ref=" + source_spot["REF"])], sig_refs=[SIGRef(id=source_spot["REF"], sig="", name=source_spot["NAME"])],
time=datetime.strptime(source_spot["DATE"] + source_spot["TIME"], "%Y%m%d%H%M").replace( time=datetime.strptime(source_spot["DATE"] + source_spot["TIME"], "%Y%m%d%H%M").replace(
tzinfo=pytz.UTC).timestamp(), tzinfo=pytz.UTC).timestamp(),
dx_latitude=float(source_spot["LAT"]) if (source_spot["LAT"] and source_spot["LAT"] != "") else None, dx_latitude=float(source_spot["LAT"]) if (source_spot["LAT"] and source_spot["LAT"] != "") else None,
@@ -54,22 +54,21 @@ class GMA(HTTPSpotProvider):
if ref_info["reftype"] not in ["POTA", "WWFF"] and (ref_info["reftype"] != "Summit" or ref_info["sota"] == ""): if ref_info["reftype"] not in ["POTA", "WWFF"] and (ref_info["reftype"] != "Summit" or ref_info["sota"] == ""):
match ref_info["reftype"]: match ref_info["reftype"]:
case "Summit": case "Summit":
spot.sig = "GMA" spot.sig_refs[0].sig = "GMA"
case "IOTA Island": case "IOTA Island":
spot.sig = "IOTA" spot.sig_refs[0].sig = "IOTA"
case "Lighthouse (ILLW)": case "Lighthouse (ILLW)":
spot.sig = "ILLW" spot.sig_refs[0].sig = "ILLW"
case "Lighthouse (ARLHS)": case "Lighthouse (ARLHS)":
spot.sig = "ARLHS" spot.sig_refs[0].sig = "ARLHS"
case "Castle": case "Castle":
spot.sig = "WCA" spot.sig_refs[0].sig = "WCA"
case "Mill": case "Mill":
spot.sig = "MOTA" spot.sig_refs[0].sig = "MOTA"
case _: case _:
logging.warn("GMA spot found with ref type " + ref_info[ logging.warn("GMA spot found with ref type " + ref_info[
"reftype"] + ", developer needs to add support for this!") "reftype"] + ", developer needs to add support for this!")
spot.sig = ref_info["reftype"] spot.sig_refs[0].sig = ref_info["reftype"]
spot.icon = get_icon_for_sig(spot.sig)
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us. # that for us.

View File

@@ -53,9 +53,7 @@ class HEMA(HTTPSpotProvider):
freq=float(freq_mode_match.group(1)) * 1000000, freq=float(freq_mode_match.group(1)) * 1000000,
mode=freq_mode_match.group(2).upper(), mode=freq_mode_match.group(2).upper(),
comment=spotter_comment_match.group(2), comment=spotter_comment_match.group(2),
sig="HEMA", sig_refs=[SIGRef(id=spot_items[3].upper(), sig="HEMA", name=spot_items[4])],
sig_refs=[SIGRef(id=spot_items[3].upper(), name=spot_items[4])],
icon=get_icon_for_sig("HEMA"),
time=datetime.strptime(spot_items[0], "%d/%m/%Y %H:%M").replace(tzinfo=pytz.UTC).timestamp(), time=datetime.strptime(spot_items[0], "%d/%m/%Y %H:%M").replace(tzinfo=pytz.UTC).timestamp(),
dx_latitude=float(spot_items[7]), dx_latitude=float(spot_items[7]),
dx_longitude=float(spot_items[8])) dx_longitude=float(spot_items[8]))

View File

@@ -1,12 +1,9 @@
import csv
import logging import logging
import re import re
from datetime import datetime from datetime import datetime
import pytz import pytz
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -36,9 +33,7 @@ class ParksNPeaks(HTTPSpotProvider):
# Seen PNP spots with empty frequency, and with comma-separated thousands digits # Seen PNP spots with empty frequency, and with comma-separated thousands digits
mode=source_spot["actMode"].upper(), mode=source_spot["actMode"].upper(),
comment=source_spot["actComments"], comment=source_spot["actComments"],
sig=source_spot["actClass"].upper(), sig_refs=[SIGRef(id=source_spot["actSiteID"], sig=source_spot["actClass"].upper())],
sig_refs=[SIGRef(id=source_spot["actSiteID"])],
icon=get_icon_for_sig(source_spot["actClass"]),
time=datetime.strptime(source_spot["actTime"], "%Y-%m-%d %H:%M:%S").replace( time=datetime.strptime(source_spot["actTime"], "%Y-%m-%d %H:%M:%S").replace(
tzinfo=pytz.UTC).timestamp()) tzinfo=pytz.UTC).timestamp())
@@ -52,24 +47,11 @@ class ParksNPeaks(HTTPSpotProvider):
spot.de_call = m.group(1) spot.de_call = m.group(1)
# Log a warning for the developer if PnP gives us an unknown programme we've never seen before # Log a warning for the developer if PnP gives us an unknown programme we've never seen before
if spot.sig not in ["POTA", "SOTA", "WWFF", "SIOTA", "ZLOTA", "KRMNPA"]: if spot.sig_refs[0].sig not in ["POTA", "SOTA", "WWFF", "SIOTA", "ZLOTA", "KRMNPA"]:
logging.warn("PNP spot found with sig " + spot.sig + ", developer needs to add support for this!") logging.warn("PNP spot found with sig " + spot.sig + ", developer needs to add support for this!")
# SiOTA lat/lon/grid lookup
if spot.sig == "SIOTA":
siota_csv_data = SEMI_STATIC_URL_DATA_CACHE.get(self.SIOTA_LIST_URL, headers=HTTP_HEADERS)
siota_dr = csv.DictReader(siota_csv_data.content.decode().splitlines())
for row in siota_dr:
if row["SILO_CODE"] == spot.sig_refs[0]:
spot.dx_latitude = float(row["LAT"])
spot.dx_longitude = float(row["LNG"])
spot.dx_grid = row["LOCATOR"]
break
# Note there is currently no support for KRMNPA location lookup, see issue #61.
# If this is POTA, SOTA, WWFF or ZLOTA data we already have it through other means, so ignore. Otherwise, # If this is POTA, SOTA, WWFF or ZLOTA data we already have it through other means, so ignore. Otherwise,
# add to the spot list. # add to the spot list.
if spot.sig not in ["POTA", "SOTA", "WWFF", "ZLOTA"]: if spot.sig_refs[0].sig not in ["POTA", "SOTA", "WWFF", "ZLOTA"]:
new_spots.append(spot) new_spots.append(spot)
return new_spots return new_spots

View File

@@ -3,8 +3,6 @@ from datetime import datetime
import pytz import pytz
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig, get_ref_regex_for_sig from core.sig_utils import get_icon_for_sig, get_ref_regex_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -33,9 +31,7 @@ class POTA(HTTPSpotProvider):
freq=float(source_spot["frequency"]) * 1000, freq=float(source_spot["frequency"]) * 1000,
mode=source_spot["mode"].upper(), mode=source_spot["mode"].upper(),
comment=source_spot["comments"], comment=source_spot["comments"],
sig="POTA", sig_refs=[SIGRef(id=source_spot["reference"], sig="POTA", name=source_spot["name"])],
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["name"], url="https://pota.app/#/park/" + source_spot["reference"])],
icon=get_icon_for_sig("POTA"),
time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace( time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(
tzinfo=pytz.UTC).timestamp(), tzinfo=pytz.UTC).timestamp(),
dx_grid=source_spot["grid6"], dx_grid=source_spot["grid6"],
@@ -46,16 +42,7 @@ class POTA(HTTPSpotProvider):
all_comment_refs = re.findall(get_ref_regex_for_sig("POTA"), spot.comment) all_comment_refs = re.findall(get_ref_regex_for_sig("POTA"), spot.comment)
for r in all_comment_refs: for r in all_comment_refs:
if r not in list(map(lambda ref: ref.id, spot.sig_refs)): if r not in list(map(lambda ref: ref.id, spot.sig_refs)):
ref = SIGRef(id=r.upper(), url="https://pota.app/#/park/" + r.upper()) spot.sig_refs.append(SIGRef(id=r.upper(), sig="POTA"))
# Now we need to look up the name of that reference from the API, because the comment won't have it
park_response = SEMI_STATIC_URL_DATA_CACHE.get(self.PARK_URL_ROOT + r.upper(), headers=HTTP_HEADERS)
park_data = park_response.json()
if park_data and "name" in park_data:
ref.name = park_data["name"]
# Finally append our new reference to the spot's reference list
spot.sig_refs.append(ref)
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us. # that for us.

View File

@@ -1,10 +1,7 @@
import logging from datetime import datetime
from datetime import datetime, timedelta
import requests import requests
from requests_cache import CachedSession
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.constants import HTTP_HEADERS from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
@@ -48,22 +45,10 @@ class SOTA(HTTPSpotProvider):
freq=(float(source_spot["frequency"]) * 1000000) if (source_spot["frequency"] is not None) else None, # Seen SOTA spots with no frequency! freq=(float(source_spot["frequency"]) * 1000000) if (source_spot["frequency"] is not None) else None, # Seen SOTA spots with no frequency!
mode=source_spot["mode"].upper(), mode=source_spot["mode"].upper(),
comment=source_spot["comments"], comment=source_spot["comments"],
sig="SOTA", sig_refs=[SIGRef(id=source_spot["summitCode"], sig="SOTA", name=source_spot["summitName"])],
sig_refs=[SIGRef(id=source_spot["summitCode"], name=source_spot["summitName"], url="https://www.sotadata.org.uk/en/summit/" + source_spot["summitCode"])],
icon=get_icon_for_sig("SOTA"),
time=datetime.fromisoformat(source_spot["timeStamp"]).timestamp(), time=datetime.fromisoformat(source_spot["timeStamp"]).timestamp(),
activation_score=source_spot["points"]) activation_score=source_spot["points"])
# SOTA doesn't give summit lat/lon/grid in the main call, so we need another separate call for this
try:
summit_response = SEMI_STATIC_URL_DATA_CACHE.get(self.SUMMIT_URL_ROOT + source_spot["summitCode"], headers=HTTP_HEADERS)
summit_data = summit_response.json()
spot.dx_grid = summit_data["locator"]
spot.dx_latitude = summit_data["latitude"]
spot.dx_longitude = summit_data["longitude"]
except Exception:
logging.warn("Looking up summit " + source_spot["summitCode"] + " from the SOTA API failed. No summit data was available.")
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us. # that for us.
new_spots.append(spot) new_spots.append(spot)

View File

@@ -3,8 +3,6 @@ from datetime import datetime
import pytz import pytz
from rss_parser import RSSParser from rss_parser import RSSParser
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -66,21 +64,8 @@ class WOTA(HTTPSpotProvider):
freq=freq_hz, freq=freq_hz,
mode=mode, mode=mode,
comment=comment, comment=comment,
sig="WOTA", sig_refs=[SIGRef(id=ref, sig="WOTA", name=ref_name)] if ref else [],
sig_refs=[SIGRef(id=ref, name=ref_name, url="https://www.wota.org.uk/MM_" + ref)] if ref else [],
icon=get_icon_for_sig("WOTA"),
time=time.timestamp()) time=time.timestamp())
# WOTA name/grid/lat/lon lookup
if ref:
wota_data = SEMI_STATIC_URL_DATA_CACHE.get(self.LIST_URL, headers=HTTP_HEADERS).json()
for feature in wota_data["features"]:
if feature["properties"]["wotaId"] == ref:
spot.sig_refs[0].name = feature["properties"]["title"]
spot.dx_latitude = feature["geometry"]["coordinates"][1]
spot.dx_longitude = feature["geometry"]["coordinates"][0]
spot.dx_grid = feature["properties"]["qthLocator"]
break
new_spots.append(spot) new_spots.append(spot)
return new_spots return new_spots

View File

@@ -20,10 +20,7 @@ class WWBOTA(SSESpotProvider):
# n-fer activations. # n-fer activations.
refs = [] refs = []
for ref in source_spot["references"]: for ref in source_spot["references"]:
sigref = SIGRef(id=ref["reference"], name=ref["name"]) sigref = SIGRef(id=ref["reference"], sig="WWBOTA", name=ref["name"])
# Bunkerbase URLs only work for UK bunkers, so only add a URL if we have a B/G prefix.
if ref["reference"].startswith("B/G"):
sigref.url="https://bunkerwiki.org/?s=" + ref["reference"]
refs.append(sigref) refs.append(sigref)
spot = Spot(source=self.name, spot = Spot(source=self.name,
@@ -32,9 +29,7 @@ class WWBOTA(SSESpotProvider):
freq=float(source_spot["freq"]) * 1000000, freq=float(source_spot["freq"]) * 1000000,
mode=source_spot["mode"].upper(), mode=source_spot["mode"].upper(),
comment=source_spot["comment"], comment=source_spot["comment"],
sig="WWBOTA",
sig_refs=refs, sig_refs=refs,
icon=get_icon_for_sig("WWBOTA"),
time=datetime.fromisoformat(source_spot["time"]).timestamp(), time=datetime.fromisoformat(source_spot["time"]).timestamp(),
# WWBOTA spots can contain multiple references for bunkers being activated simultaneously. For # WWBOTA spots can contain multiple references for bunkers being activated simultaneously. For
# now, we will just pick the first one to use as our grid, latitude and longitude. # now, we will just pick the first one to use as our grid, latitude and longitude.

View File

@@ -28,9 +28,7 @@ class WWFF(HTTPSpotProvider):
freq=float(source_spot["frequency_khz"]) * 1000, freq=float(source_spot["frequency_khz"]) * 1000,
mode=source_spot["mode"].upper(), mode=source_spot["mode"].upper(),
comment=source_spot["remarks"], comment=source_spot["remarks"],
sig="WWFF", sig_refs=[SIGRef(id=source_spot["reference"], sig="WWFF", name=source_spot["reference_name"])],
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["reference_name"], url="https://wwff.co/directory/?showRef=" + source_spot["reference"])],
icon=get_icon_for_sig("WWFF"),
time=datetime.fromtimestamp(source_spot["spot_time"], tz=pytz.UTC).timestamp(), time=datetime.fromtimestamp(source_spot["spot_time"], tz=pytz.UTC).timestamp(),
dx_latitude=source_spot["latitude"], dx_latitude=source_spot["latitude"],
dx_longitude=source_spot["longitude"]) dx_longitude=source_spot["longitude"])

View File

@@ -2,8 +2,6 @@ from datetime import datetime
import pytz import pytz
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -36,18 +34,8 @@ class ZLOTA(HTTPSpotProvider):
freq=freq_hz, freq=freq_hz,
mode=source_spot["mode"].upper().strip(), mode=source_spot["mode"].upper().strip(),
comment=source_spot["comments"], comment=source_spot["comments"],
sig="ZLOTA", sig_refs=[SIGRef(id=source_spot["reference"], sig="ZLOTA", name=source_spot["name"])],
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["name"])],
icon=get_icon_for_sig("ZLOTA"),
time=datetime.fromisoformat(source_spot["referenced_time"]).astimezone(pytz.UTC).timestamp()) time=datetime.fromisoformat(source_spot["referenced_time"]).astimezone(pytz.UTC).timestamp())
# ZLOTA lat/lon lookup
zlota_data = SEMI_STATIC_URL_DATA_CACHE.get(self.LIST_URL, headers=HTTP_HEADERS).json()
for asset in zlota_data:
if asset["code"] == spot.sig_refs[0]:
spot.dx_latitude = asset["y"]
spot.dx_longitude = asset["x"]
break
new_spots.append(spot) new_spots.append(spot)
return new_spots return new_spots

View File

@@ -493,7 +493,7 @@ paths:
tags: tags:
- Utilities - Utilities
summary: Look up SIG Ref details summary: Look up SIG Ref details
description: Perform a lookup of data about a certain reference, providing the SIG and the ID of the reference. description: Perform a lookup of data about a certain reference, providing the SIG and the ID of the reference. A SIGRef structure will be returned containing the SIG and ID, plus any other information Spothole could find about it.
operationId: sigref operationId: sigref
parameters: parameters:
- name: sig - name: sig
@@ -520,7 +520,7 @@ paths:
- WAB - WAB
- WAI - WAI
example: POTA example: POTA
- name: sig_ref_id - name: id
in: query in: query
description: ID of a reference in that SIG description: ID of a reference in that SIG
required: true required: true
@@ -532,37 +532,9 @@ paths:
content: content:
application/json: application/json:
schema: schema:
type: object $ref: '#/components/schemas/SIGRef'
properties:
name:
type: string
description: SIG reference name
example: Null Country Park
url:
type: string
description: SIG reference URL, which the user can look up for more information
example: "https://pota.app/#/park/GB-0001"
grid:
type: string
description: Maidenhead grid locator for the reference.
example: IO91aa
latitude:
type: number
description: Latitude of the reference, in degrees.
example: 51.2345
longitude:
type: number
description: Longitude of the reference, in degrees.
example: -1.2345
'404':
description: Reference not found, or SIG not supported
content:
application/json:
schema:
type: string
example: "Failed"
'422': '422':
description: Validation error description: Validation error e.g. SIG not supported or reference format incorrect
content: content:
application/json: application/json:
schema: schema:
@@ -624,6 +596,28 @@ components:
type: string type: string
description: SIG reference ID. description: SIG reference ID.
example: GB-0001 example: GB-0001
sig:
type: string
description: SIG that this reference is in.
enum:
- POTA
- SOTA
- WWFF
- WWBOTA
- GMA
- HEMA
- WCA
- MOTA
- SIOTA
- ARLHS
- ILLW
- ZLOTA
- IOTA
- WOTA
- BOTA
- WAB
- WAI
example: POTA
name: name:
type: string type: string
description: SIG reference name description: SIG reference name
@@ -632,6 +626,18 @@ components:
type: string type: string
description: SIG reference URL, which the user can look up for more information description: SIG reference URL, which the user can look up for more information
example: "https://pota.app/#/park/GB-0001" example: "https://pota.app/#/park/GB-0001"
grid:
type: string
description: Maidenhead grid locator for the reference, if known.
example: IO91aa
latitude:
type: number
description: Latitude of the reference, in degrees, if known.
example: 51.2345
longitude:
type: number
description: Longitude of the reference, in degrees, if known.
example: -1.2345
Spot: Spot:
type: object type: object
@@ -698,9 +704,10 @@ components:
example: -1.2345 example: -1.2345
dx_location_source: dx_location_source:
type: string type: string
description: Where we got the DX location (grid/latitude/longitude) from. If this was from the spot itself, it's likely quite accurate, but if we had to fall back to QRZ lookup, or even a location based on the DXCC itself, it will be a lot less accurate. description: Where we got the DX location (grid/latitude/longitude) from. If this was from the spot itself, or from a lookup of the SIG ref (e.g. park) it's likely quite accurate, but if we had to fall back to QRZ lookup, or even a location based on the DXCC itself, it will be a lot less accurate.
enum: enum:
- SPOT - SPOT
- "SIG REF LOOKUP"
- "WAB/WAI GRID" - "WAB/WAI GRID"
- QRZ - QRZ
- DXCC - DXCC
@@ -708,7 +715,7 @@ components:
example: SPOT example: SPOT
dx_location_good: dx_location_good:
type: boolean type: boolean
description: Does the software think the location is good enough to put a marker on a map? This is true if the source is "SPOT" or "WAB/WAI GRID", or alternatively if the source is "QRZ" and the callsign doesn't have a slash in it (i.e. operator likely at home). description: Does the software think the location is good enough to put a marker on a map? This is true if the source is "SPOT", "SIG REF LOOKUP" or "WAB/WAI GRID", or alternatively if the source is "QRZ" and the callsign doesn't have a slash in it (i.e. operator likely at home).
example: true example: true
de_call: de_call:
type: string type: string