Compare commits

...

10 Commits

28 changed files with 708 additions and 299 deletions

View File

@@ -38,9 +38,7 @@ class BOTA(HTTPAlertProvider):
# Convert to our alert format # Convert to our alert format
alert = Alert(source=self.name, alert = Alert(source=self.name,
dx_calls=[dx_call], dx_calls=[dx_call],
sig="BOTA", sig_refs=[SIGRef(id=ref_name, sig="BOTA")],
sig_refs=[SIGRef(id=ref_name, name=ref_name, url="https://www.beachesontheair.com/beaches/" + ref_name.lower().replace(" ", "-"))],
icon=get_icon_for_sig("BOTA"),
start_time=date_time.timestamp(), start_time=date_time.timestamp(),
is_dxpedition=False) is_dxpedition=False)

View File

@@ -22,6 +22,7 @@ class ParksNPeaks(HTTPAlertProvider):
# Iterate through source data # Iterate through source data
for source_alert in http_response.json(): for source_alert in http_response.json():
# Calculate some things # Calculate some things
sig = source_alert["Class"]
if " - " in source_alert["Location"]: if " - " in source_alert["Location"]:
split = source_alert["Location"].split(" - ") split = source_alert["Location"].split(" - ")
sig_ref = split[0] sig_ref = split[0]
@@ -38,19 +39,17 @@ class ParksNPeaks(HTTPAlertProvider):
dx_calls=[source_alert["CallSign"].upper()], dx_calls=[source_alert["CallSign"].upper()],
freqs_modes=source_alert["Freq"] + " " + source_alert["MODE"], freqs_modes=source_alert["Freq"] + " " + source_alert["MODE"],
comment=source_alert["Comments"], comment=source_alert["Comments"],
sig=source_alert["Class"], sig_refs=[SIGRef(id=sig_ref, sig=sig, name=sig_ref_name)],
sig_refs=[SIGRef(id=sig_ref, name=sig_ref_name)],
icon=get_icon_for_sig(source_alert["Class"]),
start_time=start_time, start_time=start_time,
is_dxpedition=False) is_dxpedition=False)
# Log a warning for the developer if PnP gives us an unknown programme we've never seen before # Log a warning for the developer if PnP gives us an unknown programme we've never seen before
if alert.sig not in ["POTA", "SOTA", "WWFF", "SiOTA", "ZLOTA", "KRMNPA"]: if sig and sig not in ["POTA", "SOTA", "WWFF", "SiOTA", "ZLOTA", "KRMNPA"]:
logging.warn("PNP alert found with sig " + alert.sig + ", developer needs to add support for this!") logging.warn("PNP alert found with sig " + sig + ", developer needs to add support for this!")
# If this is POTA, SOTA or WWFF data we already have it through other means, so ignore. Otherwise, add to # If this is POTA, SOTA or WWFF data we already have it through other means, so ignore. Otherwise, add to
# the alert list. Note that while ZLOTA has its own spots API, it doesn't have its own alerts API. So that # the alert list. Note that while ZLOTA has its own spots API, it doesn't have its own alerts API. So that
# means the PnP *spot* provider rejects ZLOTA spots here, but the PnP *alerts* provider here allows ZLOTA. # means the PnP *spot* provider rejects ZLOTA spots here, but the PnP *alerts* provider here allows ZLOTA.
if alert.sig not in ["POTA", "SOTA", "WWFF"]: if sig not in ["POTA", "SOTA", "WWFF"]:
new_alerts.append(alert) new_alerts.append(alert)
return new_alerts return new_alerts

View File

@@ -26,9 +26,7 @@ class POTA(HTTPAlertProvider):
dx_calls=[source_alert["activator"].upper()], dx_calls=[source_alert["activator"].upper()],
freqs_modes=source_alert["frequencies"], freqs_modes=source_alert["frequencies"],
comment=source_alert["comments"], comment=source_alert["comments"],
sig="POTA", sig_refs=[SIGRef(id=source_alert["reference"], sig="POTA", name=source_alert["name"], url="https://pota.app/#/park/" + source_alert["reference"])],
sig_refs=[SIGRef(id=source_alert["reference"], name=source_alert["name"], url="https://pota.app/#/park/" + source_alert["reference"])],
icon=get_icon_for_sig("POTA"),
start_time=datetime.strptime(source_alert["startDate"] + source_alert["startTime"], start_time=datetime.strptime(source_alert["startDate"] + source_alert["startTime"],
"%Y-%m-%d%H:%M").replace(tzinfo=pytz.UTC).timestamp(), "%Y-%m-%d%H:%M").replace(tzinfo=pytz.UTC).timestamp(),
end_time=datetime.strptime(source_alert["endDate"] + source_alert["endTime"], end_time=datetime.strptime(source_alert["endDate"] + source_alert["endTime"],

View File

@@ -27,9 +27,7 @@ class SOTA(HTTPAlertProvider):
dx_names=[source_alert["activatorName"].upper()], dx_names=[source_alert["activatorName"].upper()],
freqs_modes=source_alert["frequency"], freqs_modes=source_alert["frequency"],
comment=source_alert["comments"], comment=source_alert["comments"],
sig="SOTA", sig_refs=[SIGRef(id=source_alert["associationCode"] + "/" + source_alert["summitCode"], sig="SOTA", name=source_alert["summitDetails"])],
sig_refs=[SIGRef(id=source_alert["associationCode"] + "/" + source_alert["summitCode"], name=source_alert["summitDetails"], url="https://www.sotadata.org.uk/en/summit/" + source_alert["summitCode"])],
icon=get_icon_for_sig("SOTA"),
start_time=datetime.strptime(source_alert["dateActivated"], start_time=datetime.strptime(source_alert["dateActivated"],
"%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.UTC).timestamp(), "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.UTC).timestamp(),
is_dxpedition=False) is_dxpedition=False)

View File

@@ -54,9 +54,7 @@ class WOTA(HTTPAlertProvider):
dx_calls=[dx_call], dx_calls=[dx_call],
freqs_modes=freqs_modes, freqs_modes=freqs_modes,
comment=comment, comment=comment,
sig="WOTA", sig_refs=[SIGRef(id=ref, sig="WOTA", name=ref_name)] if ref else [],
sig_refs=[SIGRef(id=ref, name=ref_name, url="https://www.wota.org.uk/MM_" + ref)] if ref else [],
icon=get_icon_for_sig("WOTA"),
start_time=time.timestamp()) start_time=time.timestamp())
# Add to our list. # Add to our list.

View File

