mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2025-12-15 16:43:38 +00:00
Compare commits
10 Commits
fa92657d9c
...
e61d7bedb4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e61d7bedb4 | ||
|
|
ebf07f352f | ||
|
|
e83ddead62 | ||
|
|
b8e1506846 | ||
|
|
d80c4cfbeb | ||
|
|
92af0761aa | ||
|
|
286ff66721 | ||
|
|
28010a68ae | ||
|
|
0e8c7873d8 | ||
|
|
649b57a570 |
@@ -38,9 +38,7 @@ class BOTA(HTTPAlertProvider):
|
||||
# Convert to our alert format
|
||||
alert = Alert(source=self.name,
|
||||
dx_calls=[dx_call],
|
||||
sig="BOTA",
|
||||
sig_refs=[SIGRef(id=ref_name, name=ref_name, url="https://www.beachesontheair.com/beaches/" + ref_name.lower().replace(" ", "-"))],
|
||||
icon=get_icon_for_sig("BOTA"),
|
||||
sig_refs=[SIGRef(id=ref_name, sig="BOTA")],
|
||||
start_time=date_time.timestamp(),
|
||||
is_dxpedition=False)
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ class ParksNPeaks(HTTPAlertProvider):
|
||||
# Iterate through source data
|
||||
for source_alert in http_response.json():
|
||||
# Calculate some things
|
||||
sig = source_alert["Class"]
|
||||
if " - " in source_alert["Location"]:
|
||||
split = source_alert["Location"].split(" - ")
|
||||
sig_ref = split[0]
|
||||
@@ -38,19 +39,17 @@ class ParksNPeaks(HTTPAlertProvider):
|
||||
dx_calls=[source_alert["CallSign"].upper()],
|
||||
freqs_modes=source_alert["Freq"] + " " + source_alert["MODE"],
|
||||
comment=source_alert["Comments"],
|
||||
sig=source_alert["Class"],
|
||||
sig_refs=[SIGRef(id=sig_ref, name=sig_ref_name)],
|
||||
icon=get_icon_for_sig(source_alert["Class"]),
|
||||
sig_refs=[SIGRef(id=sig_ref, sig=sig, name=sig_ref_name)],
|
||||
start_time=start_time,
|
||||
is_dxpedition=False)
|
||||
|
||||
# Log a warning for the developer if PnP gives us an unknown programme we've never seen before
|
||||
if alert.sig not in ["POTA", "SOTA", "WWFF", "SiOTA", "ZLOTA", "KRMNPA"]:
|
||||
logging.warn("PNP alert found with sig " + alert.sig + ", developer needs to add support for this!")
|
||||
if sig and sig not in ["POTA", "SOTA", "WWFF", "SiOTA", "ZLOTA", "KRMNPA"]:
|
||||
logging.warn("PNP alert found with sig " + sig + ", developer needs to add support for this!")
|
||||
|
||||
# If this is POTA, SOTA or WWFF data we already have it through other means, so ignore. Otherwise, add to
|
||||
# the alert list. Note that while ZLOTA has its own spots API, it doesn't have its own alerts API. So that
|
||||
# means the PnP *spot* provider rejects ZLOTA spots here, but the PnP *alerts* provider here allows ZLOTA.
|
||||
if alert.sig not in ["POTA", "SOTA", "WWFF"]:
|
||||
if sig not in ["POTA", "SOTA", "WWFF"]:
|
||||
new_alerts.append(alert)
|
||||
return new_alerts
|
||||
|
||||
@@ -26,9 +26,7 @@ class POTA(HTTPAlertProvider):
|
||||
dx_calls=[source_alert["activator"].upper()],
|
||||
freqs_modes=source_alert["frequencies"],
|
||||
comment=source_alert["comments"],
|
||||
sig="POTA",
|
||||
sig_refs=[SIGRef(id=source_alert["reference"], name=source_alert["name"], url="https://pota.app/#/park/" + source_alert["reference"])],
|
||||
icon=get_icon_for_sig("POTA"),
|
||||
sig_refs=[SIGRef(id=source_alert["reference"], sig="POTA", name=source_alert["name"], url="https://pota.app/#/park/" + source_alert["reference"])],
|
||||
start_time=datetime.strptime(source_alert["startDate"] + source_alert["startTime"],
|
||||
"%Y-%m-%d%H:%M").replace(tzinfo=pytz.UTC).timestamp(),
|
||||
end_time=datetime.strptime(source_alert["endDate"] + source_alert["endTime"],
|
||||
|
||||
@@ -27,9 +27,7 @@ class SOTA(HTTPAlertProvider):
|
||||
dx_names=[source_alert["activatorName"].upper()],
|
||||
freqs_modes=source_alert["frequency"],
|
||||
comment=source_alert["comments"],
|
||||
sig="SOTA",
|
||||
sig_refs=[SIGRef(id=source_alert["associationCode"] + "/" + source_alert["summitCode"], name=source_alert["summitDetails"], url="https://www.sotadata.org.uk/en/summit/" + source_alert["summitCode"])],
|
||||
icon=get_icon_for_sig("SOTA"),
|
||||
sig_refs=[SIGRef(id=source_alert["associationCode"] + "/" + source_alert["summitCode"], sig="SOTA", name=source_alert["summitDetails"])],
|
||||
start_time=datetime.strptime(source_alert["dateActivated"],
|
||||
"%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=pytz.UTC).timestamp(),
|
||||
is_dxpedition=False)
|
||||
|
||||
@@ -54,9 +54,7 @@ class WOTA(HTTPAlertProvider):
|
||||
dx_calls=[dx_call],
|
||||
freqs_modes=freqs_modes,
|
||||
comment=comment,
|
||||
sig="WOTA",
|
||||
sig_refs=[SIGRef(id=ref, name=ref_name, url="https://www.wota.org.uk/MM_" + ref)] if ref else [],
|
||||
icon=get_icon_for_sig("WOTA"),
|
||||
sig_refs=[SIGRef(id=ref, sig="WOTA", name=ref_name)] if ref else [],
|
||||
start_time=time.timestamp())
|
||||
|
||||
# Add to our list.
|
||||
|
||||
@@ -26,9 +26,7 @@ class WWFF(HTTPAlertProvider):
|
||||
dx_calls=[source_alert["activator_call"].upper()],
|
||||
freqs_modes=source_alert["band"] + " " + source_alert["mode"],
|
||||
comment=source_alert["remarks"],
|
||||
sig="WWFF",
|
||||
sig_refs=[SIGRef(id=source_alert["reference"], url="https://wwff.co/directory/?showRef=" + source_alert["reference"])],
|
||||
icon=get_icon_for_sig("WWFF"),
|
||||
sig_refs=[SIGRef(id=source_alert["reference"], sig="WWFF")],
|
||||
start_time=datetime.strptime(source_alert["utc_start"],
|
||||
"%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC).timestamp(),
|
||||
end_time=datetime.strptime(source_alert["utc_end"],
|
||||
|
||||
@@ -123,9 +123,13 @@ max-alert-age-sec: 604800
|
||||
|
||||
# Login for QRZ.com to look up information. Optional. You will need an "XML Subscriber" (paid) package to retrieve all
|
||||
# the data for a callsign via their system.
|
||||
qrz-username: "N0CALL"
|
||||
qrz-username: ""
|
||||
qrz-password: ""
|
||||
|
||||
# Login for HamQTH to look up information. Optional.
|
||||
hamqth-username: ""
|
||||
hamqth-password: ""
|
||||
|
||||
# API key for Clublog to look up information. Optional. You sill need to request one via their helpdesk portal if you
|
||||
# want to use callsign lookups from Clublog.
|
||||
clublog-api-key: ""
|
||||
|
||||
10
core/cache_utils.py
Normal file
10
core/cache_utils.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from requests_cache import CachedSession
|
||||
|
||||
# Cache for "semi-static" data such as the locations of parks, CSVs of reference lists, etc.
|
||||
# This has an expiry time of 30 days, so will re-request from the source after that amount
|
||||
# of time has passed. This is used throughout Spothole to cache data that does not change
|
||||
# rapidly.
|
||||
SEMI_STATIC_URL_DATA_CACHE = CachedSession("cache/semi_static_url_data_cache",
|
||||
expire_after=timedelta(days=30))
|
||||
@@ -8,6 +8,7 @@ SOFTWARE_VERSION = "0.1"
|
||||
|
||||
# HTTP headers used for spot providers that use HTTP
|
||||
HTTP_HEADERS = {"User-Agent": SOFTWARE_NAME + ", v" + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")"}
|
||||
HAMQTH_PRG = (SOFTWARE_NAME + " v" + SOFTWARE_VERSION + " operated by " + SERVER_OWNER_CALLSIGN).replace(" ", "_")
|
||||
|
||||
# Special Interest Groups
|
||||
SIGS = [
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import gzip
|
||||
import logging
|
||||
import urllib.parse
|
||||
from datetime import timedelta
|
||||
|
||||
import xmltodict
|
||||
from diskcache import Cache
|
||||
from pyhamtools import LookupLib, Callinfo, callinfo
|
||||
from pyhamtools.exceptions import APIKeyMissingError
|
||||
@@ -9,9 +11,11 @@ from pyhamtools.frequency import freq_to_band
|
||||
from pyhamtools.locator import latlong_to_locator
|
||||
from requests_cache import CachedSession
|
||||
|
||||
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
|
||||
from core.config import config
|
||||
from core.constants import BANDS, UNKNOWN_BAND, CW_MODES, PHONE_MODES, DATA_MODES, ALL_MODES, \
|
||||
QRZCQ_CALLSIGN_LOOKUP_DATA, HTTP_HEADERS
|
||||
QRZCQ_CALLSIGN_LOOKUP_DATA, HTTP_HEADERS, HAMQTH_PRG
|
||||
|
||||
|
||||
# Singleton class that provides lookup functionality.
|
||||
class LookupHelper:
|
||||
@@ -31,18 +35,24 @@ class LookupHelper:
|
||||
self.QRZ_CALLSIGN_DATA_CACHE = None
|
||||
self.LOOKUP_LIB_QRZ = None
|
||||
self.QRZ_AVAILABLE = None
|
||||
self.HAMQTH_AVAILABLE = None
|
||||
self.HAMQTH_CALLSIGN_DATA_CACHE = None
|
||||
self.HAMQTH_BASE_URL = "https://www.hamqth.com/xml.php"
|
||||
# HamQTH session keys expire after an hour. Rather than working out how much time has passed manually, we cheat
|
||||
# and cache the HTTP response for 55 minutes, so when the login URL is queried within 55 minutes of the previous
|
||||
# time, you just get the cached response.
|
||||
self.HAMQTH_SESSION_LOOKUP_CACHE = CachedSession("cache/hamqth_session_cache",
|
||||
expire_after=timedelta(minutes=55))
|
||||
self.CALL_INFO_BASIC = None
|
||||
self.LOOKUP_LIB_BASIC = None
|
||||
self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION = None
|
||||
self.COUNTRY_FILES_CTY_PLIST_CACHE = None
|
||||
|
||||
def start(self):
|
||||
# Lookup helpers from pyhamtools. We use four (!) of these. The simplest is country-files.com, which downloads the data
|
||||
# once on startup, and requires no login/key, but does not have the best coverage.
|
||||
# If the user provides login details/API keys, we also set up helpers for QRZ.com, Clublog (live API request), and
|
||||
# Clublog (XML download). The lookup functions iterate through these in a sensible order, looking for suitable data.
|
||||
self.COUNTRY_FILES_CTY_PLIST_CACHE = CachedSession("cache/country_files_city_plist_cache",
|
||||
expire_after=timedelta(days=10))
|
||||
# Lookup helpers from pyhamtools. We use five (!) of these. The simplest is country-files.com, which downloads
|
||||
# the data once on startup, and requires no login/key, but does not have the best coverage.
|
||||
# If the user provides login details/API keys, we also set up helpers for QRZ.com, HamQTH, Clublog (live API
|
||||
# request), and Clublog (XML download). The lookup functions iterate through these in a sensible order, looking
|
||||
# for suitable data.
|
||||
self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION = "cache/cty.plist"
|
||||
success = self.download_country_files_cty_plist()
|
||||
if success:
|
||||
@@ -52,12 +62,15 @@ class LookupHelper:
|
||||
self.LOOKUP_LIB_BASIC = LookupLib(lookuptype="countryfile")
|
||||
self.CALL_INFO_BASIC = Callinfo(self.LOOKUP_LIB_BASIC)
|
||||
|
||||
self.QRZ_AVAILABLE = config["qrz-password"] != ""
|
||||
self.QRZ_AVAILABLE = config["qrz-username"] != "" and config["qrz-password"] != ""
|
||||
if self.QRZ_AVAILABLE:
|
||||
self.LOOKUP_LIB_QRZ = LookupLib(lookuptype="qrz", username=config["qrz-username"],
|
||||
pwd=config["qrz-password"])
|
||||
self.QRZ_CALLSIGN_DATA_CACHE = Cache('cache/qrz_callsign_lookup_cache')
|
||||
|
||||
self.HAMQTH_AVAILABLE = config["hamqth-username"] != "" and config["hamqth-password"] != ""
|
||||
self.HAMQTH_CALLSIGN_DATA_CACHE = Cache('cache/hamqth_callsign_lookup_cache')
|
||||
|
||||
self.CLUBLOG_API_KEY = config["clublog-api-key"]
|
||||
self.CLUBLOG_CTY_XML_CACHE = CachedSession("cache/clublog_cty_xml_cache", expire_after=timedelta(days=10))
|
||||
self.CLUBLOG_API_AVAILABLE = self.CLUBLOG_API_KEY != ""
|
||||
@@ -78,8 +91,8 @@ class LookupHelper:
|
||||
def download_country_files_cty_plist(self):
|
||||
try:
|
||||
logging.info("Downloading Country-files.com cty.plist...")
|
||||
response = self.COUNTRY_FILES_CTY_PLIST_CACHE.get("https://www.country-files.com/cty/cty.plist",
|
||||
headers=HTTP_HEADERS).text
|
||||
response = SEMI_STATIC_URL_DATA_CACHE.get("https://www.country-files.com/cty/cty.plist",
|
||||
headers=HTTP_HEADERS).text
|
||||
|
||||
with open(self.COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION, "w") as f:
|
||||
f.write(response)
|
||||
@@ -148,7 +161,12 @@ class LookupHelper:
|
||||
qrz_data = self.get_qrz_data_for_callsign(call)
|
||||
if qrz_data and "country" in qrz_data:
|
||||
country = qrz_data["country"]
|
||||
# Couldn't get anything from QRZ.com database, try Clublog data
|
||||
# Couldn't get anything from QRZ.com database, try HamQTH
|
||||
if not country:
|
||||
hamqth_data = self.get_hamqth_data_for_callsign(call)
|
||||
if hamqth_data and "country" in hamqth_data:
|
||||
country = hamqth_data["country"]
|
||||
# Couldn't get anything from HamQTH database, try Clublog data
|
||||
if not country:
|
||||
clublog_data = self.get_clublog_xml_data_for_callsign(call)
|
||||
if clublog_data and "Name" in clublog_data:
|
||||
@@ -166,7 +184,6 @@ class LookupHelper:
|
||||
|
||||
# Infer a DXCC ID from a callsign
|
||||
def infer_dxcc_id_from_callsign(self, call):
|
||||
self.get_clublog_xml_data_for_callsign("M0TRT")
|
||||
try:
|
||||
# Start with the basic country-files.com-based decoder.
|
||||
dxcc = self.CALL_INFO_BASIC.get_adif_id(call)
|
||||
@@ -177,7 +194,12 @@ class LookupHelper:
|
||||
qrz_data = self.get_qrz_data_for_callsign(call)
|
||||
if qrz_data and "adif" in qrz_data:
|
||||
dxcc = qrz_data["adif"]
|
||||
# Couldn't get anything from QRZ.com database, try Clublog data
|
||||
# Couldn't get anything from QRZ.com database, try HamQTH
|
||||
if not dxcc:
|
||||
hamqth_data = self.get_hamqth_data_for_callsign(call)
|
||||
if hamqth_data and "adif" in hamqth_data:
|
||||
dxcc = hamqth_data["adif"]
|
||||
# Couldn't get anything from HamQTH database, try Clublog data
|
||||
if not dxcc:
|
||||
clublog_data = self.get_clublog_xml_data_for_callsign(call)
|
||||
if clublog_data and "DXCC" in clublog_data:
|
||||
@@ -200,7 +222,12 @@ class LookupHelper:
|
||||
continent = self.CALL_INFO_BASIC.get_continent(call)
|
||||
except (KeyError, ValueError) as e:
|
||||
continent = None
|
||||
# Couldn't get anything from basic call info database, try Clublog data
|
||||
# Couldn't get anything from basic call info database, try HamQTH
|
||||
if not continent:
|
||||
hamqth_data = self.get_hamqth_data_for_callsign(call)
|
||||
if hamqth_data and "continent" in hamqth_data:
|
||||
country = hamqth_data["continent"]
|
||||
# Couldn't get anything from HamQTH database, try Clublog data
|
||||
if not continent:
|
||||
clublog_data = self.get_clublog_xml_data_for_callsign(call)
|
||||
if clublog_data and "Continent" in clublog_data:
|
||||
@@ -228,7 +255,12 @@ class LookupHelper:
|
||||
qrz_data = self.get_qrz_data_for_callsign(call)
|
||||
if qrz_data and "cqz" in qrz_data:
|
||||
cqz = qrz_data["cqz"]
|
||||
# Couldn't get anything from QRZ.com database, try Clublog data
|
||||
# Couldn't get anything from QRZ.com database, try HamQTH
|
||||
if not cqz:
|
||||
hamqth_data = self.get_hamqth_data_for_callsign(call)
|
||||
if hamqth_data and "cq" in hamqth_data:
|
||||
cqz = hamqth_data["cq"]
|
||||
# Couldn't get anything from HamQTH database, try Clublog data
|
||||
if not cqz:
|
||||
clublog_data = self.get_clublog_xml_data_for_callsign(call)
|
||||
if clublog_data and "CQZ" in clublog_data:
|
||||
@@ -256,13 +288,87 @@ class LookupHelper:
|
||||
qrz_data = self.get_qrz_data_for_callsign(call)
|
||||
if qrz_data and "ituz" in qrz_data:
|
||||
ituz = qrz_data["ituz"]
|
||||
# Couldn't get anything from QRZ.com database, Clublog doesn't provide this, so try QRZCQ data
|
||||
# Couldn't get anything from QRZ.com database, try HamQTH
|
||||
if not ituz:
|
||||
hamqth_data = self.get_hamqth_data_for_callsign(call)
|
||||
if hamqth_data and "itu" in hamqth_data:
|
||||
ituz = hamqth_data["itu"]
|
||||
# Couldn't get anything from HamQTH database, Clublog doesn't provide this, so try QRZCQ data
|
||||
if not ituz:
|
||||
qrzcq_data = self.get_qrzcq_data_for_callsign(call)
|
||||
if qrzcq_data and "ituz" in qrzcq_data:
|
||||
ituz = qrzcq_data["ituz"]
|
||||
return ituz
|
||||
|
||||
# Infer an operator name from a callsign (requires QRZ.com/HamQTH)
|
||||
def infer_name_from_callsign(self, call):
|
||||
data = self.get_qrz_data_for_callsign(call)
|
||||
if data and "fname" in data:
|
||||
name = data["fname"]
|
||||
if "name" in data:
|
||||
name = name + " " + data["name"]
|
||||
return name
|
||||
data = self.get_hamqth_data_for_callsign(call)
|
||||
if data and "nick" in data:
|
||||
return data["nick"]
|
||||
else:
|
||||
return None
|
||||
|
||||
# Infer a latitude and longitude from a callsign (requires QRZ.com/HamQTH)
|
||||
def infer_latlon_from_callsign_qrz(self, call):
|
||||
data = self.get_qrz_data_for_callsign(call)
|
||||
if data and "latitude" in data and "longitude" in data:
|
||||
return [data["latitude"], data["longitude"]]
|
||||
data = self.get_hamqth_data_for_callsign(call)
|
||||
if data and "latitude" in data and "longitude" in data:
|
||||
return [data["latitude"], data["longitude"]]
|
||||
else:
|
||||
return None
|
||||
|
||||
# Infer a grid locator from a callsign (requires QRZ.com/HamQTH)
|
||||
def infer_grid_from_callsign_qrz(self, call):
|
||||
data = self.get_qrz_data_for_callsign(call)
|
||||
if data and "locator" in data:
|
||||
return data["locator"]
|
||||
data = self.get_hamqth_data_for_callsign(call)
|
||||
if data and "grid" in data:
|
||||
return data["grid"]
|
||||
else:
|
||||
return None
|
||||
|
||||
# Infer a latitude and longitude from a callsign (using DXCC, probably very inaccurate)
|
||||
def infer_latlon_from_callsign_dxcc(self, call):
|
||||
try:
|
||||
data = self.CALL_INFO_BASIC.get_lat_long(call)
|
||||
if data and "latitude" in data and "longitude" in data:
|
||||
loc = [data["latitude"], data["longitude"]]
|
||||
else:
|
||||
loc = None
|
||||
except KeyError:
|
||||
loc = None
|
||||
# Couldn't get anything from basic call info database, try Clublog data
|
||||
if not loc:
|
||||
data = self.get_clublog_xml_data_for_callsign(call)
|
||||
if data and "Lat" in data and "Lon" in data:
|
||||
loc = [data["Lat"], data["Lon"]]
|
||||
if not loc:
|
||||
data = self.get_clublog_api_data_for_callsign(call)
|
||||
if data and "Lat" in data and "Lon" in data:
|
||||
loc = [data["Lat"], data["Lon"]]
|
||||
return loc
|
||||
|
||||
# Infer a grid locator from a callsign (using DXCC, probably very inaccurate)
|
||||
def infer_grid_from_callsign_dxcc(self, call):
|
||||
latlon = self.infer_latlon_from_callsign_dxcc(call)
|
||||
return latlong_to_locator(latlon[0], latlon[1], 8)
|
||||
|
||||
# Infer a mode from the frequency (in Hz) according to the band plan. Just a guess really.
|
||||
def infer_mode_from_frequency(self, freq):
|
||||
try:
|
||||
return freq_to_band(freq / 1000.0)["mode"]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
# Utility method to get QRZ.com data from cache if possible, if not get it from the API and cache it
|
||||
def get_qrz_data_for_callsign(self, call):
|
||||
# Fetch from cache if we can, otherwise fetch from the API and cache it
|
||||
@@ -286,6 +392,49 @@ class LookupHelper:
|
||||
else:
|
||||
return None
|
||||
|
||||
# Utility method to get HamQTH data from cache if possible, if not get it from the API and cache it
|
||||
def get_hamqth_data_for_callsign(self, call):
|
||||
# Fetch from cache if we can, otherwise fetch from the API and cache it
|
||||
if call in self.HAMQTH_CALLSIGN_DATA_CACHE:
|
||||
return self.HAMQTH_CALLSIGN_DATA_CACHE.get(call)
|
||||
elif self.HAMQTH_AVAILABLE:
|
||||
try:
|
||||
# First we need to log in and get a session token.
|
||||
session_data = self.HAMQTH_SESSION_LOOKUP_CACHE.get(
|
||||
self.HAMQTH_BASE_URL + "?u=" + urllib.parse.quote_plus(config["hamqth-username"]) +
|
||||
"&p=" + urllib.parse.quote_plus(config["hamqth-password"]), headers=HTTP_HEADERS).content
|
||||
dict_data = xmltodict.parse(session_data)
|
||||
if "session_id" in dict_data["HamQTH"]["session"]:
|
||||
session_id = dict_data["HamQTH"]["session"]["session_id"]
|
||||
|
||||
# Now look up the actual data.
|
||||
try:
|
||||
lookup_data = SEMI_STATIC_URL_DATA_CACHE.get(
|
||||
self.HAMQTH_BASE_URL + "?id=" + session_id + "&callsign=" + urllib.parse.quote_plus(
|
||||
call) + "&prg=" + HAMQTH_PRG, headers=HTTP_HEADERS).content
|
||||
data = xmltodict.parse(lookup_data)["HamQTH"]["search"]
|
||||
self.HAMQTH_CALLSIGN_DATA_CACHE.add(call, data, expire=604800) # 1 week in seconds
|
||||
return data
|
||||
except (KeyError, ValueError):
|
||||
# HamQTH had no info for the call, but maybe it had prefixes or suffixes. Try again with the base call.
|
||||
try:
|
||||
lookup_data = SEMI_STATIC_URL_DATA_CACHE.get(
|
||||
self.HAMQTH_BASE_URL + "?id=" + session_id + "&callsign=" + urllib.parse.quote_plus(
|
||||
callinfo.Callinfo.get_homecall(call)) + "&prg=" + HAMQTH_PRG, headers=HTTP_HEADERS).content
|
||||
data = xmltodict.parse(lookup_data)["HamQTH"]["search"]
|
||||
self.HAMQTH_CALLSIGN_DATA_CACHE.add(call, data, expire=604800) # 1 week in seconds
|
||||
return data
|
||||
except (KeyError, ValueError):
|
||||
# HamQTH had no info for the call, that's OK. Cache a None so we don't try to look this up again
|
||||
self.HAMQTH_CALLSIGN_DATA_CACHE.add(call, None, expire=604800) # 1 week in seconds
|
||||
return None
|
||||
|
||||
else:
|
||||
logging.warn("HamQTH login details incorrect, failed to look up with HamQTH.")
|
||||
except:
|
||||
logging.error("Exception when looking up HamQTH data")
|
||||
return None
|
||||
|
||||
# Utility method to get Clublog API data from cache if possible, if not get it from the API and cache it
|
||||
def get_clublog_api_data_for_callsign(self, call):
|
||||
# Fetch from cache if we can, otherwise fetch from the API and cache it
|
||||
@@ -334,70 +483,11 @@ class LookupHelper:
|
||||
return entry
|
||||
return None
|
||||
|
||||
# Infer an operator name from a callsign (requires QRZ.com)
|
||||
def infer_name_from_callsign(self, call):
|
||||
data = self.get_qrz_data_for_callsign(call)
|
||||
if data and "fname" in data:
|
||||
name = data["fname"]
|
||||
if "name" in data:
|
||||
name = name + " " + data["name"]
|
||||
return name
|
||||
else:
|
||||
return None
|
||||
|
||||
# Infer a latitude and longitude from a callsign (requires QRZ.com)
|
||||
def infer_latlon_from_callsign_qrz(self, call):
|
||||
data = self.get_qrz_data_for_callsign(call)
|
||||
if data and "latitude" in data and "longitude" in data:
|
||||
return [data["latitude"], data["longitude"]]
|
||||
else:
|
||||
return None
|
||||
|
||||
# Infer a grid locator from a callsign (requires QRZ.com)
|
||||
def infer_grid_from_callsign_qrz(self, call):
|
||||
data = self.get_qrz_data_for_callsign(call)
|
||||
if data and "locator" in data:
|
||||
return data["locator"]
|
||||
else:
|
||||
return None
|
||||
|
||||
# Infer a latitude and longitude from a callsign (using DXCC, probably very inaccurate)
|
||||
def infer_latlon_from_callsign_dxcc(self, call):
|
||||
try:
|
||||
data = self.CALL_INFO_BASIC.get_lat_long(call)
|
||||
if data and "latitude" in data and "longitude" in data:
|
||||
loc = [data["latitude"], data["longitude"]]
|
||||
else:
|
||||
loc = None
|
||||
except KeyError:
|
||||
loc = None
|
||||
# Couldn't get anything from basic call info database, try Clublog data
|
||||
if not loc:
|
||||
data = self.get_clublog_xml_data_for_callsign(call)
|
||||
if data and "Lat" in data and "Lon" in data:
|
||||
loc = [data["Lat"], data["Lon"]]
|
||||
if not loc:
|
||||
data = self.get_clublog_api_data_for_callsign(call)
|
||||
if data and "Lat" in data and "Lon" in data:
|
||||
loc = [data["Lat"], data["Lon"]]
|
||||
return loc
|
||||
|
||||
# Infer a grid locator from a callsign (using DXCC, probably very inaccurate)
|
||||
def infer_grid_from_callsign_dxcc(self, call):
|
||||
latlon = self.infer_latlon_from_callsign_dxcc(call)
|
||||
return latlong_to_locator(latlon[0], latlon[1], 8)
|
||||
|
||||
# Infer a mode from the frequency (in Hz) according to the band plan. Just a guess really.
|
||||
def infer_mode_from_frequency(self, freq):
|
||||
try:
|
||||
return freq_to_band(freq / 1000.0)["mode"]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
# Shutdown method to close down any caches neatly.
|
||||
def stop(self):
|
||||
self.QRZ_CALLSIGN_DATA_CACHE.close()
|
||||
self.CLUBLOG_CALLSIGN_DATA_CACHE.close()
|
||||
|
||||
|
||||
# Singleton object
|
||||
lookup_helper = LookupHelper()
|
||||
|
||||
@@ -1,4 +1,13 @@
|
||||
from core.constants import SIGS
|
||||
import csv
|
||||
import logging
|
||||
|
||||
from pyhamtools.locator import latlong_to_locator
|
||||
|
||||
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
|
||||
from core.constants import SIGS, HTTP_HEADERS
|
||||
from core.geo_utils import wab_wai_square_to_lat_lon
|
||||
from data.sig_ref import SIGRef
|
||||
|
||||
|
||||
# Utility function to get the icon for a named SIG. If no match is found, the "circle-question" icon will be returned.
|
||||
def get_icon_for_sig(sig):
|
||||
@@ -7,15 +16,109 @@ def get_icon_for_sig(sig):
|
||||
return s.icon
|
||||
return "circle-question"
|
||||
|
||||
|
||||
# Utility function to get the regex string for a SIG reference for a named SIG. If no match is found, None will be returned.
|
||||
def get_ref_regex_for_sig(sig):
|
||||
for s in SIGS:
|
||||
if s.name == sig:
|
||||
if s.name.upper() == sig.upper():
|
||||
return s.ref_regex
|
||||
return None
|
||||
|
||||
|
||||
# Look up details of a SIG reference (e.g. POTA park) such as name, lat/lon, and grid.
|
||||
# Note there is currently no support for KRMNPA location lookup, see issue #61.
|
||||
def get_sig_ref_info(sig, sig_ref_id):
|
||||
sig_ref = SIGRef(id=sig_ref_id, sig=sig)
|
||||
try:
|
||||
if sig.upper() == "POTA":
|
||||
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.pota.app/park/" + sig_ref_id, headers=HTTP_HEADERS).json()
|
||||
if data:
|
||||
fullname = data["name"] if "name" in data else None
|
||||
if fullname and "parktypeDesc" in data and data["parktypeDesc"] != "":
|
||||
fullname = fullname + " " + data["parktypeDesc"]
|
||||
sig_ref.name = fullname
|
||||
sig_ref.url = "https://pota.app/#/park/" + sig_ref_id
|
||||
sig_ref.grid = data["grid6"] if "grid6" in data else None
|
||||
sig_ref.latitude = data["latitude"] if "latitude" in data else None
|
||||
sig_ref.longitude = data["longitude"] if "longitude" in data else None
|
||||
elif sig.upper() == "SOTA":
|
||||
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api-db2.sota.org.uk/api/summits/" + sig_ref_id,
|
||||
headers=HTTP_HEADERS).json()
|
||||
if data:
|
||||
sig_ref.name = data["name"] if "name" in data else None
|
||||
sig_ref.url = "https://www.sotadata.org.uk/en/summit/" + sig_ref_id
|
||||
sig_ref.grid = data["locator"] if "locator" in data else None
|
||||
sig_ref.latitude = data["latitude"] if "latitude" in data else None
|
||||
sig_ref.longitude = data["longitude"] if "longitude" in data else None
|
||||
elif sig.upper() == "WWBOTA":
|
||||
data = SEMI_STATIC_URL_DATA_CACHE.get("https://api.wwbota.org/bunkers/" + sig_ref_id,
|
||||
headers=HTTP_HEADERS).json()
|
||||
if data:
|
||||
sig_ref.name = data["name"] if "name" in data else None
|
||||
sig_ref.url = "https://bunkerwiki.org/?s=" + sig_ref_id if sig_ref_id.startswith("B/G") else None
|
||||
sig_ref.grid = data["locator"] if "locator" in data else None
|
||||
sig_ref.latitude = data["lat"] if "lat" in data else None
|
||||
sig_ref.longitude = data["long"] if "long" in data else None
|
||||
elif sig.upper() == "GMA" or sig.upper() == "ARLHS" or sig.upper() == "ILLW" or sig.upper() == "WCA" or sig.upper() == "MOTA" or sig.upper() == "IOTA":
|
||||
data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.cqgma.org/api/ref/?" + sig_ref_id,
|
||||
headers=HTTP_HEADERS).json()
|
||||
if data:
|
||||
sig_ref.name = data["name"] if "name" in data else None
|
||||
sig_ref.url = "https://www.cqgma.org/zinfo.php?ref=" + sig_ref_id
|
||||
sig_ref.grid = data["locator"] if "locator" in data else None
|
||||
sig_ref.latitude = data["latitude"] if "latitude" in data else None
|
||||
sig_ref.longitude = data["longitude"] if "longitude" in data else None
|
||||
elif sig.upper() == "WWFF":
|
||||
sig_ref.url = "https://wwff.co/directory/?showRef=" + sig_ref_id
|
||||
elif sig.upper() == "SIOTA":
|
||||
siota_csv_data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.silosontheair.com/data/silos.csv",
|
||||
headers=HTTP_HEADERS)
|
||||
siota_dr = csv.DictReader(siota_csv_data.content.decode().splitlines())
|
||||
for row in siota_dr:
|
||||
if row["SILO_CODE"] == sig_ref_id:
|
||||
sig_ref.name = row["NAME"] if "NAME" in row else None
|
||||
sig_ref.grid = row["LOCATOR"] if "LOCATOR" in row else None
|
||||
sig_ref.latitude = float(row["LAT"]) if "LAT" in row else None
|
||||
sig_ref.longitude = float(row["LNG"]) if "LNG" in row else None
|
||||
elif sig.upper() == "WOTA":
|
||||
data = SEMI_STATIC_URL_DATA_CACHE.get("https://www.wota.org.uk/mapping/data/summits.json",
|
||||
headers=HTTP_HEADERS).json()
|
||||
if data:
|
||||
for feature in data["features"]:
|
||||
if feature["properties"]["wotaId"] == sig_ref_id:
|
||||
sig_ref.name = feature["properties"]["title"]
|
||||
sig_ref.url = "https://www.wota.org.uk/MM_" + sig_ref_id
|
||||
sig_ref.grid = feature["properties"]["qthLocator"]
|
||||
sig_ref.latitude = feature["geometry"]["coordinates"][1]
|
||||
sig_ref.longitude = feature["geometry"]["coordinates"][0]
|
||||
elif sig.upper() == "ZLOTA":
|
||||
data = SEMI_STATIC_URL_DATA_CACHE.get("https://ontheair.nz/assets/assets.json", headers=HTTP_HEADERS).json()
|
||||
if data:
|
||||
for asset in data:
|
||||
if asset["code"] == sig_ref_id:
|
||||
sig_ref.name = asset["name"]
|
||||
sig_ref.url = "https://ontheair.nz/assets/ZLI_OT-030" + sig_ref_id.replace("/", "_")
|
||||
sig_ref.grid = latlong_to_locator(asset["y"], asset["x"], 6)
|
||||
sig_ref.latitude = asset["y"]
|
||||
sig_ref.longitude = asset["x"]
|
||||
elif sig.upper() == "BOTA":
|
||||
if not sig_ref.name:
|
||||
sig_ref.name = sig_ref.id
|
||||
sig_ref.url = "https://www.beachesontheair.com/beaches/" + sig_ref.name.lower().replace(" ", "-")
|
||||
elif sig.upper() == "WAB" or sig.upper() == "WAI":
|
||||
ll = wab_wai_square_to_lat_lon(sig_ref_id)
|
||||
if ll:
|
||||
sig_ref.name = sig_ref_id
|
||||
sig_ref.grid = latlong_to_locator(ll[0], ll[1], 6)
|
||||
sig_ref.latitude = ll[0]
|
||||
sig_ref.longitude = ll[1]
|
||||
except:
|
||||
logging.warn("Failed to look up sig_ref info for " + sig + " ref " + sig_ref_id + ".")
|
||||
return sig_ref
|
||||
|
||||
|
||||
# Regex matching any SIG
|
||||
ANY_SIG_REGEX = r"(" + r"|".join(list(map(lambda p: p.name, SIGS))) + r")"
|
||||
|
||||
# Regex matching any SIG reference
|
||||
ANY_XOTA_SIG_REF_REGEX = r"[\w\/]+\-\d+"
|
||||
ANY_XOTA_SIG_REF_REGEX = r"[\w\/]+\-\d+"
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import copy
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
@@ -9,7 +8,7 @@ import pytz
|
||||
|
||||
from core.constants import DXCC_FLAGS
|
||||
from core.lookup_helper import lookup_helper
|
||||
from core.sig_utils import get_icon_for_sig
|
||||
from core.sig_utils import get_icon_for_sig, get_sig_ref_info
|
||||
|
||||
|
||||
# Data class that defines an alert.
|
||||
@@ -99,6 +98,21 @@ class Alert:
|
||||
if self.dx_dxcc_id and self.dx_dxcc_id in DXCC_FLAGS and not self.dx_flag:
|
||||
self.dx_flag = DXCC_FLAGS[self.dx_dxcc_id]
|
||||
|
||||
# Fetch SIG data. In case a particular API doesn't provide a full set of name, lat, lon & grid for a reference
|
||||
# in its initial call, we use this code to populate the rest of the data. This includes working out grid refs
|
||||
# from WAB and WAI, which count as a SIG even though there's no real lookup, just maths
|
||||
if self.sig_refs and len(self.sig_refs) > 0:
|
||||
for sig_ref in self.sig_refs:
|
||||
lookup_data = get_sig_ref_info(sig_ref.sig, sig_ref.id)
|
||||
if lookup_data:
|
||||
# Update the sig_ref data from the lookup
|
||||
sig_ref.__dict__.update(lookup_data.__dict__)
|
||||
|
||||
# If the spot itself doesn't have a SIG yet, but we have at least one SIG reference, take that reference's SIG
|
||||
# and apply it to the whole spot.
|
||||
if self.sig_refs and len(self.sig_refs) > 0 and not self.sig:
|
||||
self.sig = self.sig_refs[0].sig
|
||||
|
||||
# Icon from SIG
|
||||
if self.sig and not self.icon:
|
||||
self.icon = get_icon_for_sig(self.sig)
|
||||
|
||||
@@ -6,7 +6,15 @@ from dataclasses import dataclass
|
||||
class SIGRef:
|
||||
# Reference ID, e.g. "GB-0001".
|
||||
id: str
|
||||
# SIG that this reference is in, e.g. "POTA".
|
||||
sig: str
|
||||
# Name of the reference, e.g. "Null Country Park", if known.
|
||||
name: str = None
|
||||
# URL to look up more information about the reference, if known.
|
||||
url: str = None
|
||||
url: str = None
|
||||
# Latitude of the reference, if known.
|
||||
latitude: float = None
|
||||
# Longitude of the reference, if known.
|
||||
longitude: float = None
|
||||
# Maidenhead grid reference of the reference, if known.
|
||||
grid: str = None
|
||||
123
data/spot.py
123
data/spot.py
@@ -10,9 +10,9 @@ import pytz
|
||||
from pyhamtools.locator import locator_to_latlong, latlong_to_locator
|
||||
|
||||
from core.constants import DXCC_FLAGS
|
||||
from core.geo_utils import wab_wai_square_to_lat_lon
|
||||
from core.lookup_helper import lookup_helper
|
||||
from core.sig_utils import get_icon_for_sig
|
||||
from core.sig_utils import get_icon_for_sig, get_sig_ref_info, ANY_SIG_REGEX, get_ref_regex_for_sig
|
||||
from data.sig_ref import SIGRef
|
||||
|
||||
|
||||
# Data class that defines a spot.
|
||||
@@ -35,8 +35,6 @@ class Spot:
|
||||
dx_continent: str = None
|
||||
# DXCC ID of the DX operator
|
||||
dx_dxcc_id: int = None
|
||||
# DXCC ID of the spotter
|
||||
de_dxcc_id: int = None
|
||||
# CQ zone of the DX operator
|
||||
dx_cq_zone: int = None
|
||||
# ITU zone of the DX operator
|
||||
@@ -50,11 +48,12 @@ class Spot:
|
||||
# lookup
|
||||
dx_latitude: float = None
|
||||
dx_longitude: float = None
|
||||
# DX Location source. Indicates how accurate the location might be. Values: "SPOT", "WAB/WAI GRID", "QRZ", "DXCC", "NONE"
|
||||
# DX Location source. Indicates how accurate the location might be. Values: "SPOT", "WAB/WAI GRID", "HOME QTH",
|
||||
# "DXCC", "NONE"
|
||||
dx_location_source: str = "NONE"
|
||||
# DX Location good. Indicates that the software thinks the location data is good enough to plot on a map. This is
|
||||
# true if the location source is "SPOT" or "WAB/WAI GRID", or if the location source is "QRZ" and the DX callsign
|
||||
# doesn't have a suffix like /P.
|
||||
# true if the location source is "SPOT" or "WAB/WAI GRID", or if the location source is "HOME QTH" and the DX
|
||||
# callsign doesn't have a suffix like /P.
|
||||
dx_location_good: bool = False
|
||||
|
||||
# DE (Spotter) info
|
||||
@@ -67,6 +66,8 @@ class Spot:
|
||||
de_flag: str = None
|
||||
# Continent of the spotter
|
||||
de_continent: str = None
|
||||
# DXCC ID of the spotter
|
||||
de_dxcc_id: int = None
|
||||
# If this is an APRS/Packet/etc spot, what SSID was the spotter/receiver using?
|
||||
de_ssid: str = None
|
||||
# Maidenhead grid locator for the spotter. This is not going to be from a xOTA reference so it will likely just be
|
||||
@@ -196,12 +197,12 @@ class Spot:
|
||||
|
||||
# Spotter country, continent, zones etc. from callsign.
|
||||
# DE call with no digits, or APRS servers starting "T2" are not things we can look up location for
|
||||
if any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"):
|
||||
if self.de_call and not self.de_country:
|
||||
if self.de_call and any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"):
|
||||
if not self.de_country:
|
||||
self.de_country = lookup_helper.infer_country_from_callsign(self.de_call)
|
||||
if self.de_call and not self.de_continent:
|
||||
if not self.de_continent:
|
||||
self.de_continent = lookup_helper.infer_continent_from_callsign(self.de_call)
|
||||
if self.de_call and not self.de_dxcc_id:
|
||||
if not self.de_dxcc_id:
|
||||
self.de_dxcc_id = lookup_helper.infer_dxcc_id_from_callsign(self.de_call)
|
||||
if self.de_dxcc_id and self.de_dxcc_id in DXCC_FLAGS and not self.de_flag:
|
||||
self.de_flag = DXCC_FLAGS[self.de_dxcc_id]
|
||||
@@ -232,11 +233,68 @@ class Spot:
|
||||
if self.mode and not self.mode_type:
|
||||
self.mode_type = lookup_helper.infer_mode_type_from_mode(self.mode)
|
||||
|
||||
# Icon from SIG
|
||||
if self.sig and not self.icon:
|
||||
# If we have a latitude at this point, it can only have been provided by the spot itself
|
||||
if self.dx_latitude:
|
||||
self.dx_location_source = "SPOT"
|
||||
|
||||
# See if we already have a SIG reference, but the comment looks like it contains more for the same SIG. This
|
||||
# should catch e.g. POTA comments like "2-fer: GB-0001 GB-0002".
|
||||
if self.comment and self.sig_refs and len(self.sig_refs) > 0:
|
||||
sig = self.sig_refs[0].sig.upper()
|
||||
all_comment_refs = re.findall(get_ref_regex_for_sig(sig), self.comment)
|
||||
for ref in all_comment_refs:
|
||||
self.append_sig_ref_if_missing(SIGRef(id=ref.upper(), sig=sig))
|
||||
|
||||
# See if the comment looks like it contains any SIGs (and optionally SIG references) that we can
|
||||
# add to the spot. This should catch cluster spot comments like "POTA GB-0001 WWFF GFF-0001" and e.g. POTA
|
||||
# comments like "also WWFF GFF-0001".
|
||||
if self.comment:
|
||||
sig_matches = re.finditer(r"(^|\W)" + ANY_SIG_REGEX + r"($|\W)", self.comment, re.IGNORECASE)
|
||||
for sig_match in sig_matches:
|
||||
# First of all, if we haven't got a SIG for this spot set yet, now we have. This covers things like cluster
|
||||
# spots where the comment is just "POTA".
|
||||
found_sig = sig_match.group(2).upper()
|
||||
if not self.sig:
|
||||
self.sig = found_sig
|
||||
|
||||
# Now look to see if that SIG name was followed by something that looks like a reference ID for that SIG.
|
||||
# If so, add that to the sig_refs list for this spot.
|
||||
ref_regex = get_ref_regex_for_sig(found_sig)
|
||||
if ref_regex:
|
||||
ref_matches = re.finditer(r"(^|\W)" + found_sig + r"($|\W)(" + ref_regex + r")($|\W)", self.comment, re.IGNORECASE)
|
||||
for ref_match in ref_matches:
|
||||
self.append_sig_ref_if_missing(SIGRef(id=ref_match.group(3).upper(), sig=found_sig))
|
||||
|
||||
# Fetch SIG data. In case a particular API doesn't provide a full set of name, lat, lon & grid for a reference
|
||||
# in its initial call, we use this code to populate the rest of the data. This includes working out grid refs
|
||||
# from WAB and WAI, which count as a SIG even though there's no real lookup, just maths
|
||||
if self.sig_refs and len(self.sig_refs) > 0:
|
||||
for sig_ref in self.sig_refs:
|
||||
lookup_data = get_sig_ref_info(sig_ref.sig, sig_ref.id)
|
||||
if lookup_data:
|
||||
# Update the sig_ref data from the lookup
|
||||
sig_ref.__dict__.update(lookup_data.__dict__)
|
||||
# If the spot itself doesn't have location yet, but the SIG ref does, extract it
|
||||
if lookup_data.grid and not self.dx_grid:
|
||||
self.dx_grid = lookup_data.grid
|
||||
if lookup_data.latitude and not self.dx_latitude:
|
||||
self.dx_latitude = lookup_data.latitude
|
||||
self.dx_longitude = lookup_data.longitude
|
||||
if self.sig == "WAB" or self.sig == "WAI":
|
||||
self.dx_location_source = "WAB/WAI GRID"
|
||||
else:
|
||||
self.dx_location_source = "SIG REF LOOKUP"
|
||||
|
||||
# If the spot itself doesn't have a SIG yet, but we have at least one SIG reference, take that reference's SIG
|
||||
# and apply it to the whole spot.
|
||||
if self.sig_refs and len(self.sig_refs) > 0 and not self.sig:
|
||||
self.sig = self.sig_refs[0].sig
|
||||
|
||||
# Icon from SIG if we have one
|
||||
if self.sig:
|
||||
self.icon = get_icon_for_sig(self.sig)
|
||||
|
||||
# DX Grid to lat/lon and vice versa
|
||||
# DX Grid to lat/lon and vice versa in case one is missing
|
||||
if self.dx_grid and not self.dx_latitude:
|
||||
ll = locator_to_latlong(self.dx_grid)
|
||||
self.dx_latitude = ll[0]
|
||||
@@ -246,21 +304,6 @@ class Spot:
|
||||
self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8)
|
||||
except:
|
||||
logging.debug("Invalid lat/lon received for spot")
|
||||
if self.dx_latitude:
|
||||
self.dx_location_source = "SPOT"
|
||||
|
||||
# WAB/WAI grid to lat/lon
|
||||
if not self.dx_latitude and self.sig and self.sig_refs and len(self.sig_refs) > 0 and (
|
||||
self.sig == "WAB" or self.sig == "WAI"):
|
||||
ll = wab_wai_square_to_lat_lon(self.sig_refs[0])
|
||||
if ll:
|
||||
self.dx_latitude = ll[0]
|
||||
self.dx_longitude = ll[1]
|
||||
try:
|
||||
self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8)
|
||||
except:
|
||||
logging.debug("Invalid lat/lon received from WAB/WAI grid")
|
||||
self.dx_location_source = "WAB/WAI GRID"
|
||||
|
||||
# QRT comment detection
|
||||
if self.comment and not self.qrt:
|
||||
@@ -277,7 +320,7 @@ class Spot:
|
||||
self.dx_latitude = latlon[0]
|
||||
self.dx_longitude = latlon[1]
|
||||
self.dx_grid = lookup_helper.infer_grid_from_callsign_qrz(self.dx_call)
|
||||
self.dx_location_source = "QRZ"
|
||||
self.dx_location_source = "HOME QTH"
|
||||
|
||||
# Last resort for getting a DX position, use the DXCC entity.
|
||||
if self.dx_call and not self.dx_latitude:
|
||||
@@ -290,13 +333,14 @@ class Spot:
|
||||
|
||||
# DX Location is "good" if it is from a spot, or from QRZ if the callsign doesn't contain a slash, so the operator
|
||||
# is likely at home.
|
||||
self.dx_location_good = self.dx_location_source == "SPOT" or self.dx_location_source == "WAB/WAI GRID" or (
|
||||
self.dx_location_source == "QRZ" and not "/" in self.dx_call)
|
||||
self.dx_location_good = (self.dx_location_source == "SPOT" or self.dx_location_source == "SIG REF LOOKUP"
|
||||
or self.dx_location_source == "WAB/WAI GRID"
|
||||
or (self.dx_location_source == "HOME QTH" and not "/" in self.dx_call))
|
||||
|
||||
# DE with no digits and APRS servers starting "T2" are not things we can look up location for
|
||||
if any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"):
|
||||
if self.de_call and any(char.isdigit() for char in self.de_call) and not (self.de_call.startswith("T2") and self.source == "APRS-IS"):
|
||||
# DE operator position lookup, using QRZ.com.
|
||||
if self.de_call and not self.de_latitude:
|
||||
if not self.de_latitude:
|
||||
latlon = lookup_helper.infer_latlon_from_callsign_qrz(self.de_call)
|
||||
if latlon:
|
||||
self.de_latitude = latlon[0]
|
||||
@@ -304,7 +348,7 @@ class Spot:
|
||||
self.de_grid = lookup_helper.infer_grid_from_callsign_qrz(self.de_call)
|
||||
|
||||
# Last resort for getting a DE position, use the DXCC entity.
|
||||
if self.de_call and not self.de_latitude:
|
||||
if not self.de_latitude:
|
||||
latlon = lookup_helper.infer_latlon_from_callsign_dxcc(self.de_call)
|
||||
if latlon:
|
||||
self.de_latitude = latlon[0]
|
||||
@@ -324,3 +368,12 @@ class Spot:
|
||||
# JSON serialise
|
||||
def to_json(self):
|
||||
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)
|
||||
|
||||
# Append a sig_ref to the list, so long as it's not already there.
|
||||
def append_sig_ref_if_missing(self, new_sig_ref):
|
||||
if not self.sig_refs:
|
||||
self.sig_refs = []
|
||||
for sig_ref in self.sig_refs:
|
||||
if sig_ref.id.upper() == new_sig_ref.id.upper() and sig_ref.sig.upper() == new_sig_ref.sig.upper():
|
||||
return
|
||||
self.sig_refs.append(new_sig_ref)
|
||||
|
||||
@@ -12,7 +12,7 @@ from core.config import MAX_SPOT_AGE, ALLOW_SPOTTING
|
||||
from core.constants import BANDS, ALL_MODES, MODE_TYPES, SIGS, CONTINENTS, SOFTWARE_VERSION, UNKNOWN_BAND
|
||||
from core.lookup_helper import lookup_helper
|
||||
from core.prometheus_metrics_handler import page_requests_counter, get_metrics, api_requests_counter
|
||||
from core.sig_utils import get_ref_regex_for_sig
|
||||
from core.sig_utils import get_ref_regex_for_sig, get_sig_ref_info
|
||||
from data.sig_ref import SIGRef
|
||||
from data.spot import Spot
|
||||
|
||||
@@ -43,6 +43,8 @@ class WebServer:
|
||||
bottle.get("/api/v1/alerts")(lambda: self.serve_alerts_api())
|
||||
bottle.get("/api/v1/options")(lambda: self.serve_api(self.get_options()))
|
||||
bottle.get("/api/v1/status")(lambda: self.serve_api(self.status_data))
|
||||
bottle.get("/api/v1/lookup/call")(lambda: self.serve_call_lookup_api())
|
||||
bottle.get("/api/v1/lookup/sigref")(lambda: self.serve_sig_ref_lookup_api())
|
||||
bottle.post("/api/v1/spot")(lambda: self.accept_spot())
|
||||
# Routes for templated pages
|
||||
bottle.get("/")(lambda: self.serve_template('webpage_spots'))
|
||||
@@ -100,6 +102,82 @@ class WebServer:
|
||||
response.status = 500
|
||||
return json.dumps("Error - " + str(e), default=serialize_everything)
|
||||
|
||||
# Look up data for a callsign
|
||||
def serve_call_lookup_api(self):
|
||||
try:
|
||||
# Reject if no callsign
|
||||
query = bottle.request.query
|
||||
if not "call" in query.keys():
|
||||
response.content_type = 'application/json'
|
||||
response.status = 422
|
||||
return json.dumps("Error - call must be provided", default=serialize_everything)
|
||||
call = query.get("call").upper()
|
||||
|
||||
# Reject badly formatted callsigns
|
||||
if not re.match(r"^[A-Za-z0-9/\-]*$", call):
|
||||
response.content_type = 'application/json'
|
||||
response.status = 422
|
||||
return json.dumps("Error - '" + call + "' does not look like a valid callsign.",
|
||||
default=serialize_everything)
|
||||
|
||||
# Take the callsign, make a "fake spot" so we can run infer_missing() on it, then repack the resulting data
|
||||
# in the correct way for the API response.
|
||||
fake_spot = Spot(dx_call=call)
|
||||
fake_spot.infer_missing()
|
||||
return self.serve_api({
|
||||
"call": call,
|
||||
"name": fake_spot.dx_name,
|
||||
"country": fake_spot.dx_country,
|
||||
"flag": fake_spot.dx_flag,
|
||||
"continent": fake_spot.dx_continent,
|
||||
"dxcc_id": fake_spot.dx_dxcc_id,
|
||||
"cq_zone": fake_spot.dx_cq_zone,
|
||||
"itu_zone": fake_spot.dx_itu_zone,
|
||||
"grid": fake_spot.dx_grid,
|
||||
"latitude": fake_spot.dx_latitude,
|
||||
"longitude": fake_spot.dx_longitude,
|
||||
"location_source": fake_spot.dx_location_source
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
response.content_type = 'application/json'
|
||||
response.status = 500
|
||||
return json.dumps("Error - " + str(e), default=serialize_everything)
|
||||
|
||||
# Look up data for a SIG reference
|
||||
def serve_sig_ref_lookup_api(self):
|
||||
try:
|
||||
# Reject if no sig or sig_ref
|
||||
query = bottle.request.query
|
||||
if not "sig" in query.keys() or not "id" in query.keys():
|
||||
response.content_type = 'application/json'
|
||||
response.status = 422
|
||||
return json.dumps("Error - sig and id must be provided", default=serialize_everything)
|
||||
sig = query.get("sig").upper()
|
||||
id = query.get("id").upper()
|
||||
|
||||
# Reject if sig unknown
|
||||
if not sig in list(map(lambda p: p.name, SIGS)):
|
||||
response.content_type = 'application/json'
|
||||
response.status = 422
|
||||
return json.dumps("Error - sig '" + sig + "' is not known.", default=serialize_everything)
|
||||
|
||||
# Reject if sig_ref format incorrect for sig
|
||||
if get_ref_regex_for_sig(sig) and not re.match(get_ref_regex_for_sig(sig), id):
|
||||
response.content_type = 'application/json'
|
||||
response.status = 422
|
||||
return json.dumps("Error - '" + id + "' does not look like a valid reference ID for " + sig + ".", default=serialize_everything)
|
||||
|
||||
data = get_sig_ref_info(sig, id)
|
||||
return self.serve_api(data)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
response.content_type = 'application/json'
|
||||
response.status = 500
|
||||
return json.dumps("Error - " + str(e), default=serialize_everything)
|
||||
|
||||
# Serve a JSON API endpoint
|
||||
def serve_api(self, data):
|
||||
self.last_api_access_time = datetime.now(pytz.UTC)
|
||||
@@ -182,7 +260,6 @@ class WebServer:
|
||||
return json.dumps("Error - '" + spot.dx_grid + "' does not look like a valid Maidenhead grid.", default=serialize_everything)
|
||||
|
||||
# Reject if sig_ref format incorrect for sig
|
||||
print(spot.sig_refs[0])
|
||||
if spot.sig and spot.sig_refs and len(spot.sig_refs) > 0 and spot.sig_refs[0].id and get_ref_regex_for_sig(spot.sig) and not re.match(get_ref_regex_for_sig(spot.sig), spot.sig_refs[0].id):
|
||||
response.content_type = 'application/json'
|
||||
response.status = 422
|
||||
|
||||
@@ -8,8 +8,6 @@ import pytz
|
||||
import telnetlib3
|
||||
|
||||
from core.config import SERVER_OWNER_CALLSIGN
|
||||
from core.sig_utils import ANY_SIG_REGEX, get_icon_for_sig, get_ref_regex_for_sig
|
||||
from data.sig_ref import SIGRef
|
||||
from data.spot import Spot
|
||||
from spotproviders.spot_provider import SpotProvider
|
||||
|
||||
@@ -77,21 +75,6 @@ class DXCluster(SpotProvider):
|
||||
icon="desktop",
|
||||
time=spot_datetime.timestamp())
|
||||
|
||||
# See if the comment looks like it contains a SIG (and optionally SIG reference). Currently,
|
||||
# only one sig ref is supported. Note that this code is specifically in the DX Cluster class and
|
||||
# not in the general "spot" infer_missing() method. Because we only support one SIG per spot
|
||||
# at the moment (see issue #54), we don't want to risk e.g. a POTA spot with comment "WWFF GFF-0001"
|
||||
# being converted into a WWFF spot.
|
||||
sig_match = re.search(r"(^|\W)" + ANY_SIG_REGEX + r"($|\W)", spot.comment, re.IGNORECASE)
|
||||
if sig_match:
|
||||
spot.sig = sig_match.group(2).upper()
|
||||
spot.icon = get_icon_for_sig(spot.sig)
|
||||
ref_regex = get_ref_regex_for_sig(spot.sig)
|
||||
if ref_regex:
|
||||
sig_ref_match = re.search(r"(^|\W)" + spot.sig + r"($|\W)(" + ref_regex + r")($|\W)", spot.comment, re.IGNORECASE)
|
||||
if sig_ref_match:
|
||||
spot.sig_refs = [SIGRef(id=sig_ref_match.group(3).upper())]
|
||||
|
||||
# Add to our list
|
||||
self.submit(spot)
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
|
||||
import pytz
|
||||
from requests_cache import CachedSession
|
||||
|
||||
from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
|
||||
from core.constants import HTTP_HEADERS
|
||||
from core.sig_utils import get_icon_for_sig
|
||||
from data.sig_ref import SIGRef
|
||||
@@ -17,8 +17,6 @@ class GMA(HTTPSpotProvider):
|
||||
SPOTS_URL = "https://www.cqgma.org/api/spots/25/"
|
||||
# GMA spots don't contain the details of the programme they are for, we need a separate lookup for that
|
||||
REF_INFO_URL_ROOT = "https://www.cqgma.org/api/ref/?"
|
||||
REF_INFO_CACHE_TIME_DAYS = 30
|
||||
REF_INFO_CACHE = CachedSession("cache/gma_ref_info_cache", expire_after=timedelta(days=REF_INFO_CACHE_TIME_DAYS))
|
||||
|
||||
def __init__(self, provider_config):
|
||||
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
|
||||
@@ -36,7 +34,7 @@ class GMA(HTTPSpotProvider):
|
||||
mode=source_spot["MODE"].upper() if "<>" not in source_spot["MODE"] else None,
|
||||
# Filter out some weird mode strings
|
||||
comment=source_spot["TEXT"],
|
||||
sig_refs=[SIGRef(id=source_spot["REF"], name=source_spot["NAME"], url="https://www.cqgma.org/zinfo.php?ref=" + source_spot["REF"])],
|
||||
sig_refs=[SIGRef(id=source_spot["REF"], sig="", name=source_spot["NAME"])],
|
||||
time=datetime.strptime(source_spot["DATE"] + source_spot["TIME"], "%Y%m%d%H%M").replace(
|
||||
tzinfo=pytz.UTC).timestamp(),
|
||||
dx_latitude=float(source_spot["LAT"]) if (source_spot["LAT"] and source_spot["LAT"] != "") else None,
|
||||
@@ -44,7 +42,7 @@ class GMA(HTTPSpotProvider):
|
||||
dx_longitude=float(source_spot["LON"]) if (source_spot["LON"] and source_spot["LON"] != "") else None)
|
||||
|
||||
# GMA doesn't give what programme (SIG) the reference is for until we separately look it up.
|
||||
ref_response = self.REF_INFO_CACHE.get(self.REF_INFO_URL_ROOT + source_spot["REF"],
|
||||
ref_response = SEMI_STATIC_URL_DATA_CACHE.get(self.REF_INFO_URL_ROOT + source_spot["REF"],
|
||||
headers=HTTP_HEADERS)
|
||||
# Sometimes this is blank, so handle that
|
||||
if ref_response.text is not None and ref_response.text != "":
|
||||
@@ -56,22 +54,21 @@ class GMA(HTTPSpotProvider):
|
||||
if ref_info["reftype"] not in ["POTA", "WWFF"] and (ref_info["reftype"] != "Summit" or ref_info["sota"] == ""):
|
||||
match ref_info["reftype"]:
|
||||
case "Summit":
|
||||
spot.sig = "GMA"
|
||||
spot.sig_refs[0].sig = "GMA"
|
||||
case "IOTA Island":
|
||||
spot.sig = "IOTA"
|
||||
spot.sig_refs[0].sig = "IOTA"
|
||||
case "Lighthouse (ILLW)":
|
||||
spot.sig = "ILLW"
|
||||
spot.sig_refs[0].sig = "ILLW"
|
||||
case "Lighthouse (ARLHS)":
|
||||
spot.sig = "ARLHS"
|
||||
spot.sig_refs[0].sig = "ARLHS"
|
||||
case "Castle":
|
||||
spot.sig = "WCA"
|
||||
spot.sig_refs[0].sig = "WCA"
|
||||
case "Mill":
|
||||
spot.sig = "MOTA"
|
||||
spot.sig_refs[0].sig = "MOTA"
|
||||
case _:
|
||||
logging.warn("GMA spot found with ref type " + ref_info[
|
||||
"reftype"] + ", developer needs to add support for this!")
|
||||
spot.sig = ref_info["reftype"]
|
||||
spot.icon = get_icon_for_sig(spot.sig)
|
||||
spot.sig_refs[0].sig = ref_info["reftype"]
|
||||
|
||||
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
|
||||
# that for us.
|
||||
|
||||
@@ -53,9 +53,7 @@ class HEMA(HTTPSpotProvider):
|
||||
freq=float(freq_mode_match.group(1)) * 1000000,
|
||||
mode=freq_mode_match.group(2).upper(),
|
||||
comment=spotter_comment_match.group(2),
|
||||
sig="HEMA",
|
||||
sig_refs=[SIGRef(id=spot_items[3].upper(), name=spot_items[4])],
|
||||
icon=get_icon_for_sig("HEMA"),
|
||||
sig_refs=[SIGRef(id=spot_items[3].upper(), sig="HEMA", name=spot_items[4])],
|
||||
time=datetime.strptime(spot_items[0], "%d/%m/%Y %H:%M").replace(tzinfo=pytz.UTC).timestamp(),
|
||||
dx_latitude=float(spot_items[7]),
|
||||
dx_longitude=float(spot_items[8]))
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
import csv
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
|
||||
import pytz
|
||||
from requests_cache import CachedSession
|
||||
|
||||
from core.constants import HTTP_HEADERS
|
||||
from core.sig_utils import get_icon_for_sig
|
||||
from data.sig_ref import SIGRef
|
||||
from data.spot import Spot
|
||||
@@ -18,8 +15,6 @@ class ParksNPeaks(HTTPSpotProvider):
|
||||
POLL_INTERVAL_SEC = 120
|
||||
SPOTS_URL = "https://www.parksnpeaks.org/api/ALL"
|
||||
SIOTA_LIST_URL = "https://www.silosontheair.com/data/silos.csv"
|
||||
SIOTA_LIST_CACHE_TIME_DAYS = 30
|
||||
SIOTA_LIST_CACHE = CachedSession("cache/siota_data_cache", expire_after=timedelta(days=SIOTA_LIST_CACHE_TIME_DAYS))
|
||||
|
||||
def __init__(self, provider_config):
|
||||
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
|
||||
@@ -38,9 +33,7 @@ class ParksNPeaks(HTTPSpotProvider):
|
||||
# Seen PNP spots with empty frequency, and with comma-separated thousands digits
|
||||
mode=source_spot["actMode"].upper(),
|
||||
comment=source_spot["actComments"],
|
||||
sig=source_spot["actClass"],
|
||||
sig_refs=[SIGRef(id=source_spot["actSiteID"])],
|
||||
icon=get_icon_for_sig(source_spot["actClass"]),
|
||||
sig_refs=[SIGRef(id=source_spot["actSiteID"], sig=source_spot["actClass"].upper())],
|
||||
time=datetime.strptime(source_spot["actTime"], "%Y-%m-%d %H:%M:%S").replace(
|
||||
tzinfo=pytz.UTC).timestamp())
|
||||
|
||||
@@ -54,24 +47,11 @@ class ParksNPeaks(HTTPSpotProvider):
|
||||
spot.de_call = m.group(1)
|
||||
|
||||
# Log a warning for the developer if PnP gives us an unknown programme we've never seen before
|
||||
if spot.sig not in ["POTA", "SOTA", "WWFF", "SiOTA", "ZLOTA", "KRMNPA"]:
|
||||
if spot.sig_refs[0].sig not in ["POTA", "SOTA", "WWFF", "SIOTA", "ZLOTA", "KRMNPA"]:
|
||||
logging.warn("PNP spot found with sig " + spot.sig + ", developer needs to add support for this!")
|
||||
|
||||
# SiOTA lat/lon/grid lookup
|
||||
if spot.sig == "SiOTA":
|
||||
siota_csv_data = self.SIOTA_LIST_CACHE.get(self.SIOTA_LIST_URL, headers=HTTP_HEADERS)
|
||||
siota_dr = csv.DictReader(siota_csv_data.content.decode().splitlines())
|
||||
for row in siota_dr:
|
||||
if row["SILO_CODE"] == spot.sig_refs[0]:
|
||||
spot.dx_latitude = float(row["LAT"])
|
||||
spot.dx_longitude = float(row["LNG"])
|
||||
spot.dx_grid = row["LOCATOR"]
|
||||
break
|
||||
|
||||
# Note there is currently no support for KRMNPA location lookup, see issue #61.
|
||||
|
||||
# If this is POTA, SOTA, WWFF or ZLOTA data we already have it through other means, so ignore. Otherwise,
|
||||
# add to the spot list.
|
||||
if spot.sig not in ["POTA", "SOTA", "WWFF", "ZLOTA"]:
|
||||
if spot.sig_refs[0].sig not in ["POTA", "SOTA", "WWFF", "ZLOTA"]:
|
||||
new_spots.append(spot)
|
||||
return new_spots
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
|
||||
import pytz
|
||||
from requests_cache import CachedSession
|
||||
|
||||
from core.constants import HTTP_HEADERS
|
||||
from core.sig_utils import get_icon_for_sig, get_ref_regex_for_sig
|
||||
from data.sig_ref import SIGRef
|
||||
from data.spot import Spot
|
||||
@@ -17,9 +15,6 @@ class POTA(HTTPSpotProvider):
|
||||
SPOTS_URL = "https://api.pota.app/spot/activator"
|
||||
# Might need to look up extra park data
|
||||
PARK_URL_ROOT = "https://api.pota.app/park/"
|
||||
PARK_DATA_CACHE_TIME_DAYS = 30
|
||||
PARK_DATA_CACHE = CachedSession("cache/pota_park_data_cache",
|
||||
expire_after=timedelta(days=PARK_DATA_CACHE_TIME_DAYS))
|
||||
|
||||
def __init__(self, provider_config):
|
||||
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
|
||||
@@ -36,30 +31,13 @@ class POTA(HTTPSpotProvider):
|
||||
freq=float(source_spot["frequency"]) * 1000,
|
||||
mode=source_spot["mode"].upper(),
|
||||
comment=source_spot["comments"],
|
||||
sig="POTA",
|
||||
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["name"], url="https://pota.app/#/park/" + source_spot["reference"])],
|
||||
icon=get_icon_for_sig("POTA"),
|
||||
sig_refs=[SIGRef(id=source_spot["reference"], sig="POTA", name=source_spot["name"])],
|
||||
time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(
|
||||
tzinfo=pytz.UTC).timestamp(),
|
||||
dx_grid=source_spot["grid6"],
|
||||
dx_latitude=source_spot["latitude"],
|
||||
dx_longitude=source_spot["longitude"])
|
||||
|
||||
# Sometimes we can get other refs in the comments for n-fer activations, extract them
|
||||
all_comment_refs = re.findall(get_ref_regex_for_sig("POTA"), spot.comment)
|
||||
for r in all_comment_refs:
|
||||
if r not in list(map(lambda ref: ref.id, spot.sig_refs)):
|
||||
ref = SIGRef(id=r.upper(), url="https://pota.app/#/park/" + r.upper())
|
||||
|
||||
# Now we need to look up the name of that reference from the API, because the comment won't have it
|
||||
park_response = self.PARK_DATA_CACHE.get(self.PARK_URL_ROOT + r.upper(), headers=HTTP_HEADERS)
|
||||
park_data = park_response.json()
|
||||
if park_data and "name" in park_data:
|
||||
ref.name = park_data["name"]
|
||||
|
||||
# Finally append our new reference to the spot's reference list
|
||||
spot.sig_refs.append(ref)
|
||||
|
||||
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
|
||||
# that for us.
|
||||
new_spots.append(spot)
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
from requests_cache import CachedSession
|
||||
|
||||
from core.constants import HTTP_HEADERS
|
||||
from core.sig_utils import get_icon_for_sig
|
||||
@@ -21,8 +19,6 @@ class SOTA(HTTPSpotProvider):
|
||||
SPOTS_URL = "https://api-db2.sota.org.uk/api/spots/60/all/all"
|
||||
# SOTA spots don't contain lat/lon, we need a separate lookup for that
|
||||
SUMMIT_URL_ROOT = "https://api-db2.sota.org.uk/api/summits/"
|
||||
SUMMIT_DATA_CACHE_TIME_DAYS = 30
|
||||
SUMMIT_DATA_CACHE = CachedSession("cache/sota_summit_data_cache", expire_after=timedelta(days=SUMMIT_DATA_CACHE_TIME_DAYS))
|
||||
|
||||
def __init__(self, provider_config):
|
||||
super().__init__(provider_config, self.EPOCH_URL, self.POLL_INTERVAL_SEC)
|
||||
@@ -49,22 +45,10 @@ class SOTA(HTTPSpotProvider):
|
||||
freq=(float(source_spot["frequency"]) * 1000000) if (source_spot["frequency"] is not None) else None, # Seen SOTA spots with no frequency!
|
||||
mode=source_spot["mode"].upper(),
|
||||
comment=source_spot["comments"],
|
||||
sig="SOTA",
|
||||
sig_refs=[SIGRef(id=source_spot["summitCode"], name=source_spot["summitName"], url="https://www.sotadata.org.uk/en/summit/" + source_spot["summitCode"])],
|
||||
icon=get_icon_for_sig("SOTA"),
|
||||
sig_refs=[SIGRef(id=source_spot["summitCode"], sig="SOTA", name=source_spot["summitName"])],
|
||||
time=datetime.fromisoformat(source_spot["timeStamp"]).timestamp(),
|
||||
activation_score=source_spot["points"])
|
||||
|
||||
# SOTA doesn't give summit lat/lon/grid in the main call, so we need another separate call for this
|
||||
try:
|
||||
summit_response = self.SUMMIT_DATA_CACHE.get(self.SUMMIT_URL_ROOT + source_spot["summitCode"], headers=HTTP_HEADERS)
|
||||
summit_data = summit_response.json()
|
||||
spot.dx_grid = summit_data["locator"]
|
||||
spot.dx_latitude = summit_data["latitude"]
|
||||
spot.dx_longitude = summit_data["longitude"]
|
||||
except Exception:
|
||||
logging.warn("Looking up summit " + source_spot["summitCode"] + " from the SOTA API failed. No summit data was available.")
|
||||
|
||||
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
|
||||
# that for us.
|
||||
new_spots.append(spot)
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
from datetime import timedelta, datetime
|
||||
from datetime import datetime
|
||||
|
||||
import pytz
|
||||
from requests_cache import CachedSession
|
||||
from rss_parser import RSSParser
|
||||
|
||||
from core.constants import HTTP_HEADERS
|
||||
from core.sig_utils import get_icon_for_sig
|
||||
from data.sig_ref import SIGRef
|
||||
from data.spot import Spot
|
||||
@@ -16,8 +14,6 @@ class WOTA(HTTPSpotProvider):
|
||||
POLL_INTERVAL_SEC = 120
|
||||
SPOTS_URL = "https://www.wota.org.uk/spots_rss.php"
|
||||
LIST_URL = "https://www.wota.org.uk/mapping/data/summits.json"
|
||||
LIST_CACHE_TIME_DAYS = 30
|
||||
LIST_CACHE = CachedSession("cache/wota_data_cache", expire_after=timedelta(days=LIST_CACHE_TIME_DAYS))
|
||||
RSS_DATE_TIME_FORMAT = "%a, %d %b %Y %H:%M:%S %z"
|
||||
|
||||
def __init__(self, provider_config):
|
||||
@@ -68,21 +64,8 @@ class WOTA(HTTPSpotProvider):
|
||||
freq=freq_hz,
|
||||
mode=mode,
|
||||
comment=comment,
|
||||
sig="WOTA",
|
||||
sig_refs=[SIGRef(id=ref, name=ref_name, url="https://www.wota.org.uk/MM_" + ref)] if ref else [],
|
||||
icon=get_icon_for_sig("WOTA"),
|
||||
sig_refs=[SIGRef(id=ref, sig="WOTA", name=ref_name)] if ref else [],
|
||||
time=time.timestamp())
|
||||
|
||||
# WOTA name/grid/lat/lon lookup
|
||||
if ref:
|
||||
wota_data = self.LIST_CACHE.get(self.LIST_URL, headers=HTTP_HEADERS).json()
|
||||
for feature in wota_data["features"]:
|
||||
if feature["properties"]["wotaId"] == ref:
|
||||
spot.sig_refs[0].name = feature["properties"]["title"]
|
||||
spot.dx_latitude = feature["geometry"]["coordinates"][1]
|
||||
spot.dx_longitude = feature["geometry"]["coordinates"][0]
|
||||
spot.dx_grid = feature["properties"]["qthLocator"]
|
||||
break
|
||||
|
||||
new_spots.append(spot)
|
||||
return new_spots
|
||||
|
||||
@@ -20,10 +20,7 @@ class WWBOTA(SSESpotProvider):
|
||||
# n-fer activations.
|
||||
refs = []
|
||||
for ref in source_spot["references"]:
|
||||
sigref = SIGRef(id=ref["reference"], name=ref["name"])
|
||||
# Bunkerbase URLs only work for UK bunkers, so only add a URL if we have a B/G prefix.
|
||||
if ref["reference"].startswith("B/G"):
|
||||
sigref.url="https://bunkerwiki.org/?s=" + ref["reference"]
|
||||
sigref = SIGRef(id=ref["reference"], sig="WWBOTA", name=ref["name"])
|
||||
refs.append(sigref)
|
||||
|
||||
spot = Spot(source=self.name,
|
||||
@@ -32,9 +29,7 @@ class WWBOTA(SSESpotProvider):
|
||||
freq=float(source_spot["freq"]) * 1000000,
|
||||
mode=source_spot["mode"].upper(),
|
||||
comment=source_spot["comment"],
|
||||
sig="WWBOTA",
|
||||
sig_refs=refs,
|
||||
icon=get_icon_for_sig("WWBOTA"),
|
||||
time=datetime.fromisoformat(source_spot["time"]).timestamp(),
|
||||
# WWBOTA spots can contain multiple references for bunkers being activated simultaneously. For
|
||||
# now, we will just pick the first one to use as our grid, latitude and longitude.
|
||||
|
||||
@@ -28,9 +28,7 @@ class WWFF(HTTPSpotProvider):
|
||||
freq=float(source_spot["frequency_khz"]) * 1000,
|
||||
mode=source_spot["mode"].upper(),
|
||||
comment=source_spot["remarks"],
|
||||
sig="WWFF",
|
||||
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["reference_name"], url="https://wwff.co/directory/?showRef=" + source_spot["reference"])],
|
||||
icon=get_icon_for_sig("WWFF"),
|
||||
sig_refs=[SIGRef(id=source_spot["reference"], sig="WWFF", name=source_spot["reference_name"])],
|
||||
time=datetime.fromtimestamp(source_spot["spot_time"], tz=pytz.UTC).timestamp(),
|
||||
dx_latitude=source_spot["latitude"],
|
||||
dx_longitude=source_spot["longitude"])
|
||||
|
||||
@@ -1,12 +1,7 @@
|
||||
import csv
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
|
||||
import pytz
|
||||
from requests_cache import CachedSession
|
||||
|
||||
from core.constants import HTTP_HEADERS
|
||||
from core.sig_utils import get_icon_for_sig
|
||||
from data.sig_ref import SIGRef
|
||||
from data.spot import Spot
|
||||
@@ -18,8 +13,6 @@ class ZLOTA(HTTPSpotProvider):
|
||||
POLL_INTERVAL_SEC = 120
|
||||
SPOTS_URL = "https://ontheair.nz/api/spots?zlota_only=true"
|
||||
LIST_URL = "https://ontheair.nz/assets/assets.json"
|
||||
LIST_CACHE_TIME_DAYS = 30
|
||||
LIST_CACHE = CachedSession("cache/zlota_data_cache", expire_after=timedelta(days=LIST_CACHE_TIME_DAYS))
|
||||
|
||||
def __init__(self, provider_config):
|
||||
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
|
||||
@@ -41,18 +34,8 @@ class ZLOTA(HTTPSpotProvider):
|
||||
freq=freq_hz,
|
||||
mode=source_spot["mode"].upper().strip(),
|
||||
comment=source_spot["comments"],
|
||||
sig="ZLOTA",
|
||||
sig_refs=[SIGRef(id=source_spot["reference"], name=source_spot["name"])],
|
||||
icon=get_icon_for_sig("ZLOTA"),
|
||||
sig_refs=[SIGRef(id=source_spot["reference"], sig="ZLOTA", name=source_spot["name"])],
|
||||
time=datetime.fromisoformat(source_spot["referenced_time"]).astimezone(pytz.UTC).timestamp())
|
||||
|
||||
# ZLOTA lat/lon lookup
|
||||
zlota_data = self.LIST_CACHE.get(self.LIST_URL, headers=HTTP_HEADERS).json()
|
||||
for asset in zlota_data:
|
||||
if asset["code"] == spot.sig_refs[0]:
|
||||
spot.dx_latitude = asset["y"]
|
||||
spot.dx_longitude = asset["x"]
|
||||
break
|
||||
|
||||
new_spots.append(spot)
|
||||
return new_spots
|
||||
|
||||
@@ -17,7 +17,7 @@ paths:
|
||||
/spots:
|
||||
get:
|
||||
tags:
|
||||
- spots
|
||||
- Spots
|
||||
summary: Get spots
|
||||
description: The main API call that retrieves spots from the system. Supply this with no query parameters to retrieve all spots known to the system. Supply query parameters to filter what is retrieved.
|
||||
operationId: spots
|
||||
@@ -247,7 +247,7 @@ paths:
|
||||
/alerts:
|
||||
get:
|
||||
tags:
|
||||
- alerts
|
||||
- Alerts
|
||||
summary: Get alerts
|
||||
description: Retrieves alerts (indications of upcoming activations) from the system. Supply this with no query parameters to retrieve all alerts known to the system. Supply query parameters to filter what is retrieved.
|
||||
operationId: spots
|
||||
@@ -355,7 +355,7 @@ paths:
|
||||
/status:
|
||||
get:
|
||||
tags:
|
||||
- general
|
||||
- General
|
||||
summary: Get server status
|
||||
description: Query information about the server for use in a diagnostics display.
|
||||
operationId: status
|
||||
@@ -432,7 +432,7 @@ paths:
|
||||
/options:
|
||||
get:
|
||||
tags:
|
||||
- general
|
||||
- General
|
||||
summary: Get enumeration options
|
||||
description: Retrieves the list of options for various enumerated types, which can be found in the spots and also provided back to the API as query parameters. While these enumerated options are defined in this spec anyway, providing them in an API call allows us to define extra parameters, like the colours associated with bands, and also allows clients to set up their filters and features without having to have internal knowledge about, for example, what bands the server knows about.
|
||||
operationId: options
|
||||
@@ -488,10 +488,155 @@ paths:
|
||||
example: true
|
||||
|
||||
|
||||
/lookup/call:
|
||||
get:
|
||||
tags:
|
||||
- Utilities
|
||||
summary: Look up callsign details
|
||||
description: Perform a lookup of data about a certain callsign, using any of the lookup services available to the Spothole server.
|
||||
operationId: call
|
||||
parameters:
|
||||
- name: call
|
||||
in: query
|
||||
description: A callsign
|
||||
required: true
|
||||
type: string
|
||||
example: M0TRT
|
||||
responses:
|
||||
'200':
|
||||
description: Success
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
call:
|
||||
type: string
|
||||
description: Callsign, as provided to the API
|
||||
example: M0TRT
|
||||
name:
|
||||
type: string
|
||||
description: Name of the operator
|
||||
example: Ian
|
||||
country:
|
||||
type: string
|
||||
description: Country of the operator
|
||||
example: United Kingdom
|
||||
flag:
|
||||
type: string
|
||||
description: Country flag of the operator
|
||||
example: ""
|
||||
continent:
|
||||
type: string
|
||||
description: Continent of the operator
|
||||
enum:
|
||||
- EU
|
||||
- NA
|
||||
- SA
|
||||
- AS
|
||||
- AF
|
||||
- OC
|
||||
- AN
|
||||
example: EU
|
||||
dxcc_id:
|
||||
type: integer
|
||||
description: DXCC ID of the operator
|
||||
example: 235
|
||||
cq_zone:
|
||||
type: integer
|
||||
description: CQ zone of the operator
|
||||
example: 27
|
||||
itu_zone:
|
||||
type: integer
|
||||
description: ITU zone of the operator
|
||||
example: 14
|
||||
grid:
|
||||
type: string
|
||||
description: Maidenhead grid locator for the operator's QTH. This could be from an online lookup service, or just based on the DXCC.
|
||||
example: IO91aa
|
||||
latitude:
|
||||
type: number
|
||||
description: Latitude of the operator's QTH, in degrees. This could be from an online lookup service, or just based on the DXCC.
|
||||
example: 51.2345
|
||||
longitude:
|
||||
type: number
|
||||
description: Longitude of the opertor's QTH, in degrees. This could be from an online lookup service, or just based on the DXCC.
|
||||
example: -1.2345
|
||||
location_source:
|
||||
type: string
|
||||
description: Where we got the location (grid/latitude/longitude) from. Unlike a spot where we might have a summit position or WAB square, here the only options are an online QTH lookup, or a location based purely on DXCC, or nothing.
|
||||
enum:
|
||||
- "HOME QTH"
|
||||
- DXCC
|
||||
- NONE
|
||||
example: "HOME QTH"
|
||||
'422':
|
||||
description: Validation error e.g. callsign missing or format incorrect
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: string
|
||||
example: "Failed"
|
||||
|
||||
|
||||
/lookup/sigref:
|
||||
get:
|
||||
tags:
|
||||
- Utilities
|
||||
summary: Look up SIG ref details
|
||||
description: Perform a lookup of data about a certain reference, providing the SIG and the ID of the reference. A SIGRef structure will be returned containing the SIG and ID, plus any other information Spothole could find about it.
|
||||
operationId: sigref
|
||||
parameters:
|
||||
- name: sig
|
||||
in: query
|
||||
description: Special Interest Group (SIG), e.g. outdoor activity programme such as POTA
|
||||
required: true
|
||||
type: string
|
||||
enum:
|
||||
- POTA
|
||||
- SOTA
|
||||
- WWFF
|
||||
- WWBOTA
|
||||
- GMA
|
||||
- HEMA
|
||||
- WCA
|
||||
- MOTA
|
||||
- SIOTA
|
||||
- ARLHS
|
||||
- ILLW
|
||||
- ZLOTA
|
||||
- IOTA
|
||||
- WOTA
|
||||
- BOTA
|
||||
- WAB
|
||||
- WAI
|
||||
example: POTA
|
||||
- name: id
|
||||
in: query
|
||||
description: ID of a reference in that SIG
|
||||
required: true
|
||||
type: string
|
||||
example: GB-0001
|
||||
responses:
|
||||
'200':
|
||||
description: Success
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/SIGRef'
|
||||
'422':
|
||||
description: Validation error e.g. SIG not supported or reference format incorrect
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: string
|
||||
example: "Failed"
|
||||
|
||||
|
||||
/spot:
|
||||
post:
|
||||
tags:
|
||||
- spots
|
||||
- Spots
|
||||
summary: Add a spot
|
||||
description: "Supply a new spot object, which will be added to the system. Currently, this will not be reported up the chain to a cluster, POTA, SOTA etc. This may be introduced in a future version. cURL example: `curl --request POST --header \"Content-Type: application/json\" --data '{\"dx_call\":\"M0TRT\",\"time\":1760019539, \"freq\":14200000, \"comment\":\"Test spot please ignore\", \"de_call\":\"M0TRT\"}' https://spothole.app/api/v1/spot`"
|
||||
operationId: spot
|
||||
@@ -542,6 +687,28 @@ components:
|
||||
type: string
|
||||
description: SIG reference ID.
|
||||
example: GB-0001
|
||||
sig:
|
||||
type: string
|
||||
description: SIG that this reference is in.
|
||||
enum:
|
||||
- POTA
|
||||
- SOTA
|
||||
- WWFF
|
||||
- WWBOTA
|
||||
- GMA
|
||||
- HEMA
|
||||
- WCA
|
||||
- MOTA
|
||||
- SIOTA
|
||||
- ARLHS
|
||||
- ILLW
|
||||
- ZLOTA
|
||||
- IOTA
|
||||
- WOTA
|
||||
- BOTA
|
||||
- WAB
|
||||
- WAI
|
||||
example: POTA
|
||||
name:
|
||||
type: string
|
||||
description: SIG reference name
|
||||
@@ -550,6 +717,18 @@ components:
|
||||
type: string
|
||||
description: SIG reference URL, which the user can look up for more information
|
||||
example: "https://pota.app/#/park/GB-0001"
|
||||
grid:
|
||||
type: string
|
||||
description: Maidenhead grid locator for the reference, if known.
|
||||
example: IO91aa
|
||||
latitude:
|
||||
type: number
|
||||
description: Latitude of the reference, in degrees, if known.
|
||||
example: 51.2345
|
||||
longitude:
|
||||
type: number
|
||||
description: Longitude of the reference, in degrees, if known.
|
||||
example: -1.2345
|
||||
|
||||
Spot:
|
||||
type: object
|
||||
@@ -616,17 +795,18 @@ components:
|
||||
example: -1.2345
|
||||
dx_location_source:
|
||||
type: string
|
||||
description: Where we got the DX location (grid/latitude/longitude) from. If this was from the spot itself, it's likely quite accurate, but if we had to fall back to QRZ lookup, or even a location based on the DXCC itself, it will be a lot less accurate.
|
||||
description: Where we got the DX location (grid/latitude/longitude) from. If this was from the spot itself, or from a lookup of the SIG ref (e.g. park) it's likely quite accurate, but if we had to fall back to QRZ lookup, or even a location based on the DXCC itself, it will be a lot less accurate.
|
||||
enum:
|
||||
- SPOT
|
||||
- "SIG REF LOOKUP"
|
||||
- "WAB/WAI GRID"
|
||||
- QRZ
|
||||
- "HOME QTH"
|
||||
- DXCC
|
||||
- NONE
|
||||
example: SPOT
|
||||
dx_location_good:
|
||||
type: boolean
|
||||
description: Does the software think the location is good enough to put a marker on a map? This is true if the source is "SPOT" or "WAB/WAI GRID", or alternatively if the source is "QRZ" and the callsign doesn't have a slash in it (i.e. operator likely at home).
|
||||
description: Does the software think the location is good enough to put a marker on a map? This is true if the source is "SPOT", "SIG REF LOOKUP" or "WAB/WAI GRID", or alternatively if the source is "HOME QTH" and the callsign doesn't have a slash in it (i.e. operator likely at home).
|
||||
example: true
|
||||
de_call:
|
||||
type: string
|
||||
|
||||
@@ -249,6 +249,7 @@ $(document).ready(function() {
|
||||
setUpMap();
|
||||
// Call loadOptions(), this will then trigger loading spots and setting up timers.
|
||||
loadOptions();
|
||||
// Prevent scrolling actions in the popup menus being passed through to the map
|
||||
// Prevent mouse scroll and touch actions in the popup menus being passed through to the map
|
||||
L.DomEvent.disableScrollPropagation(document.getElementById('maptools'));
|
||||
L.DomEvent.disableClickPropagation(document.getElementById('maptools'));
|
||||
});
|
||||
@@ -156,7 +156,7 @@ function updateTable() {
|
||||
var bearing = calcBearing(userPos[0], userPos[1], s["dx_latitude"], s["dx_longitude"]);
|
||||
bearingText = bearing.toFixed(0).padStart(3, '0') + "°";
|
||||
if (s["dx_location_good"] == null || s["dx_location_good"] == false) {
|
||||
if (s["dx_location_source"] == "QRZ") {
|
||||
if (s["dx_location_source"] == "HOME QTH") {
|
||||
bearingText = bearingText + "<span class='bearing-q hideonmobile'><i class='fa-solid fa-circle-question' title='The position was not reported via the spotting service. We had to fall back to a QRZ \"home\" location for a portable/mobile/alternative spot, so this bearing may not be accurate if the DX is close to you..'></i></span>";
|
||||
} else {
|
||||
bearingText = bearingText + "<span class='bearing-q hideonmobile'><i class='fa-solid fa-circle-question' title='The position was not reported via the spotting service. We had to fall back to just using the centre of a DXCC entity, so this bearing may not be accurate if the DX is close to you.'></i></span>";
|
||||
|
||||
Reference in New Issue
Block a user