@@ -26,9 +26,7 @@ class WWFF(HTTPAlertProvider):
dx_calls=[source_alert["activator_call"].upper()], dx_calls=[source_alert["activator_call"].upper()],
freqs_modes=source_alert["band"] + " " + source_alert["mode"], freqs_modes=source_alert["band"] + " " + source_alert["mode"],
comment=source_alert["remarks"], comment=source_alert["remarks"],
sig="WWFF", sig_refs=[SIGRef(id=source_alert["reference"], sig="WWFF")],
sig_refs=[SIGRef(id=source_alert["reference"], url="https://wwff.co/directory/?showRef=" + source_alert["reference"])],
icon=get_icon_for_sig("WWFF"),
start_time=datetime.strptime(source_alert["utc_start"], start_time=datetime.strptime(source_alert["utc_start"],
"%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC).timestamp(), "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC).timestamp(),
end_time=datetime.strptime(source_alert["utc_end"], end_time=datetime.strptime(source_alert["utc_end"],

View File

@@ -123,9 +123,13 @@ max-alert-age-sec: 604800
# Login for QRZ.com to look up information. Optional. You will need an "XML Subscriber" (paid) package to retrieve all # Login for QRZ.com to look up information. Optional. You will need an "XML Subscriber" (paid) package to retrieve all
# the data for a callsign via their system. # the data for a callsign via their system.
qrz-username: "N0CALL" qrz-username: ""
qrz-password: "" qrz-password: ""
# Login for HamQTH to look up information. Optional.
hamqth-username: ""
hamqth-password: ""
# API key for Clublog to look up information. Optional. You sill need to request one via their helpdesk portal if you # API key for Clublog to look up information. Optional. You sill need to request one via their helpdesk portal if you
# want to use callsign lookups from Clublog. # want to use callsign lookups from Clublog.
clublog-api-key: "" clublog-api-key: ""

10
core/cache_utils.py Normal file
View File

@@ -0,0 +1,10 @@
from datetime import timedelta
from requests_cache import CachedSession
# Cache for "semi-static" data such as the locations of parks, CSVs of reference lists, etc.
# This has an expiry time of 30 days, so will re-request from the source after that amount
# of time has passed. This is used throughout Spothole to cache data that does not change
# rapidly.
SEMI_STATIC_URL_DATA_CACHE = CachedSession("cache/semi_static_url_data_cache",
expire_after=timedelta(days=30))

View File

@@ -8,6 +8,7 @@ SOFTWARE_VERSION = "0.1"
# HTTP headers used for spot providers that use HTTP # HTTP headers used for spot providers that use HTTP
HTTP_HEADERS = {"User-Agent": SOFTWARE_NAME + ", v" + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")"} HTTP_HEADERS = {"User-Agent": SOFTWARE_NAME + ", v" + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")"}
HAMQTH_PRG = (SOFTWARE_NAME + " v" + SOFTWARE_VERSION + " operated by " + SERVER_OWNER_CALLSIGN).replace(" ", "_")
# Special Interest Groups # Special Interest Groups
SIGS = [ SIGS = [

View File

@@ -1,7 +1,9 @@
import gzip import gzip
import logging import logging
import urllib.parse
from datetime import timedelta from datetime import timedelta
import xmltodict
from diskcache import Cache from diskcache import Cache
from pyhamtools import LookupLib, Callinfo, callinfo from pyhamtools import LookupLib, Callinfo, callinfo
from pyhamtools.exceptions import APIKeyMissingError from pyhamtools.exceptions import APIKeyMissingError
@@ -9,9 +11,11 @@ from pyhamtools.frequency import freq_to_band
from pyhamtools.locator import latlong_to_locator from pyhamtools.locator import latlong_to_locator
from requests_cache import CachedSession from requests_cache import CachedSession
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.config import config from core.config import config
from core.constants import BANDS, UNKNOWN_BAND, CW_MODES, PHONE_MODES, DATA_MODES, ALL_MODES, \ from core.constants import BANDS, UNKNOWN_BAND, CW_MODES, PHONE_MODES, DATA_MODES, ALL_MODES, \
QRZCQ_CALLSIGN_LOOKUP_DATA, HTTP_HEADERS QRZCQ_CALLSIGN_LOOKUP_DATA, HTTP_HEADERS, HAMQTH_PRG
# Singleton class that provides lookup functionality. # Singleton class that provides lookup functionality.
class LookupHelper: class LookupHelper:
@@ -31,18 +35,24 @@ class LookupHelper:
self.QRZ_CALLSIGN_DATA_CACHE = None self.QRZ_CALLSIGN_DATA_CACHE = None
self.LOOKUP_LIB_QRZ = None self.LOOKUP_LIB_QRZ = None
self.QRZ_AVAILABLE = None self.QRZ_AVAILABLE = None
self.HAMQTH_AVAILABLE = None
self.HAMQTH_CALLSIGN_DATA_CACHE = None
self.HAMQTH_BASE_URL = "https://www.hamqth.com/xml.php"
# HamQTH session keys expire after an hour. Rather than working out how much time has passed manually, we cheat
# and cache the HTTP response for 55 minutes, so when the login URL is queried within 55 minutes of the previous
# time, you just get the cached response.
self.HAMQTH_SESSION_LOOKUP_CACHE = CachedSession("cache/hamqth_session_cache",
expire_after=timedelta(minutes=55))
self.CALL_INFO_BASIC = None self.CALL_INFO_BASIC = None
self.LOOKUP_LIB_BASIC = None self.LOOKUP_LIB_BASIC = None
self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION = None self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION = None
self.COUNTRY_FILES_CTY_PLIST_CACHE = None
def start(self): def start(self):
# Lookup helpers from pyhamtools. We use four (!) of these. The simplest is country-files.com, which downloads the data # Lookup helpers from pyhamtools. We use five (!) of these. The simplest is country-files.com, which downloads
# once on startup, and requires no login/key, but does not have the best coverage. # the data once on startup, and requires no login/key, but does not have the best coverage.
# If the user provides login details/API keys, we also set up helpers for QRZ.com, Clublog (live API request), and # If the user provides login details/API keys, we also set up helpers for QRZ.com, HamQTH, Clublog (live API
# Clublog (XML download). The lookup functions iterate through these in a sensible order, looking for suitable data. # request), and Clublog (XML download). The lookup functions iterate through these in a sensible order, looking
self.COUNTRY_FILES_CTY_PLIST_CACHE = CachedSession("cache/country_files_city_plist_cache", # for suitable data.
expire_after=timedelta(days=10))
self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION = "cache/cty.plist" self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION = "cache/cty.plist"
success = self.download_country_files_cty_plist() success = self.download_country_files_cty_plist()
if success: if success:
@@ -52,12 +62,15 @@ class LookupHelper:
self.LOOKUP_LIB_BASIC = LookupLib(lookuptype="countryfile") self.LOOKUP_LIB_BASIC = LookupLib(lookuptype="countryfile")
self.CALL_INFO_BASIC = Callinfo(self.LOOKUP_LIB_BASIC) self.CALL_INFO_BASIC = Callinfo(self.LOOKUP_LIB_BASIC)
self.QRZ_AVAILABLE = config["qrz-password"] != "" self.QRZ_AVAILABLE = config["qrz-username"] != "" and config["qrz-password"] != ""
if self.QRZ_AVAILABLE: if self.QRZ_AVAILABLE:
self.LOOKUP_LIB_QRZ = LookupLib(lookuptype="qrz", username=config["qrz-username"], self.LOOKUP_LIB_QRZ = LookupLib(lookuptype="qrz", username=config["qrz-username"],
pwd=config["qrz-password"]) pwd=config["qrz-password"])
self.QRZ_CALLSIGN_DATA_CACHE = Cache('cache/qrz_callsign_lookup_cache') self.QRZ_CALLSIGN_DATA_CACHE = Cache('cache/qrz_callsign_lookup_cache')
self.HAMQTH_AVAILABLE = config["hamqth-username"] != "" and config["hamqth-password"] != ""
self.HAMQTH_CALLSIGN_DATA_CACHE = Cache('cache/hamqth_callsign_lookup_cache')
self.CLUBLOG_API_KEY = config["clublog-api-key"] self.CLUBLOG_API_KEY = config["clublog-api-key"]
self.CLUBLOG_CTY_XML_CACHE = CachedSession("cache/clublog_cty_xml_cache", expire_after=timedelta(days=10)) self.CLUBLOG_CTY_XML_CACHE = CachedSession("cache/clublog_cty_xml_cache", expire_after=timedelta(days=10))
self.CLUBLOG_API_AVAILABLE = self.CLUBLOG_API_KEY != "" self.CLUBLOG_API_AVAILABLE = self.CLUBLOG_API_KEY != ""
@@ -78,8 +91,8 @@ class LookupHelper:
def download_country_files_cty_plist(self): def download_country_files_cty_plist(self):
try: try:
logging.info("Downloading Country-files.com cty.plist...") logging.info("Downloading Country-files.com cty.plist...")
response = self.COUNTRY_FILES_CTY_PLIST_CACHE.get("https://www.country-files.com/cty/cty.plist", response = SEMI_STATIC_URL_DATA_CACHE.get("https://www.country-files.com/cty/cty.plist",
headers=HTTP_HEADERS).text headers=HTTP_HEADERS).text
with open(self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION, "w") as f: with open(self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION, "w") as f:
f.write(response) f.write(response)
@@ -148,7 +161,12 @@ class LookupHelper:
qrz_data = self.get_qrz_data_for_callsign(call) qrz_data = self.get_qrz_data_for_callsign(call)
if qrz_data and "country" in qrz_data: if qrz_data and "country" in qrz_data:
country = qrz_data["country"] country = qrz_data["country"]
# Couldn't get anything from QRZ.com database, try Clublog data # Couldn't get anything from QRZ.com database, try HamQTH
if not country:
hamqth_data = self.get_hamqth_data_for_callsign(call)
if hamqth_data and "country" in hamqth_data:
country = hamqth_data["country"]
# Couldn't get anything from HamQTH database, try Clublog data
if not country: if not country:
clublog_data = self.get_clublog_xml_data_for_callsign(call) clublog_data = self.get_clublog_xml_data_for_callsign(call)
if clublog_data and "Name" in clublog_data: if clublog_data and "Name" in clublog_data:
@@ -166,7 +184,6 @@ class LookupHelper:
# Infer a DXCC ID from a callsign # Infer a DXCC ID from a callsign
def infer_dxcc_id_from_callsign(self, call): def infer_dxcc_id_from_callsign(self, call):
self.get_clublog_xml_data_for_callsign("M0TRT")
try: try:
# Start with the basic country-files.com-based decoder. # Start with the basic country-files.com-based decoder.
dxcc = self.CALL_INFO_BASIC.get_adif_id(call) dxcc = self.CALL_INFO_BASIC.get_adif_id(call)
@@ -177,7 +194,12 @@ class LookupHelper:
qrz_data = self.get_qrz_data_for_callsign(call) qrz_data = self.get_qrz_data_for_callsign(call)
if qrz_data and "adif" in qrz_data: if qrz_data and "adif" in qrz_data:
dxcc = qrz_data["adif"] dxcc = qrz_data["adif"]
# Couldn't get anything from QRZ.com database, try Clublog data # Couldn't get anything from QRZ.com database, try HamQTH
if not dxcc:
hamqth_data = self.get_hamqth_data_for_callsign(call)
if hamqth_data and "adif" in hamqth_data:
dxcc = hamqth_data["adif"]
# Couldn't get anything from HamQTH database, try Clublog data
if not dxcc: if not dxcc:
clublog_data = self.get_clublog_xml_data_for_callsign(call) clublog_data = self.get_clublog_xml_data_for_callsign(call)
if clublog_data and "DXCC" in clublog_data: if clublog_data and "DXCC" in clublog_data:
@@ -200,7 +222,12 @@ class LookupHelper:
continent = self.CALL_INFO_BASIC.get_continent(call) continent = self.CALL_INFO_BASIC.get_continent(call)
except (KeyError, ValueError) as e: except (KeyError, ValueError) as e:
continent = None continent = None
# Couldn't get anything from basic call info database, try Clublog data # Couldn't get anything from basic call info database, try HamQTH
if not continent:
hamqth_data = self.get_hamqth_data_for_callsign(call)
if hamqth_data and "continent" in hamqth_data:
country = hamqth_data["continent"]
# Couldn't get anything from HamQTH database, try Clublog data
if not continent: if not continent:
clublog_data = self.get_clublog_xml_data_for_callsign(call) clublog_data = self.get_clublog_xml_data_for_callsign(call)
if clublog_data and "Continent" in clublog_data: if clublog_data and "Continent" in clublog_data:
@@ -228,7 +255,12 @@ class LookupHelper:
qrz_data = self.get_qrz_data_for_callsign(call) qrz_data = self.get_qrz_data_for_callsign(call)
if qrz_data and "cqz" in qrz_data: if qrz_data and "cqz" in qrz_data:
cqz = qrz_data["cqz"] cqz = qrz_data["cqz"]
# Couldn't get anything from QRZ.com database, try Clublog data # Couldn't get anything from QRZ.com database, try HamQTH
if not cqz:
hamqth_data = self.get_hamqth_data_for_callsign(call)
if hamqth_data and "cq" in hamqth_data:
cqz = hamqth_data["cq"]
# Couldn't get anything from HamQTH database, try Clublog data
if not cqz: if not cqz:
clublog_data = self.get_clublog_xml_data_for_callsign(call) clublog_data = self.get_clublog_xml_data_for_callsign(call)
if clublog_data and "CQZ" in clublog_data: if clublog_data and "CQZ" in clublog_data:
@@ -256,13 +288,87 @@ class LookupHelper:
qrz_data = self.get_qrz_data_for_callsign(call) qrz_data = self.get_qrz_data_for_callsign(call)
if qrz_data and "ituz" in qrz_data: if qrz_data and "ituz" in qrz_data:
ituz = qrz_data["ituz"] ituz = qrz_data["ituz"]
# Couldn't get anything from QRZ.com database, Clublog doesn't provide this, so try QRZCQ data # Couldn't get anything from QRZ.com database, try HamQTH
if not ituz:
hamqth_data = self.get_hamqth_data_for_callsign(call)
if hamqth_data and "itu" in hamqth_data:
ituz = hamqth_data["itu"]
# Couldn't get anything from HamQTH database, Clublog doesn't provide this, so try QRZCQ data
if not ituz: if not ituz:
qrzcq_data = self.get_qrzcq_data_for_callsign(call) qrzcq_data = self.get_qrzcq_data_for_callsign(call)
if qrzcq_data and "ituz" in qrzcq_data: if qrzcq_data and "ituz" in qrzcq_data:
ituz = qrzcq_data["ituz"] ituz = qrzcq_data["ituz"]
return ituz return ituz
# Infer an operator name from a callsign (requires QRZ.com/HamQTH)
def infer_name_from_callsign(self, call):
data = self.get_qrz_data_for_callsign(call)
if data and "fname" in data:
name = data["fname"]
if "name" in data:
name = name + " " + data["name"]
return name
data = self.get_hamqth_data_for_callsign(call)
if data and "nick" in data:
return data["nick"]
else:
return None
# Infer a latitude and longitude from a callsign (requires QRZ.com/HamQTH)
def infer_latlon_from_callsign_qrz(self, call):
data = self.get_qrz_data_for_callsign(call)
if data and "latitude" in data and "longitude" in data:
return [data["latitude"], data["longitude"]]
data = self.get_hamqth_data_for_callsign(call)
if data and "latitude" in data and "longitude" in data:
return [data["latitude"], data["longitude"]]
else:
return None
# Infer a grid locator from a callsign (requires QRZ.com/HamQTH)
def infer_grid_from_callsign_qrz(self, call):
data = self.get_qrz_data_for_callsign(call)
if data and "locator" in data:
return data["locator"]
data = self.get_hamqth_data_for_callsign(call)
if data and "grid" in data:
return data["grid"]
else:
return None
# Infer a latitude and longitude from a callsign (using DXCC, probably very inaccurate)
def infer_latlon_from_callsign_dxcc(self, call):
try:
data = self.CALL_INFO_BASIC.get_lat_long(call)
if data and "latitude" in data and "longitude" in data:
loc = [data["latitude"], data["longitude"]]
else:
loc = None
except KeyError:
loc = None
# Couldn't get anything from basic call info database, try Clublog data
if not loc:
data = self.get_clublog_xml_data_for_callsign(call)
if data and "Lat" in data and "Lon" in data:
loc = [data["Lat"], data["Lon"]]
if not loc:
data = self.get_clublog_api_data_for_callsign(call)
if data and "Lat" in data and "Lon" in data:
loc = [data["Lat"], data["Lon"]]
return loc
# Infer a grid locator from a callsign (using DXCC, probably very inaccurate)
def infer_grid_from_callsign_dxcc(self, call):
latlon = self.infer_latlon_from_callsign_dxcc(call)
return latlong_to_locator(latlon[0], latlon[1], 8)
# Infer a mode from the frequency (in Hz) according to the band plan. Just a guess really.
def infer_mode_from_frequency(self, freq):
try:
return freq_to_band(freq / 1000.0)["mode"]
except KeyError:
return None
# Utility method to get QRZ.com data from cache if possible, if not get it from the API and cache it # Utility method to get QRZ.com data from cache if possible, if not get it from the API and cache it
def get_qrz_data_for_callsign(self, call): def get_qrz_data_for_callsign(self, call):
# Fetch from cache if we can, otherwise fetch from the API and cache it # Fetch from cache if we can, otherwise fetch from the API and cache it
@@ -286,6 +392,49 @@ class LookupHelper:
else: else:
return None return None
# Utility method to get HamQTH data from cache if possible, if not get it from the API and cache it
def get_hamqth_data_for_callsign(self, call):
# Fetch from cache if we can, otherwise fetch from the API and cache it
if call in self.HAMQTH_CALLSIGN_DATA_CACHE:
return self.HAMQTH_CALLSIGN_DATA_CACHE.get(call)
elif self.HAMQTH_AVAILABLE:
try:
# First we need to log in and get a session token.
session_data = self.HAMQTH_SESSION_LOOKUP_CACHE.get(
self.HAMQTH_BASE_URL + "?u=" + urllib.parse.quote_plus(config["hamqth-username"]) +
"&p=" + urllib.parse.quote_plus(config["hamqth-password"]), headers=HTTP_HEADERS).content
dict_data = xmltodict.parse(session_data)
if "session_id" in dict_data["HamQTH"]["session"]:
session_id = dict_data["HamQTH"]["session"]["session_id"]
# Now look up the actual data.
try:
lookup_data = SEMI_STATIC_URL_DATA_CACHE.get(
self.HAMQTH_BASE_URL + "?id=" + session_id + "&callsign=" + urllib.parse.quote_plus(
call) + "&prg=" + HAMQTH_PRG, headers=HTTP_HEADERS).content
data = xmltodict.parse(lookup_data)["HamQTH"]["search"]
self.HAMQTH_CALLSIGN_DATA_CACHE.add(call, data, expire=604800) # 1 week in seconds
return data
except (KeyError, ValueError):
# HamQTH had no info for the call, but maybe it had prefixes or suffixes. Try again with the base call.
try:
lookup_data = SEMI_STATIC_URL_DATA_CACHE.get(
self.HAMQTH_BASE_URL + "?id=" + session_id + "&callsign=" + urllib.parse.quote_plus(
callinfo.Callinfo.get_homecall(call)) + "&prg=" + HAMQTH_PRG, headers=HTTP_HEADERS).content
data = xmltodict.parse(lookup_data)["HamQTH"]["search"]
self.HAMQTH_CALLSIGN_DATA_CACHE.add(call, data, expire=604800) # 1 week in seconds
return data
except (KeyError, ValueError):
# HamQTH had no info for the call, that's OK. Cache a None so we don't try to look this up again
self.HAMQTH_CALLSIGN_DATA_CACHE.add(call, None, expire=604800) # 1 week in seconds
return None
else:
logging.warn("HamQTH login details incorrect, failed to look up with HamQTH.")
except:
logging.error("Exception when looking up HamQTH data")
return None
# Utility method to get Clublog API data from cache if possible, if not get it from the API and cache it # Utility method to get Clublog API data from cache if possible, if not get it from the API and cache it
def get_clublog_api_data_for_callsign(self, call): def get_clublog_api_data_for_callsign(self, call):
# Fetch from cache if we can, otherwise fetch from the API and cache it # Fetch from cache if we can, otherwise fetch from the API and cache it
@@ -334,70 +483,11 @@ class LookupHelper:
return entry return entry
return None return None
# Infer an operator name from a callsign (requires QRZ.com)
def infer_name_from_callsign(self, call):
data = self.get_qrz_data_for_callsign(call)
if data and "fname" in data:
name = data["fname"]
if "name" in data:
name = name + " " + data["name"]
return name
else:
return None
# Infer a latitude and longitude from a callsign (requires QRZ.com)
def infer_latlon_from_callsign_qrz(self, call):
data = self.get_qrz_data_for_callsign(call)
if data and "latitude" in data and "longitude" in data:
return [data["latitude"], data["longitude"]]
else:
return None
# Infer a grid locator from a callsign (requires QRZ.com)
def infer_grid_from_callsign_qrz(self, call):
data = self.get_qrz_data_for_callsign(call)
if data and "locator" in data:
return data["locator"]
else:
return None
# Infer a latitude and longitude from a callsign (using DXCC, probably very inaccurate)
def infer_latlon_from_callsign_dxcc(self, call):
try:
data = self.CALL_INFO_BASIC.get_lat_long(call)
if data and "latitude" in data and "longitude" in data:
loc = [data["latitude"], data["longitude"]]
else:
loc = None
except KeyError:
loc = None
# Couldn't get anything from basic call info database, try Clublog data
if not loc:
data = self.get_clublog_xml_data_for_callsign(call)
if data and "Lat" in data and "Lon" in data:
loc = [data["Lat"], data["Lon"]]
if not loc:
data = self.get_clublog_api_data_for_callsign(call)
if data and "Lat" in data and "Lon" in data:
loc = [data["Lat"], data["Lon"]]
return loc
# Infer a grid locator from a callsign (using DXCC, probably very inaccurate)
def infer_grid_from_callsign_dxcc(self, call):
latlon = self.infer_latlon_from_callsign_dxcc(call)
return latlong_to_locator(latlon[0], latlon[1], 8)
# Infer a mode from the frequency (in Hz) according to the band plan. Just a guess really.
def infer_mode_from_frequency(self, freq):
try:
return freq_to_band(freq / 1000.0)["mode"]
except KeyError:
return None
# Shutdown method to close down any caches neatly. # Shutdown method to close down any caches neatly.
def stop(self): def stop(self):
self.QRZ_CALLSIGN_DATA_CACHE.close() self.QRZ_CALLSIGN_DATA_CACHE.close()
self.CLUBLOG_CALLSIGN_DATA_CACHE.close() self.CLUBLOG_CALLSIGN_DATA_CACHE.close()
# Singleton object # Singleton object
lookup_helper = LookupHelper() lookup_helper = LookupHelper()

View File

@@ -1,4 +1,13 @@
from core.constants import SIGS import csv
import logging
from pyhamtools.locator import latlong_to_locator
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.constants import SIGS, HTTP_HEADERS
from core.geo_utils import wab_wai_square_to_lat_lon
from data.sig_ref import SIGRef
# Utility function to get the icon for a named SIG. If no match is found, the "circle-question" icon will be returned. # Utility function to get the icon for a named SIG. If no match is found, the "circle-question" icon will be returned.
def get_icon_for_sig(sig): def get_icon_for_sig(sig):
@@ -7,15 +16,109 @@ def get_icon_for_sig(sig):
return s.icon return s.icon
return "circle-question" return "circle-question"
# Utility function to get the regex string for a SIG reference for a named SIG. If no match is found, None will be returned. # Utility function to get the regex string for a SIG reference for a named SIG. If no match is found, None will be returned.
def get_ref_regex_for_sig(sig): def get_ref_regex_for_sig(sig):
for s in SIGS: for s in SIGS:
if s.name == sig: if s.name.upper() == sig.upper():
return s.ref_regex return s.ref_regex
return None return None
# Look up details of a SIG reference (e.g. POTA park) such as name, lat/lon, and grid.
# Note there is currently no support for KRMNPA location lookup, see issue #61.
def get_sig_ref_info(sig, sig_ref_id):
sig_ref = SIGRef(id=sig_ref_id, sig=sig)
try:
if sig.upper() == "POTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.pota.app/park/" + sig_ref_id, headers=HTTP_HEADERS).json()
if data:
fullname = data["name"] if "name" in data else None
if fullname and "parktypeDesc" in data and data["parktypeDesc"] != "":
fullname = fullname + " " + data["parktypeDesc"]
sig_ref.name = fullname
sig_ref.url = "https://pota.app/#/park/" + sig_ref_id
sig_ref.grid = data["grid6"] if "grid6" in data else None
sig_ref.latitude = data["latitude"] if "latitude" in data else None
sig_ref.longitude = data["longitude"] if "longitude" in data else None
elif sig.upper() == "SOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api-db2.sota.org.uk/api/summits/" + sig_ref_id,
headers=HTTP_HEADERS).json()
if data:
sig_ref.name = data["name"] if "name" in data else None
sig_ref.url = "https://www.sotadata.org.uk/en/summit/" + sig_ref_id
sig_ref.grid = data["locator"] if "locator" in data else None
sig_ref.latitude = data["latitude"] if "latitude" in data else None
sig_ref.longitude = data["longitude"] if "longitude" in data else None
elif sig.upper() == "WWBOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.wwbota.org/bunkers/" + sig_ref_id,
headers=HTTP_HEADERS).json()
if data:
sig_ref.name = data["name"] if "name" in data else None
sig_ref.url = "https://bunkerwiki.org/?s=" + sig_ref_id if sig_ref_id.startswith("B/G") else None
sig_ref.grid = data["locator"] if "locator" in data else None
sig_ref.latitude = data["lat"] if "lat" in data else None
sig_ref.longitude = data["long"] if "long" in data else None
elif sig.upper() == "GMA" or sig.upper() == "ARLHS" or sig.upper() == "ILLW" or sig.upper() == "WCA" or sig.upper() == "MOTA" or sig.upper() == "IOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.cqgma.org/api/ref/?" + sig_ref_id,
headers=HTTP_HEADERS).json()
if data:
sig_ref.name = data["name"] if "name" in data else None
sig_ref.url = "https://www.cqgma.org/zinfo.php?ref=" + sig_ref_id
sig_ref.grid = data["locator"] if "locator" in data else None
sig_ref.latitude = data["latitude"] if "latitude" in data else None
sig_ref.longitude = data["longitude"] if "longitude" in data else None
elif sig.upper() == "WWFF":
sig_ref.url = "https://wwff.co/directory/?showRef=" + sig_ref_id
elif sig.upper() == "SIOTA":
siota_csv_data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.silosontheair.com/data/silos.csv",
headers=HTTP_HEADERS)
siota_dr = csv.DictReader(siota_csv_data.content.decode().splitlines())
for row in siota_dr:
if row["SILO_CODE"] == sig_ref_id:
sig_ref.name = row["NAME"] if "NAME" in row else None
sig_ref.grid = row["LOCATOR"] if "LOCATOR" in row else None
sig_ref.latitude = float(row["LAT"]) if "LAT" in row else None
sig_ref.longitude = float(row["LNG"]) if "LNG" in row else None
elif sig.upper() == "WOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.wota.org.uk/mapping/data/summits.json",
headers=HTTP_HEADERS).json()
if data:
for feature in data["features"]:
if feature["properties"]["wotaId"] == sig_ref_id:
sig_ref.name = feature["properties"]["title"]
sig_ref.url = "https://www.wota.org.uk/MM_" + sig_ref_id
sig_ref.grid = feature["properties"]["qthLocator"]
sig_ref.latitude = feature["geometry"]["coordinates"][1]
sig_ref.longitude = feature["geometry"]["coordinates"][0]
elif sig.upper() == "ZLOTA":
data = SEMI_STATIC_URL_DATA_CACHE.get("https://ontheair.nz/assets/assets.json", headers=HTTP_HEADERS).json()
if data:
for asset in data:
if asset["code"] == sig_ref_id:
sig_ref.name = asset["name"]
sig_ref.url = "https://ontheair.nz/assets/ZLI_OT-030" + sig_ref_id.replace("/", "_")
sig_ref.grid = latlong_to_locator(asset["y"], asset["x"], 6)
sig_ref.latitude = asset["y"]
sig_ref.longitude = asset["x"]
elif sig.upper() == "BOTA":
if not sig_ref.name:
sig_ref.name = sig_ref.id
sig_ref.url = "https://www.beachesontheair.com/beaches/" + sig_ref.name.lower().replace(" ", "-")
elif sig.upper() == "WAB" or sig.upper() == "WAI":
ll = wab_wai_square_to_lat_lon(sig_ref_id)
if ll:
sig_ref.name = sig_ref_id
sig_ref.grid = latlong_to_locator(ll[0], ll[1], 6)
sig_ref.latitude = ll[0]
sig_ref.longitude = ll[1]
except:
logging.warn("Failed to look up sig_ref info for " + sig + " ref " + sig_ref_id + ".")
return sig_ref
# Regex matching any SIG # Regex matching any SIG
ANY_SIG_REGEX = r"(" + r"|".join(list(map(lambda p: p.name, SIGS))) + r")" ANY_SIG_REGEX = r"(" + r"|".join(list(map(lambda p: p.name, SIGS))) + r")"
# Regex matching any SIG reference # Regex matching any SIG reference
ANY_XOTA_SIG_REF_REGEX = r"[\w\/]+\-\d+" ANY_XOTA_SIG_REF_REGEX = r"[\w\/]+\-\d+"

View File

@@ -1,7 +1,6 @@
import copy import copy
import hashlib import hashlib
import json import json
import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -9,7 +8,7 @@ import pytz
from core.constants import DXCC_FLAGS from core.constants import DXCC_FLAGS
from core.lookup_helper import lookup_helper from core.lookup_helper import lookup_helper
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig, get_sig_ref_info
# Data class that defines an alert. # Data class that defines an alert.
@@ -99,6 +98,21 @@ class Alert:
if self.dx_dxcc_id and self.dx_dxcc_id in DXCC_FLAGS and not self.dx_flag: if self.dx_dxcc_id and self.dx_dxcc_id in DXCC_FLAGS and not self.dx_flag:
self.dx_flag = DXCC_FLAGS[self.dx_dxcc_id] self.dx_flag = DXCC_FLAGS[self.dx_dxcc_id]
# Fetch SIG data. In case a particular API doesn't provide a full set of name, lat, lon & grid for a reference
# in its initial call, we use this code to populate the rest of the data. This includes working out grid refs
# from WAB and WAI, which count as a SIG even though there's no real lookup, just maths
if self.sig_refs and len(self.sig_refs) > 0:
for sig_ref in self.sig_refs:
lookup_data = get_sig_ref_info(sig_ref.sig, sig_ref.id)
if lookup_data:
# Update the sig_ref data from the lookup
sig_ref.__dict__.update(lookup_data.__dict__)
# If the spot itself doesn't have a SIG yet, but we have at least one SIG reference, take that reference's SIG
# and apply it to the whole spot.
if self.sig_refs and len(self.sig_refs) > 0 and not self.sig:
self.sig = self.sig_refs[0].sig
# Icon from SIG # Icon from SIG
if self.sig and not self.icon: if self.sig and not self.icon:
self.icon = get_icon_for_sig(self.sig) self.icon = get_icon_for_sig(self.sig)

View File

@@ -6,7 +6,15 @@ from dataclasses import dataclass
class SIGRef: class SIGRef:
# Reference ID, e.g. "GB-0001". # Reference ID, e.g. "GB-0001".
id: str id: str
# SIG that this reference is in, e.g. "POTA".
sig: str
# Name of the reference, e.g. "Null Country Park", if known. # Name of the reference, e.g. "Null Country Park", if known.
name: str = None name: str = None
# URL to look up more information about the reference, if known. # URL to look up more information about the reference, if known.
url: str = None url: str = None
# Latitude of the reference, if known.
latitude: float = None
# Longitude of the reference, if known.
longitude: float = None
# Maidenhead grid reference of the reference, if known.
grid: str = None

View File

@@ -10,9 +10,9 @@ import pytz
from pyhamtools.locator import locator_to_latlong, latlong_to_locator from pyhamtools.locator import locator_to_latlong, latlong_to_locator
from core.constants import DXCC_FLAGS from core.constants import DXCC_FLAGS
from core.geo_utils import wab_wai_square_to_lat_lon
from core.lookup_helper import lookup_helper from core.lookup_helper import lookup_helper
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig, get_sig_ref_info, ANY_SIG_REGEX, get_ref_regex_for_sig
from data.sig_ref import SIGRef
# Data class that defines a spot. # Data class that defines a spot.
@@ -35,8 +35,6 @@ class Spot:
dx_continent: str = None dx_continent: str = None
# DXCC ID of the DX operator # DXCC ID of the DX operator
dx_dxcc_id: int = None dx_dxcc_id: int = None
# DXCC ID of the spotter
de_dxcc_id: int = None
# CQ zone of the DX operator # CQ zone of the DX operator
dx_cq_zone: int = None dx_cq_zone: int = None
# ITU zone of the DX operator # ITU zone of the DX operator
@@ -50,11 +48,12 @@ class Spot:
# lookup # lookup
dx_latitude: float = None dx_latitude: float = None
dx_longitude: float = None dx_longitude: float = None
# DX Location source. Indicates how accurate the location might be. Values: "SPOT", "WAB/WAI GRID", "QRZ", "DXCC", "NONE" # DX Location source. Indicates how accurate the location might be. Values: "SPOT", "WAB/WAI GRID", "HOME QTH",
# "DXCC", "NONE"
dx_location_source: str = "NONE" dx_location_source: str = "NONE"
# DX Location good. Indicates that the software thinks the location data is good enough to plot on a map. This is # DX Location good. Indicates that the software thinks the location data is good enough to plot on a map. This is
# true if the location source is "SPOT" or "WAB/WAI GRID", or if the location source is "QRZ" and the DX callsign # true if the location source is "SPOT" or "WAB/WAI GRID", or if the location source is "HOME QTH" and the DX
# doesn't have a suffix like /P. # callsign doesn't have a suffix like /P.
dx_location_good: bool = False dx_location_good: bool = False
# DE (Spotter) info # DE (Spotter) info
@@ -67,6 +66,8 @@ class Spot:
de_flag: str = None de_flag: str = None
# Continent of the spotter # Continent of the spotter
de_continent: str = None de_continent: str = None
# DXCC ID of the spotter
de_dxcc_id: int = None
# If this is an APRS/Packet/etc spot, what SSID was the spotter/receiver using? # If this is an APRS/Packet/etc spot, what SSID was the spotter/receiver using?
de_ssid: str = None de_ssid: str = None
# Maidenhead grid locator for the spotter. This is not going to be from a xOTA reference so it will likely just be # Maidenhead grid locator for the spotter. This is not going to be from a xOTA reference so it will likely just be
@@ -196,12 +197,12 @@ class Spot:
# Spotter country, continent, zones etc. from callsign. # Spotter country, continent, zones etc. from callsign.
# DE call with no digits, or APRS servers starting "T2" are not things we can look up location for # DE call with no digits, or APRS servers starting "T2" are not things we can look up location for
if any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"): if self.de_call and any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"):
if self.de_call and not self.de_country: if not self.de_country:
self.de_country = lookup_helper.infer_country_from_callsign(self.de_call) self.de_country = lookup_helper.infer_country_from_callsign(self.de_call)
if self.de_call and not self.de_continent: if not self.de_continent:
self.de_continent = lookup_helper.infer_continent_from_callsign(self.de_call) self.de_continent = lookup_helper.infer_continent_from_callsign(self.de_call)
if self.de_call and not self.de_dxcc_id: if not self.de_dxcc_id:
self.de_dxcc_id = lookup_helper.infer_dxcc_id_from_callsign(self.de_call) self.de_dxcc_id = lookup_helper.infer_dxcc_id_from_callsign(self.de_call)
if self.de_dxcc_id and self.de_dxcc_id in DXCC_FLAGS and not self.de_flag: if self.de_dxcc_id and self.de_dxcc_id in DXCC_FLAGS and not self.de_flag:
self.de_flag = DXCC_FLAGS[self.de_dxcc_id] self.de_flag = DXCC_FLAGS[self.de_dxcc_id]
@@ -232,11 +233,68 @@ class Spot:
if self.mode and not self.mode_type: if self.mode and not self.mode_type:
self.mode_type = lookup_helper.infer_mode_type_from_mode(self.mode) self.mode_type = lookup_helper.infer_mode_type_from_mode(self.mode)
# Icon from SIG # If we have a latitude at this point, it can only have been provided by the spot itself
if self.sig and not self.icon: if self.dx_latitude:
self.dx_location_source = "SPOT"
# See if we already have a SIG reference, but the comment looks like it contains more for the same SIG. This
# should catch e.g. POTA comments like "2-fer: GB-0001 GB-0002".
if self.comment and self.sig_refs and len(self.sig_refs) > 0:
sig = self.sig_refs[0].sig.upper()
all_comment_refs = re.findall(get_ref_regex_for_sig(sig), self.comment)
for ref in all_comment_refs:
self.append_sig_ref_if_missing(SIGRef(id=ref.upper(), sig=sig))
# See if the comment looks like it contains any SIGs (and optionally SIG references) that we can
# add to the spot. This should catch cluster spot comments like "POTA GB-0001 WWFF GFF-0001" and e.g. POTA
# comments like "also WWFF GFF-0001".
if self.comment:
sig_matches = re.finditer(r"(^|\W)" + ANY_SIG_REGEX + r"($|\W)", self.comment, re.IGNORECASE)
for sig_match in sig_matches:
# First of all, if we haven't got a SIG for this spot set yet, now we have. This covers things like cluster
# spots where the comment is just "POTA".
found_sig = sig_match.group(2).upper()
if not self.sig:
self.sig = found_sig
# Now look to see if that SIG name was followed by something that looks like a reference ID for that SIG.
# If so, add that to the sig_refs list for this spot.
ref_regex = get_ref_regex_for_sig(found_sig)
if ref_regex:
ref_matches = re.finditer(r"(^|\W)" + found_sig + r"($|\W)(" + ref_regex + r")($|\W)", self.comment, re.IGNORECASE)
for ref_match in ref_matches:
self.append_sig_ref_if_missing(SIGRef(id=ref_match.group(3).upper(), sig=found_sig))
# Fetch SIG data. In case a particular API doesn't provide a full set of name, lat, lon & grid for a reference
# in its initial call, we use this code to populate the rest of the data. This includes working out grid refs
# from WAB and WAI, which count as a SIG even though there's no real lookup, just maths
if self.sig_refs and len(self.sig_refs) > 0:
for sig_ref in self.sig_refs:
lookup_data = get_sig_ref_info(sig_ref.sig, sig_ref.id)
if lookup_data:
# Update the sig_ref data from the lookup
sig_ref.__dict__.update(lookup_data.__dict__)
# If the spot itself doesn't have location yet, but the SIG ref does, extract it
if lookup_data.grid and not self.dx_grid:
self.dx_grid = lookup_data.grid
if lookup_data.latitude and not self.dx_latitude:
self.dx_latitude = lookup_data.latitude
self.dx_longitude = lookup_data.longitude
if self.sig == "WAB" or self.sig == "WAI":
self.dx_location_source = "WAB/WAI GRID"
else:
self.dx_location_source = "SIG REF LOOKUP"
# If the spot itself doesn't have a SIG yet, but we have at least one SIG reference, take that reference's SIG
# and apply it to the whole spot.
if self.sig_refs and len(self.sig_refs) > 0 and not self.sig:
self.sig = self.sig_refs[0].sig
# Icon from SIG if we have one
if self.sig:
self.icon = get_icon_for_sig(self.sig) self.icon = get_icon_for_sig(self.sig)
# DX Grid to lat/lon and vice versa # DX Grid to lat/lon and vice versa in case one is missing
if self.dx_grid and not self.dx_latitude: if self.dx_grid and not self.dx_latitude:
ll = locator_to_latlong(self.dx_grid) ll = locator_to_latlong(self.dx_grid)
self.dx_latitude = ll[0] self.dx_latitude = ll[0]
@@ -246,21 +304,6 @@ class Spot:
self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8) self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8)
except: except:
logging.debug("Invalid lat/lon received for spot") logging.debug("Invalid lat/lon received for spot")
if self.dx_latitude:
self.dx_location_source = "SPOT"
# WAB/WAI grid to lat/lon
if not self.dx_latitude and self.sig and self.sig_refs and len(self.sig_refs) > 0 and (
self.sig == "WAB" or self.sig == "WAI"):
ll = wab_wai_square_to_lat_lon(self.sig_refs[0])
if ll:
self.dx_latitude = ll[0]
self.dx_longitude = ll[1]
try:
self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8)
except:
logging.debug("Invalid lat/lon received from WAB/WAI grid")
self.dx_location_source = "WAB/WAI GRID"
# QRT comment detection # QRT comment detection
if self.comment and not self.qrt: if self.comment and not self.qrt:
@@ -277,7 +320,7 @@ class Spot:
self.dx_latitude = latlon[0] self.dx_latitude = latlon[0]
self.dx_longitude = latlon[1] self.dx_longitude = latlon[1]
self.dx_grid = lookup_helper.infer_grid_from_callsign_qrz(self.dx_call) self.dx_grid = lookup_helper.infer_grid_from_callsign_qrz(self.dx_call)
self.dx_location_source = "QRZ" self.dx_location_source = "HOME QTH"
# Last resort for getting a DX position, use the DXCC entity. # Last resort for getting a DX position, use the DXCC entity.
if self.dx_call and not self.dx_latitude: if self.dx_call and not self.dx_latitude:
@@ -290,13 +333,14 @@ class Spot:
# DX Location is "good" if it is from a spot, or from QRZ if the callsign doesn't contain a slash, so the operator # DX Location is "good" if it is from a spot, or from QRZ if the callsign doesn't contain a slash, so the operator
# is likely at home. # is likely at home.
self.dx_location_good = self.dx_location_source == "SPOT" or self.dx_location_source == "WAB/WAI GRID" or ( self.dx_location_good = (self.dx_location_source == "SPOT" or self.dx_location_source == "SIG REF LOOKUP"
self.dx_location_source == "QRZ" and not "/" in self.dx_call) or self.dx_location_source == "WAB/WAI GRID"
or (self.dx_location_source == "HOME QTH" and not "/" in self.dx_call))
# DE with no digits and APRS servers starting "T2" are not things we can look up location for # DE with no digits and APRS servers starting "T2" are not things we can look up location for
if any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"): if self.de_call and any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"):
# DE operator position lookup, using QRZ.com. # DE operator position lookup, using QRZ.com.
if self.de_call and not self.de_latitude: if not self.de_latitude:
latlon = lookup_helper.infer_latlon_from_callsign_qrz(self.de_call) latlon = lookup_helper.infer_latlon_from_callsign_qrz(self.de_call)
if latlon: if latlon:
self.de_latitude = latlon[0] self.de_latitude = latlon[0]
@@ -304,7 +348,7 @@ class Spot:
self.de_grid = lookup_helper.infer_grid_from_callsign_qrz(self.de_call) self.de_grid = lookup_helper.infer_grid_from_callsign_qrz(self.de_call)
# Last resort for getting a DE position, use the DXCC entity. # Last resort for getting a DE position, use the DXCC entity.
if self.de_call and not self.de_latitude: if not self.de_latitude:
latlon = lookup_helper.infer_latlon_from_callsign_dxcc(self.de_call) latlon = lookup_helper.infer_latlon_from_callsign_dxcc(self.de_call)
if latlon: if latlon:
self.de_latitude = latlon[0] self.de_latitude = latlon[0]
@@ -324,3 +368,12 @@ class Spot:
# JSON serialise # JSON serialise
def to_json(self): def to_json(self):
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True) return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)
# Append a sig_ref to the list, so long as it's not already there.
def append_sig_ref_if_missing(self, new_sig_ref):
if not self.sig_refs:
self.sig_refs = []
for sig_ref in self.sig_refs:
if sig_ref.id.upper() == new_sig_ref.id.upper() and sig_ref.sig.upper() == new_sig_ref.sig.upper():
return
self.sig_refs.append(new_sig_ref)

View File

@@ -12,7 +12,7 @@ from core.config import MAX_SPOT_AGE, ALLOW_SPOTTING
from core.constants import BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS, SOFTWARE_VERSION, UNKNOWN_BAND from core.constants import BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS, SOFTWARE_VERSION, UNKNOWN_BAND
from core.lookup_helper import lookup_helper from core.lookup_helper import lookup_helper
from core.prometheus_metrics_handler import page_requests_counter, get_metrics, api_requests_counter from core.prometheus_metrics_handler import page_requests_counter, get_metrics, api_requests_counter
from core.sig_utils import get_ref_regex_for_sig from core.sig_utils import get_ref_regex_for_sig, get_sig_ref_info
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -43,6 +43,8 @@ class WebServer:
bottle.get("/api/v1/alerts")(lambda: self.serve_alerts_api()) bottle.get("/api/v1/alerts")(lambda: self.serve_alerts_api())
bottle.get("/api/v1/options")(lambda: self.serve_api(self.get_options())) bottle.get("/api/v1/options")(lambda: self.serve_api(self.get_options()))
bottle.get("/api/v1/status")(lambda: self.serve_api(self.status_data)) bottle.get("/api/v1/status")(lambda: self.serve_api(self.status_data))
bottle.get("/api/v1/lookup/call")(lambda: self.serve_call_lookup_api())
bottle.get("/api/v1/lookup/sigref")(lambda: self.serve_sig_ref_lookup_api())
bottle.post("/api/v1/spot")(lambda: self.accept_spot()) bottle.post("/api/v1/spot")(lambda: self.accept_spot())
# Routes for templated pages # Routes for templated pages
bottle.get("/")(lambda: self.serve_template('webpage_spots')) bottle.get("/")(lambda: self.serve_template('webpage_spots'))
@@ -100,6 +102,82 @@ class WebServer:
response.status = 500 response.status = 500
return json.dumps("Error - " + str(e), default=serialize_everything) return json.dumps("Error - " + str(e), default=serialize_everything)
# Look up data for a callsign
def serve_call_lookup_api(self):
try:
# Reject if no callsign
query = bottle.request.query
if not "call" in query.keys():
response.content_type = 'application/json'
response.status = 422
return json.dumps("Error - call must be provided", default=serialize_everything)
call = query.get("call").upper()
# Reject badly formatted callsigns
if not re.match(r"^[A-Za-z0-9/\-]*$", call):
response.content_type = 'application/json'
response.status = 422
return json.dumps("Error - '" + call + "' does not look like a valid callsign.",
default=serialize_everything)
# Take the callsign, make a "fake spot" so we can run infer_missing() on it, then repack the resulting data
# in the correct way for the API response.
fake_spot = Spot(dx_call=call)
fake_spot.infer_missing()
return self.serve_api({
"call": call,
"name": fake_spot.dx_name,
"country": fake_spot.dx_country,
"flag": fake_spot.dx_flag,
"continent": fake_spot.dx_continent,
"dxcc_id": fake_spot.dx_dxcc_id,
"cq_zone": fake_spot.dx_cq_zone,
"itu_zone": fake_spot.dx_itu_zone,
"grid": fake_spot.dx_grid,
"latitude": fake_spot.dx_latitude,
"longitude": fake_spot.dx_longitude,
"location_source": fake_spot.dx_location_source
})
except Exception as e:
logging.error(e)
response.content_type = 'application/json'
response.status = 500
return json.dumps("Error - " + str(e), default=serialize_everything)
# Look up data for a SIG reference
def serve_sig_ref_lookup_api(self):
try:
# Reject if no sig or sig_ref
query = bottle.request.query
if not "sig" in query.keys() or not "id" in query.keys():
response.content_type = 'application/json'
response.status = 422
return json.dumps("Error - sig and id must be provided", default=serialize_everything)
sig = query.get("sig").upper()
id = query.get("id").upper()
# Reject if sig unknown
if not sig in list(map(lambda p: p.name, SIGS)):
response.content_type = 'application/json'
response.status = 422
return json.dumps("Error - sig '" + sig + "' is not known.", default=serialize_everything)
# Reject if sig_ref format incorrect for sig
if get_ref_regex_for_sig(sig) and not re.match(get_ref_regex_for_sig(sig), id):
response.content_type = 'application/json'
response.status = 422
return json.dumps("Error - '" + id + "' does not look like a valid reference ID for " + sig + ".", default=serialize_everything)
data = get_sig_ref_info(sig, id)
return self.serve_api(data)
except Exception as e:
logging.error(e)
response.content_type = 'application/json'
response.status = 500
return json.dumps("Error - " + str(e), default=serialize_everything)
# Serve a JSON API endpoint # Serve a JSON API endpoint
def serve_api(self, data): def serve_api(self, data):
self.last_api_access_time = datetime.now(pytz.UTC) self.last_api_access_time = datetime.now(pytz.UTC)
@@ -182,7 +260,6 @@ class WebServer:
return json.dumps("Error - '" + spot.dx_grid + "' does not look like a valid Maidenhead grid.", default=serialize_everything) return json.dumps("Error - '" + spot.dx_grid + "' does not look like a valid Maidenhead grid.", default=serialize_everything)
# Reject if sig_ref format incorrect for sig # Reject if sig_ref format incorrect for sig
print(spot.sig_refs[0])
if spot.sig and spot.sig_refs and len(spot.sig_refs) > 0 and spot.sig_refs[0].id and get_ref_regex_for_sig(spot.sig) and not re.match(get_ref_regex_for_sig(spot.sig), spot.sig_refs[0].id): if spot.sig and spot.sig_refs and len(spot.sig_refs) > 0 and spot.sig_refs[0].id and get_ref_regex_for_sig(spot.sig) and not re.match(get_ref_regex_for_sig(spot.sig), spot.sig_refs[0].id):
response.content_type = 'application/json' response.content_type = 'application/json'
response.status = 422 response.status = 422

View File

@@ -8,8 +8,6 @@ import pytz
import telnetlib3 import telnetlib3
from core.config import SERVER_OWNER_CALLSIGN from core.config import SERVER_OWNER_CALLSIGN
from core.sig_utils import ANY_SIG_REGEX, get_icon_for_sig, get_ref_regex_for_sig
from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
from spotproviders.spot_provider import SpotProvider from spotproviders.spot_provider import SpotProvider
@@ -77,21 +75,6 @@ class DXCluster(SpotProvider):
icon="desktop", icon="desktop",
time=spot_datetime.timestamp()) time=spot_datetime.timestamp())
# See if the comment looks like it contains a SIG (and optionally SIG reference). Currently,
# only one sig ref is supported. Note that this code is specifically in the DX Cluster class and
# not in the general "spot" infer_missing() method. Because we only support one SIG per spot
# at the moment (see issue #54), we don't want to risk e.g. a POTA spot with comment "WWFF GFF-0001"
# being converted into a WWFF spot.
sig_match = re.search(r"(^|\W)" + ANY_SIG_REGEX + r"($|\W)", spot.comment, re.IGNORECASE)
if sig_match:
spot.sig = sig_match.group(2).upper()
spot.icon = get_icon_for_sig(spot.sig)
ref_regex = get_ref_regex_for_sig(spot.sig)
if ref_regex:
sig_ref_match = re.search(r"(^|\W)" + spot.sig + r"($|\W)(" + ref_regex + r")($|\W)", spot.comment, re.IGNORECASE)
if sig_ref_match:
spot.sig_refs = [SIGRef(id=sig_ref_match.group(3).upper())]
# Add to our list # Add to our list
self.submit(spot) self.submit(spot)

View File

@@ -1,9 +1,9 @@
import logging import logging
from datetime import datetime, timedelta from datetime import datetime
import pytz import pytz
from requests_cache import CachedSession
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
from core.constants import HTTP_HEADERS from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
@@ -17,8 +17,6 @@ class GMA(HTTPSpotProvider):
SPOTS_URL = "https://www.cqgma.org/api/spots/25/" SPOTS_URL = "https://www.cqgma.org/api/spots/25/"
# GMA spots don't contain the details of the programme they are for, we need a separate lookup for that # GMA spots don't contain the details of the programme they are for, we need a separate lookup for that
REF_INFO_URL_ROOT = "https://www.cqgma.org/api/ref/?" REF_INFO_URL_ROOT = "https://www.cqgma.org/api/ref/?"
REF_INFO_CACHE_TIME_DAYS = 30
REF_INFO_CACHE = CachedSession("cache/gma_ref_info_cache", expire_after=timedelta(days=REF_INFO_CACHE_TIME_DAYS))
def __init__(self, provider_config): def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC) super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
@@ -36,7 +34,7 @@ class GMA(HTTPSpotProvider):
mode=source_spot["MODE"].upper() if "<>" not in source_spot["MODE"] else None, mode=source_spot["MODE"].upper() if "<>" not in source_spot["MODE"] else None,
# Filter out some weird mode strings # Filter out some weird mode strings
comment=source_spot["TEXT"], comment=source_spot["TEXT"],
sig_refs=[SIGRef(id=source_spot["REF"], name=source_spot["NAME"], url="https://www.cqgma.org/zinfo.php?ref=" + source_spot["REF"])], sig_refs=[SIGRef(id=source_spot["REF"], sig="", name=source_spot["NAME"])],
time=datetime.strptime(source_spot["DATE"] + source_spot["TIME"], "%Y%m%d%H%M").replace( time=datetime.strptime(source_spot["DATE"] + source_spot["TIME"], "%Y%m%d%H%M").replace(
tzinfo=pytz.UTC).timestamp(), tzinfo=pytz.UTC).timestamp(),
dx_latitude=float(source_spot["LAT"]) if (source_spot["LAT"] and source_spot["LAT"] != "") else None, dx_latitude=float(source_spot["LAT"]) if (source_spot["LAT"] and source_spot["LAT"] != "") else None,
@@ -44,7 +42,7 @@ class GMA(HTTPSpotProvider):
dx_longitude=float(source_spot["LON"]) if (source_spot["LON"] and source_spot["LON"] != "") else None) dx_longitude=float(source_spot["LON"]) if (source_spot["LON"] and source_spot["LON"] != "") else None)
# GMA doesn't give what programme (SIG) the reference is for until we separately look it up. # GMA doesn't give what programme (SIG) the reference is for until we separately look it up.
ref_response = self.REF_INFO_CACHE.get(self.REF_INFO_URL_ROOT + source_spot["REF"], ref_response = SEMI_STATIC_URL_DATA_CACHE.get(self.REF_INFO_URL_ROOT + source_spot["REF"],
headers=HTTP_HEADERS) headers=HTTP_HEADERS)
# Sometimes this is blank, so handle that # Sometimes this is blank, so handle that
if ref_response.text is not None and ref_response.text != "": if ref_response.text is not None and ref_response.text != "":
@@ -56,22 +54,21 @@ class GMA(HTTPSpotProvider):
if ref_info["reftype"] not in ["POTA", "WWFF"] and (ref_info["reftype"] != "Summit" or ref_info["sota"] == ""): if ref_info["reftype"] not in ["POTA", "WWFF"] and (ref_info["reftype"] != "Summit" or ref_info["sota"] == ""):
match ref_info["reftype"]: match ref_info["reftype"]:
case "Summit": case "Summit":
spot.sig = "GMA" spot.sig_refs[0].sig = "GMA"
case "IOTA Island": case "IOTA Island":
spot.sig = "IOTA" spot.sig_refs[0].sig = "IOTA"
case "Lighthouse (ILLW)": case "Lighthouse (ILLW)":
spot.sig = "ILLW" spot.sig_refs[0].sig = "ILLW"
case "Lighthouse (ARLHS)": case "Lighthouse (ARLHS)":
spot.sig = "ARLHS" spot.sig_refs[0].sig = "ARLHS"
case "Castle": case "Castle":
spot.sig = "WCA" spot.sig_refs[0].sig = "WCA"
case "Mill": case "Mill":
spot.sig = "MOTA" spot.sig_refs[0].sig = "MOTA"
case _: case _:
logging.warn("GMA spot found with ref type " + ref_info[ logging.warn("GMA spot found with ref type " + ref_info[
"reftype"] + ", developer needs to add support for this!") "reftype"] + ", developer needs to add support for this!")
spot.sig = ref_info["reftype"] spot.sig_refs[0].sig = ref_info["reftype"]
spot.icon = get_icon_for_sig(spot.sig)
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us. # that for us.

View File

@@ -53,9 +53,7 @@ class HEMA(HTTPSpotProvider):
freq=float(freq_mode_match.group(1)) * 1000000, freq=float(freq_mode_match.group(1)) * 1000000,
mode=freq_mode_match.group(2).upper(), mode=freq_mode_match.group(2).upper(),
comment=spotter_comment_match.group(2), comment=spotter_comment_match.group(2),
sig="HEMA", sig_refs=[SIGRef(id=spot_items[3].upper(), sig="HEMA", name=spot_items[4])],
sig_refs=[SIGRef(id=spot_items[3].upper(), name=spot_items[4])],
icon=get_icon_for_sig("HEMA"),
time=datetime.strptime(spot_items[0], "%d/%m/%Y %H:%M").replace(tzinfo=pytz.UTC).timestamp(), time=datetime.strptime(spot_items[0], "%d/%m/%Y %H:%M").replace(tzinfo=pytz.UTC).timestamp(),
dx_latitude=float(spot_items[7]), dx_latitude=float(spot_items[7]),
dx_longitude=float(spot_items[8])) dx_longitude=float(spot_items[8]))

View File

@@ -1,12 +1,9 @@
import csv
import logging import logging
import re import re
from datetime import datetime, timedelta from datetime import datetime
import pytz import pytz
from requests_cache import CachedSession
from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -18,8 +15,6 @@ class ParksNPeaks(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120 POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://www.parksnpeaks.org/api/ALL" SPOTS_URL = "https://www.parksnpeaks.org/api/ALL"
SIOTA_LIST_URL = "https://www.silosontheair.com/data/silos.csv" SIOTA_LIST_URL = "https://www.silosontheair.com/data/silos.csv"
SIOTA_LIST_CACHE_TIME_DAYS = 30
SIOTA_LIST_CACHE = CachedSession("cache/siota_data_cache", expire_after=timedelta(days=SIOTA_LIST_CACHE_TIME_DAYS))
def __init__(self, provider_config): def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC) super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
@@ -38,9 +33,7 @@ class ParksNPeaks(HTTPSpotProvider):
# Seen PNP spots with empty frequency, and with comma-separated thousands digits # Seen PNP spots with empty frequency, and with comma-separated thousands digits
mode=source_spot["actMode"].upper(), mode=source_spot["actMode"].upper(),
comment=source_spot["actComments"], comment=source_spot["actComments"],
sig=source_spot["actClass"], sig_refs=[SIGRef(id=source_spot["actSiteID"], sig=source_spot["actClass"].upper())],
sig_refs=[SIGRef(id=source_spot["actSiteID"])],
icon=get_icon_for_sig(source_spot["actClass"]),
time=datetime.strptime(source_spot["actTime"], "%Y-%m-%d %H:%M:%S").replace( time=datetime.strptime(source_spot["actTime"], "%Y-%m-%d %H:%M:%S").replace(
tzinfo=pytz.UTC).timestamp()) tzinfo=pytz.UTC).timestamp())
@@ -54,24 +47,11 @@ class ParksNPeaks(HTTPSpotProvider):
spot.de_call = m.group(1) spot.de_call = m.group(1)
# Log a warning for the developer if PnP gives us an unknown programme we've never seen before # Log a warning for the developer if PnP gives us an unknown programme we've never seen before
if spot.sig not in ["POTA", "SOTA", "WWFF", "SiOTA", "ZLOTA", "KRMNPA"]: if spot.sig_refs[0].sig not in ["POTA", "SOTA", "WWFF", "SIOTA", "ZLOTA", "KRMNPA"]:
logging.warn("PNP spot found with sig " + spot.sig + ", developer needs to add support for this!") logging.warn("PNP spot found with sig " + spot.sig + ", developer needs to add support for this!")
# SiOTA lat/lon/grid lookup
if spot.sig == "SiOTA":
siota_csv_data = self.SIOTA_LIST_CACHE.get(self.SIOTA_LIST_URL, headers=HTTP_HEADERS)
siota_dr = csv.DictReader(siota_csv_data.content.decode().splitlines())
for row in siota_dr:
if row["SILO_CODE"] == spot.sig_refs[0]:
spot.dx_latitude = float(row["LAT"])
spot.dx_longitude = float(row["LNG"])
spot.dx_grid = row["LOCATOR"]
break
# Note there is currently no support for KRMNPA location lookup, see issue #61.
# If this is POTA, SOTA, WWFF or ZLOTA data we already have it through other means, so ignore. Otherwise, # If this is POTA, SOTA, WWFF or ZLOTA data we already have it through other means, so ignore. Otherwise,
# add to the spot list. # add to the spot list.
if spot.sig not in ["POTA", "SOTA", "WWFF", "ZLOTA"]: if spot.sig_refs[0].sig not in ["POTA", "SOTA", "WWFF", "ZLOTA"]:
new_spots.append(spot) new_spots.append(spot)
return new_spots return new_spots

View File

@@ -1,10 +1,8 @@
import re import re
from datetime import datetime, timedelta from datetime import datetime
import pytz import pytz
from requests_cache import CachedSession
from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig, get_ref_regex_for_sig from core.sig_utils import get_icon_for_sig, get_ref_regex_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -17,9 +15,6 @@ class POTA(HTTPSpotProvider):
SPOTS_URL = "https://api.pota.app/spot/activator" SPOTS_URL = "https://api.pota.app/spot/activator"
# Might need to look up extra park data # Might need to look up extra park data
PARK_URL_ROOT = "https://api.pota.app/park/" PARK_URL_ROOT = "https://api.pota.app/park/"
PARK_DATA_CACHE_TIME_DAYS = 30
PARK_DATA_CACHE = CachedSession("cache/pota_park_data_cache",
expire_after=timedelta(days=PARK_DATA_CACHE_TIME_DAYS))
def __init__(self, provider_config): def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC) super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
@@ -36,30 +31,13 @@ class POTA(HTTPSpotProvider):
freq=float(source_spot["frequency"]) * 1000, freq=float(source_spot["frequency"]) * 1000,
mode=source_spot["mode"].upper(), mode=source_spot["mode"].upper(),
comment=source_spot["comments"], comment=source_spot["comments"],
sig="POTA", sig_refs=[SIGRef(id=source_spot["reference"], sig="POTA", name=source_spot["name"])],
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["name"], url="https://pota.app/#/park/" + source_spot["reference"])],
icon=get_icon_for_sig("POTA"),
time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace( time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(
tzinfo=pytz.UTC).timestamp(), tzinfo=pytz.UTC).timestamp(),
dx_grid=source_spot["grid6"], dx_grid=source_spot["grid6"],
dx_latitude=source_spot["latitude"], dx_latitude=source_spot["latitude"],
dx_longitude=source_spot["longitude"]) dx_longitude=source_spot["longitude"])
# Sometimes we can get other refs in the comments for n-fer activations, extract them
all_comment_refs = re.findall(get_ref_regex_for_sig("POTA"), spot.comment)
for r in all_comment_refs:
if r not in list(map(lambda ref: ref.id, spot.sig_refs)):
ref = SIGRef(id=r.upper(), url="https://pota.app/#/park/" + r.upper())
# Now we need to look up the name of that reference from the API, because the comment won't have it
park_response = self.PARK_DATA_CACHE.get(self.PARK_URL_ROOT + r.upper(), headers=HTTP_HEADERS)
park_data = park_response.json()
if park_data and "name" in park_data:
ref.name = park_data["name"]
# Finally append our new reference to the spot's reference list
spot.sig_refs.append(ref)
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us. # that for us.
new_spots.append(spot) new_spots.append(spot)

View File

@@ -1,8 +1,6 @@
import logging from datetime import datetime
from datetime import datetime, timedelta
import requests import requests
from requests_cache import CachedSession
from core.constants import HTTP_HEADERS from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
@@ -21,8 +19,6 @@ class SOTA(HTTPSpotProvider):
SPOTS_URL = "https://api-db2.sota.org.uk/api/spots/60/all/all" SPOTS_URL = "https://api-db2.sota.org.uk/api/spots/60/all/all"
# SOTA spots don't contain lat/lon, we need a separate lookup for that # SOTA spots don't contain lat/lon, we need a separate lookup for that
SUMMIT_URL_ROOT = "https://api-db2.sota.org.uk/api/summits/" SUMMIT_URL_ROOT = "https://api-db2.sota.org.uk/api/summits/"
SUMMIT_DATA_CACHE_TIME_DAYS = 30
SUMMIT_DATA_CACHE = CachedSession("cache/sota_summit_data_cache", expire_after=timedelta(days=SUMMIT_DATA_CACHE_TIME_DAYS))
def __init__(self, provider_config): def __init__(self, provider_config):
super().__init__(provider_config, self.EPOCH_URL, self.POLL_INTERVAL_SEC) super().__init__(provider_config, self.EPOCH_URL, self.POLL_INTERVAL_SEC)
@@ -49,22 +45,10 @@ class SOTA(HTTPSpotProvider):
freq=(float(source_spot["frequency"]) * 1000000) if (source_spot["frequency"] is not None) else None, # Seen SOTA spots with no frequency! freq=(float(source_spot["frequency"]) * 1000000) if (source_spot["frequency"] is not None) else None, # Seen SOTA spots with no frequency!
mode=source_spot["mode"].upper(), mode=source_spot["mode"].upper(),
comment=source_spot["comments"], comment=source_spot["comments"],
sig="SOTA", sig_refs=[SIGRef(id=source_spot["summitCode"], sig="SOTA", name=source_spot["summitName"])],
sig_refs=[SIGRef(id=source_spot["summitCode"], name=source_spot["summitName"], url="https://www.sotadata.org.uk/en/summit/" + source_spot["summitCode"])],
icon=get_icon_for_sig("SOTA"),
time=datetime.fromisoformat(source_spot["timeStamp"]).timestamp(), time=datetime.fromisoformat(source_spot["timeStamp"]).timestamp(),
activation_score=source_spot["points"]) activation_score=source_spot["points"])
# SOTA doesn't give summit lat/lon/grid in the main call, so we need another separate call for this
try:
summit_response = self.SUMMIT_DATA_CACHE.get(self.SUMMIT_URL_ROOT + source_spot["summitCode"], headers=HTTP_HEADERS)
summit_data = summit_response.json()
spot.dx_grid = summit_data["locator"]
spot.dx_latitude = summit_data["latitude"]
spot.dx_longitude = summit_data["longitude"]
except Exception:
logging.warn("Looking up summit " + source_spot["summitCode"] + " from the SOTA API failed. No summit data was available.")
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us. # that for us.
new_spots.append(spot) new_spots.append(spot)

View File

@@ -1,10 +1,8 @@
from datetime import timedelta, datetime from datetime import datetime
import pytz import pytz
from requests_cache import CachedSession
from rss_parser import RSSParser from rss_parser import RSSParser
from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -16,8 +14,6 @@ class WOTA(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120 POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://www.wota.org.uk/spots_rss.php" SPOTS_URL = "https://www.wota.org.uk/spots_rss.php"
LIST_URL = "https://www.wota.org.uk/mapping/data/summits.json" LIST_URL = "https://www.wota.org.uk/mapping/data/summits.json"
LIST_CACHE_TIME_DAYS = 30
LIST_CACHE = CachedSession("cache/wota_data_cache", expire_after=timedelta(days=LIST_CACHE_TIME_DAYS))
RSS_DATE_TIME_FORMAT = "%a, %d %b %Y %H:%M:%S %z" RSS_DATE_TIME_FORMAT = "%a, %d %b %Y %H:%M:%S %z"
def __init__(self, provider_config): def __init__(self, provider_config):
@@ -68,21 +64,8 @@ class WOTA(HTTPSpotProvider):
freq=freq_hz, freq=freq_hz,
mode=mode, mode=mode,
comment=comment, comment=comment,
sig="WOTA", sig_refs=[SIGRef(id=ref, sig="WOTA", name=ref_name)] if ref else [],
sig_refs=[SIGRef(id=ref, name=ref_name, url="https://www.wota.org.uk/MM_" + ref)] if ref else [],
icon=get_icon_for_sig("WOTA"),
time=time.timestamp()) time=time.timestamp())
# WOTA name/grid/lat/lon lookup
if ref:
wota_data = self.LIST_CACHE.get(self.LIST_URL, headers=HTTP_HEADERS).json()
for feature in wota_data["features"]:
if feature["properties"]["wotaId"] == ref:
spot.sig_refs[0].name = feature["properties"]["title"]
spot.dx_latitude = feature["geometry"]["coordinates"][1]
spot.dx_longitude = feature["geometry"]["coordinates"][0]
spot.dx_grid = feature["properties"]["qthLocator"]
break
new_spots.append(spot) new_spots.append(spot)
return new_spots return new_spots

View File

@@ -20,10 +20,7 @@ class WWBOTA(SSESpotProvider):
# n-fer activations. # n-fer activations.
refs = [] refs = []
for ref in source_spot["references"]: for ref in source_spot["references"]:
sigref = SIGRef(id=ref["reference"], name=ref["name"]) sigref = SIGRef(id=ref["reference"], sig="WWBOTA", name=ref["name"])
# Bunkerbase URLs only work for UK bunkers, so only add a URL if we have a B/G prefix.
if ref["reference"].startswith("B/G"):
sigref.url="https://bunkerwiki.org/?s=" + ref["reference"]
refs.append(sigref) refs.append(sigref)
spot = Spot(source=self.name, spot = Spot(source=self.name,
@@ -32,9 +29,7 @@ class WWBOTA(SSESpotProvider):
freq=float(source_spot["freq"]) * 1000000, freq=float(source_spot["freq"]) * 1000000,
mode=source_spot["mode"].upper(), mode=source_spot["mode"].upper(),
comment=source_spot["comment"], comment=source_spot["comment"],
sig="WWBOTA",
sig_refs=refs, sig_refs=refs,
icon=get_icon_for_sig("WWBOTA"),
time=datetime.fromisoformat(source_spot["time"]).timestamp(), time=datetime.fromisoformat(source_spot["time"]).timestamp(),
# WWBOTA spots can contain multiple references for bunkers being activated simultaneously. For # WWBOTA spots can contain multiple references for bunkers being activated simultaneously. For
# now, we will just pick the first one to use as our grid, latitude and longitude. # now, we will just pick the first one to use as our grid, latitude and longitude.

View File

@@ -28,9 +28,7 @@ class WWFF(HTTPSpotProvider):
freq=float(source_spot["frequency_khz"]) * 1000, freq=float(source_spot["frequency_khz"]) * 1000,
mode=source_spot["mode"].upper(), mode=source_spot["mode"].upper(),
comment=source_spot["remarks"], comment=source_spot["remarks"],
sig="WWFF", sig_refs=[SIGRef(id=source_spot["reference"], sig="WWFF", name=source_spot["reference_name"])],
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["reference_name"], url="https://wwff.co/directory/?showRef=" + source_spot["reference"])],
icon=get_icon_for_sig("WWFF"),
time=datetime.fromtimestamp(source_spot["spot_time"], tz=pytz.UTC).timestamp(), time=datetime.fromtimestamp(source_spot["spot_time"], tz=pytz.UTC).timestamp(),
dx_latitude=source_spot["latitude"], dx_latitude=source_spot["latitude"],
dx_longitude=source_spot["longitude"]) dx_longitude=source_spot["longitude"])

View File

@@ -1,12 +1,7 @@
import csv from datetime import datetime
import logging
import re
from datetime import datetime, timedelta
import pytz import pytz
from requests_cache import CachedSession
from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig from core.sig_utils import get_icon_for_sig
from data.sig_ref import SIGRef from data.sig_ref import SIGRef
from data.spot import Spot from data.spot import Spot
@@ -18,8 +13,6 @@ class ZLOTA(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120 POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://ontheair.nz/api/spots?zlota_only=true" SPOTS_URL = "https://ontheair.nz/api/spots?zlota_only=true"
LIST_URL = "https://ontheair.nz/assets/assets.json" LIST_URL = "https://ontheair.nz/assets/assets.json"
LIST_CACHE_TIME_DAYS = 30
LIST_CACHE = CachedSession("cache/zlota_data_cache", expire_after=timedelta(days=LIST_CACHE_TIME_DAYS))
def __init__(self, provider_config): def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC) super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
@@ -41,18 +34,8 @@ class ZLOTA(HTTPSpotProvider):
freq=freq_hz, freq=freq_hz,
mode=source_spot["mode"].upper().strip(), mode=source_spot["mode"].upper().strip(),
comment=source_spot["comments"], comment=source_spot["comments"],
sig="ZLOTA", sig_refs=[SIGRef(id=source_spot["reference"], sig="ZLOTA", name=source_spot["name"])],
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["name"])],
icon=get_icon_for_sig("ZLOTA"),
time=datetime.fromisoformat(source_spot["referenced_time"]).astimezone(pytz.UTC).timestamp()) time=datetime.fromisoformat(source_spot["referenced_time"]).astimezone(pytz.UTC).timestamp())
# ZLOTA lat/lon lookup
zlota_data = self.LIST_CACHE.get(self.LIST_URL, headers=HTTP_HEADERS).json()
for asset in zlota_data:
if asset["code"] == spot.sig_refs[0]:
spot.dx_latitude = asset["y"]
spot.dx_longitude = asset["x"]
break
new_spots.append(spot) new_spots.append(spot)
return new_spots return new_spots

View File

@@ -17,7 +17,7 @@ paths:
/spots: /spots:
get: get:
tags: tags:
- spots - Spots
summary: Get spots summary: Get spots
description: The main API call that retrieves spots from the system. Supply this with no query parameters to retrieve all spots known to the system. Supply query parameters to filter what is retrieved. description: The main API call that retrieves spots from the system. Supply this with no query parameters to retrieve all spots known to the system. Supply query parameters to filter what is retrieved.
operationId: spots operationId: spots
@@ -247,7 +247,7 @@ paths:
/alerts: /alerts:
get: get:
tags: tags:
- alerts - Alerts
summary: Get alerts summary: Get alerts
description: Retrieves alerts (indications of upcoming activations) from the system. Supply this with no query parameters to retrieve all alerts known to the system. Supply query parameters to filter what is retrieved. description: Retrieves alerts (indications of upcoming activations) from the system. Supply this with no query parameters to retrieve all alerts known to the system. Supply query parameters to filter what is retrieved.
operationId: spots operationId: spots
@@ -355,7 +355,7 @@ paths:
/status: /status:
get: get:
tags: tags:
- general - General
summary: Get server status summary: Get server status
description: Query information about the server for use in a diagnostics display. description: Query information about the server for use in a diagnostics display.
operationId: status operationId: status
@@ -432,7 +432,7 @@ paths:
/options: /options:
get: get:
tags: tags:
- general - General
summary: Get enumeration options summary: Get enumeration options
description: Retrieves the list of options for various enumerated types, which can be found in the spots and also provided back to the API as query parameters. While these enumerated options are defined in this spec anyway, providing them in an API call allows us to define extra parameters, like the colours associated with bands, and also allows clients to set up their filters and features without having to have internal knowledge about, for example, what bands the server knows about. description: Retrieves the list of options for various enumerated types, which can be found in the spots and also provided back to the API as query parameters. While these enumerated options are defined in this spec anyway, providing them in an API call allows us to define extra parameters, like the colours associated with bands, and also allows clients to set up their filters and features without having to have internal knowledge about, for example, what bands the server knows about.
operationId: options operationId: options
@@ -488,10 +488,155 @@ paths:
example: true example: true
/lookup/call:
get:
tags:
- Utilities
summary: Look up callsign details
description: Perform a lookup of data about a certain callsign, using any of the lookup services available to the Spothole server.
operationId: call
parameters:
- name: call
in: query
description: A callsign
required: true
type: string
example: M0TRT
responses:
'200':
description: Success
content:
application/json:
schema:
type: object
properties:
call:
type: string
description: Callsign, as provided to the API
example: M0TRT
name:
type: string
description: Name of the operator
example: Ian
country:
type: string
description: Country of the operator
example: United Kingdom
flag:
type: string
description: Country flag of the operator
example: ""
continent:
type: string
description: Continent of the operator
enum:
- EU
- NA
- SA
- AS
- AF
- OC
- AN
example: EU
dxcc_id:
type: integer
description: DXCC ID of the operator
example: 235
cq_zone:
type: integer
description: CQ zone of the operator
example: 27
itu_zone:
type: integer
description: ITU zone of the operator
example: 14
grid:
type: string
description: Maidenhead grid locator for the operator's QTH. This could be from an online lookup service, or just based on the DXCC.
example: IO91aa
latitude:
type: number
description: Latitude of the operator's QTH, in degrees. This could be from an online lookup service, or just based on the DXCC.
example: 51.2345
longitude:
type: number
description: Longitude of the opertor's QTH, in degrees. This could be from an online lookup service, or just based on the DXCC.
example: -1.2345
location_source:
type: string
description: Where we got the location (grid/latitude/longitude) from. Unlike a spot where we might have a summit position or WAB square, here the only options are an online QTH lookup, or a location based purely on DXCC, or nothing.
enum:
- "HOME QTH"
- DXCC
- NONE
example: "HOME QTH"
'422':
description: Validation error e.g. callsign missing or format incorrect
content:
application/json:
schema:
type: string
example: "Failed"
/lookup/sigref:
get:
tags:
- Utilities
summary: Look up SIG ref details
description: Perform a lookup of data about a certain reference, providing the SIG and the ID of the reference. A SIGRef structure will be returned containing the SIG and ID, plus any other information Spothole could find about it.
operationId: sigref
parameters:
- name: sig
in: query
description: Special Interest Group (SIG), e.g. outdoor activity programme such as POTA
required: true
type: string
enum:
- POTA
- SOTA
- WWFF
- WWBOTA
- GMA
- HEMA
- WCA
- MOTA
- SIOTA
- ARLHS
- ILLW
- ZLOTA
- IOTA
- WOTA
- BOTA
- WAB
- WAI
example: POTA
- name: id
in: query
description: ID of a reference in that SIG
required: true
type: string
example: GB-0001
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/SIGRef'
'422':
description: Validation error e.g. SIG not supported or reference format incorrect
content:
application/json:
schema:
type: string
example: "Failed"
/spot: /spot:
post: post:
tags: tags:
- spots - Spots
summary: Add a spot summary: Add a spot
description: "Supply a new spot object, which will be added to the system. Currently, this will not be reported up the chain to a cluster, POTA, SOTA etc. This may be introduced in a future version. cURL example: `curl --request POST --header \"Content-Type: application/json\" --data '{\"dx_call\":\"M0TRT\",\"time\":1760019539, \"freq\":14200000, \"comment\":\"Test spot please ignore\", \"de_call\":\"M0TRT\"}' https://spothole.app/api/v1/spot`" description: "Supply a new spot object, which will be added to the system. Currently, this will not be reported up the chain to a cluster, POTA, SOTA etc. This may be introduced in a future version. cURL example: `curl --request POST --header \"Content-Type: application/json\" --data '{\"dx_call\":\"M0TRT\",\"time\":1760019539, \"freq\":14200000, \"comment\":\"Test spot please ignore\", \"de_call\":\"M0TRT\"}' https://spothole.app/api/v1/spot`"
operationId: spot operationId: spot
@@ -542,6 +687,28 @@ components:
type: string type: string
description: SIG reference ID. description: SIG reference ID.
example: GB-0001 example: GB-0001
sig:
type: string
description: SIG that this reference is in.
enum:
- POTA
- SOTA
- WWFF
- WWBOTA
- GMA
- HEMA
- WCA
- MOTA
- SIOTA
- ARLHS
- ILLW
- ZLOTA
- IOTA
- WOTA
- BOTA
- WAB
- WAI
example: POTA
name: name:
type: string type: string
description: SIG reference name description: SIG reference name
@@ -550,6 +717,18 @@ components:
type: string type: string
description: SIG reference URL, which the user can look up for more information description: SIG reference URL, which the user can look up for more information
example: "https://pota.app/#/park/GB-0001" example: "https://pota.app/#/park/GB-0001"
grid:
type: string
description: Maidenhead grid locator for the reference, if known.
example: IO91aa
latitude:
type: number
description: Latitude of the reference, in degrees, if known.
example: 51.2345
longitude:
type: number
description: Longitude of the reference, in degrees, if known.
example: -1.2345
Spot: Spot:
type: object type: object
@@ -616,17 +795,18 @@ components:
example: -1.2345 example: -1.2345
dx_location_source: dx_location_source:
type: string type: string
description: Where we got the DX location (grid/latitude/longitude) from. If this was from the spot itself, it's likely quite accurate, but if we had to fall back to QRZ lookup, or even a location based on the DXCC itself, it will be a lot less accurate. description: Where we got the DX location (grid/latitude/longitude) from. If this was from the spot itself, or from a lookup of the SIG ref (e.g. park) it's likely quite accurate, but if we had to fall back to QRZ lookup, or even a location based on the DXCC itself, it will be a lot less accurate.
enum: enum:
- SPOT - SPOT
- "SIG REF LOOKUP"
- "WAB/WAI GRID" - "WAB/WAI GRID"
- QRZ - "HOME QTH"
- DXCC - DXCC
- NONE - NONE
example: SPOT example: SPOT
dx_location_good: dx_location_good:
type: boolean type: boolean
description: Does the software think the location is good enough to put a marker on a map? This is true if the source is "SPOT" or "WAB/WAI GRID", or alternatively if the source is "QRZ" and the callsign doesn't have a slash in it (i.e. operator likely at home). description: Does the software think the location is good enough to put a marker on a map? This is true if the source is "SPOT", "SIG REF LOOKUP" or "WAB/WAI GRID", or alternatively if the source is "HOME QTH" and the callsign doesn't have a slash in it (i.e. operator likely at home).
example: true example: true
de_call: de_call:
type: string type: string

View File

@@ -249,6 +249,7 @@ $(document).ready(function() {
setUpMap(); setUpMap();
// Call loadOptions(), this will then trigger loading spots and setting up timers. // Call loadOptions(), this will then trigger loading spots and setting up timers.
loadOptions(); loadOptions();
// Prevent scrolling actions in the popup menus being passed through to the map // Prevent mouse scroll and touch actions in the popup menus being passed through to the map
L.DomEvent.disableScrollPropagation(document.getElementById('maptools')); L.DomEvent.disableScrollPropagation(document.getElementById('maptools'));
L.DomEvent.disableClickPropagation(document.getElementById('maptools'));
}); });

View File

@@ -156,7 +156,7 @@ function updateTable() {
var bearing = calcBearing(userPos[0], userPos[1], s["dx_latitude"], s["dx_longitude"]); var bearing = calcBearing(userPos[0], userPos[1], s["dx_latitude"], s["dx_longitude"]);
bearingText = bearing.toFixed(0).padStart(3, '0') + "°"; bearingText = bearing.toFixed(0).padStart(3, '0') + "°";
if (s["dx_location_good"] == null || s["dx_location_good"] == false) { if (s["dx_location_good"] == null || s["dx_location_good"] == false) {
if (s["dx_location_source"] == "QRZ") { if (s["dx_location_source"] == "HOME QTH") {
bearingText = bearingText + "<span class='bearing-q hideonmobile'><i class='fa-solid fa-circle-question' title='The position was not reported via the spotting service. We had to fall back to a QRZ \"home\" location for a portable/mobile/alternative spot, so this bearing may not be accurate if the DX is close to you..'></i></span>"; bearingText = bearingText + "<span class='bearing-q hideonmobile'><i class='fa-solid fa-circle-question' title='The position was not reported via the spotting service. We had to fall back to a QRZ \"home\" location for a portable/mobile/alternative spot, so this bearing may not be accurate if the DX is close to you..'></i></span>";
} else { } else {
bearingText = bearingText + "<span class='bearing-q hideonmobile'><i class='fa-solid fa-circle-question' title='The position was not reported via the spotting service. We had to fall back to just using the centre of a DXCC entity, so this bearing may not be accurate if the DX is close to you.'></i></span>"; bearingText = bearingText + "<span class='bearing-q hideonmobile'><i class='fa-solid fa-circle-question' title='The position was not reported via the spotting service. We had to fall back to just using the centre of a DXCC entity, so this bearing may not be accurate if the DX is close to you.'></i></span>";