mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-05-30 17:35:11 +00:00
Modify the backend so that instead of using the server owner's QRZ & HamQTH credentials, it instead requires them to be provided by the client (if none are provided, the lookups do not occur.)
This commit is contained in:
@@ -5,6 +5,7 @@ import re
|
||||
import urllib.parse
|
||||
from datetime import timedelta
|
||||
|
||||
import requests
|
||||
import xmltodict
|
||||
from diskcache import Cache
|
||||
from pyhamtools import LookupLib, Callinfo, callinfo
|
||||
@@ -17,6 +18,38 @@ from core.cache_utils import SEMI_STATIC_URL_DATA_CACHE
|
||||
from core.config import config
|
||||
from core.constants import BANDS, UNKNOWN_BAND, CW_MODES, PHONE_MODES, DATA_MODES, ALL_MODES, \
|
||||
HTTP_HEADERS, HAMQTH_PRG, MODE_ALIASES
|
||||
from data.lookup_credentials import LookupCredentials
|
||||
|
||||
# QRZ XML field names differ from pyhamtools' normalised names; map them here.
|
||||
_QRZ_FIELD_MAP = {
|
||||
"lat": "latitude",
|
||||
"lon": "longitude",
|
||||
"grid": "locator",
|
||||
"ituzone": "ituz",
|
||||
"cqzone": "cqz",
|
||||
}
|
||||
_QRZ_INT_FIELDS = {"adif", "cqz", "ituz"}
|
||||
_QRZ_FLOAT_FIELDS = {"latitude", "longitude"}
|
||||
|
||||
|
||||
def _normalize_qrz_data(raw):
|
||||
data = {}
|
||||
for k, v in raw.items():
|
||||
if v is None:
|
||||
continue
|
||||
mapped_key = _QRZ_FIELD_MAP.get(k, k)
|
||||
if mapped_key in _QRZ_INT_FIELDS:
|
||||
try:
|
||||
v = int(v)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
elif mapped_key in _QRZ_FLOAT_FIELDS:
|
||||
try:
|
||||
v = float(v)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
data[mapped_key] = v
|
||||
return data
|
||||
|
||||
|
||||
class LookupHelper:
|
||||
@@ -36,9 +69,10 @@ class LookupHelper:
|
||||
self._clublog_cty_xml_cache = None
|
||||
self._clublog_api_key = None
|
||||
self._qrz_callsign_data_cache = None
|
||||
self._lookup_lib_qrz = None
|
||||
self._qrz_available = None
|
||||
self._hamqth_available = None
|
||||
self._qrz_base_url = "https://xmldata.qrz.com/xml/current/"
|
||||
# QRZ session keys expire after an hour; cache the login response for 55 minutes.
|
||||
self._qrz_session_cache = CachedSession("cache/qrz_session_cache",
|
||||
expire_after=timedelta(minutes=55))
|
||||
self._hamqth_callsign_data_cache = None
|
||||
self._hamqth_base_url = "https://www.hamqth.com/xml.php"
|
||||
# HamQTH session keys expire after an hour. Rather than working out how much time has passed manually, we cheat
|
||||
@@ -67,13 +101,8 @@ class LookupHelper:
|
||||
self._lookup_lib_basic = LookupLib(lookuptype="countryfile")
|
||||
self._call_info_basic = Callinfo(self._lookup_lib_basic)
|
||||
|
||||
self._qrz_available = config["qrz-username"] != "" and config["qrz-password"] != ""
|
||||
if self._qrz_available:
|
||||
self._lookup_lib_qrz = LookupLib(lookuptype="qrz", username=config["qrz-username"],
|
||||
pwd=config["qrz-password"])
|
||||
self._qrz_callsign_data_cache = Cache('cache/qrz_callsign_lookup_cache')
|
||||
|
||||
self._hamqth_available = config["hamqth-username"] != "" and config["hamqth-password"] != ""
|
||||
self._hamqth_callsign_data_cache = Cache('cache/hamqth_callsign_lookup_cache')
|
||||
|
||||
self._clublog_api_key = config["clublog-api-key"]
|
||||
@@ -166,7 +195,7 @@ class LookupHelper:
|
||||
logging.error("Exception when downloading Clublog cty.xml", e)
|
||||
return False
|
||||
|
||||
def infer_country_from_callsign(self, call):
|
||||
def infer_country_from_callsign(self, call, credentials=None):
|
||||
"""Infer a country name from a callsign"""
|
||||
|
||||
try:
|
||||
@@ -176,12 +205,12 @@ class LookupHelper:
|
||||
country = None
|
||||
# Couldn't get anything from basic call info database, try QRZ.com
|
||||
if not country:
|
||||
qrz_data = self._get_qrz_data_for_callsign(call)
|
||||
qrz_data = self._get_qrz_data_for_callsign(call, credentials)
|
||||
if qrz_data and "country" in qrz_data:
|
||||
country = qrz_data["country"]
|
||||
# Couldn't get anything from QRZ.com database, try HamQTH
|
||||
if not country:
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call)
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if hamqth_data and "country" in hamqth_data:
|
||||
country = hamqth_data["country"]
|
||||
# Couldn't get anything from HamQTH database, try Clublog data
|
||||
@@ -200,7 +229,7 @@ class LookupHelper:
|
||||
country = dxcc_data["name"]
|
||||
return country
|
||||
|
||||
def infer_dxcc_id_from_callsign(self, call):
|
||||
def infer_dxcc_id_from_callsign(self, call, credentials=None):
|
||||
"""Infer a DXCC ID from a callsign"""
|
||||
|
||||
try:
|
||||
@@ -210,12 +239,12 @@ class LookupHelper:
|
||||
dxcc = None
|
||||
# Couldn't get anything from basic call info database, try QRZ.com
|
||||
if not dxcc:
|
||||
qrz_data = self._get_qrz_data_for_callsign(call)
|
||||
qrz_data = self._get_qrz_data_for_callsign(call, credentials)
|
||||
if qrz_data and "adif" in qrz_data:
|
||||
dxcc = qrz_data["adif"]
|
||||
# Couldn't get anything from QRZ.com database, try HamQTH
|
||||
if not dxcc:
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call)
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if hamqth_data and "adif" in hamqth_data:
|
||||
dxcc = hamqth_data["adif"]
|
||||
# Couldn't get anything from HamQTH database, try Clublog data
|
||||
@@ -234,7 +263,7 @@ class LookupHelper:
|
||||
dxcc = dxcc_data["entityCode"]
|
||||
return dxcc
|
||||
|
||||
def infer_continent_from_callsign(self, call):
|
||||
def infer_continent_from_callsign(self, call, credentials=None):
|
||||
"""Infer a continent shortcode from a callsign"""
|
||||
|
||||
try:
|
||||
@@ -244,7 +273,7 @@ class LookupHelper:
|
||||
continent = None
|
||||
# Couldn't get anything from basic call info database, try HamQTH
|
||||
if not continent:
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call)
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if hamqth_data and "continent" in hamqth_data:
|
||||
continent = hamqth_data["continent"]
|
||||
# Couldn't get anything from HamQTH database, try Clublog data
|
||||
@@ -264,7 +293,7 @@ class LookupHelper:
|
||||
continent = dxcc_data["continent"][0]
|
||||
return continent
|
||||
|
||||
def infer_cq_zone_from_callsign(self, call):
|
||||
def infer_cq_zone_from_callsign(self, call, credentials=None):
|
||||
"""Infer a CQ zone from a callsign"""
|
||||
|
||||
try:
|
||||
@@ -274,12 +303,12 @@ class LookupHelper:
|
||||
cqz = None
|
||||
# Couldn't get anything from basic call info database, try QRZ.com
|
||||
if not cqz:
|
||||
qrz_data = self._get_qrz_data_for_callsign(call)
|
||||
qrz_data = self._get_qrz_data_for_callsign(call, credentials)
|
||||
if qrz_data and "cqz" in qrz_data:
|
||||
cqz = qrz_data["cqz"]
|
||||
# Couldn't get anything from QRZ.com database, try HamQTH
|
||||
if not cqz:
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call)
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if hamqth_data and "cq" in hamqth_data:
|
||||
cqz = hamqth_data["cq"]
|
||||
# Couldn't get anything from HamQTH database, try Clublog data
|
||||
@@ -299,7 +328,7 @@ class LookupHelper:
|
||||
cqz = dxcc_data["cq"][0]
|
||||
return cqz
|
||||
|
||||
def infer_itu_zone_from_callsign(self, call):
|
||||
def infer_itu_zone_from_callsign(self, call, credentials=None):
|
||||
"""Infer a ITU zone from a callsign"""
|
||||
|
||||
try:
|
||||
@@ -309,12 +338,12 @@ class LookupHelper:
|
||||
ituz = None
|
||||
# Couldn't get anything from basic call info database, try QRZ.com
|
||||
if not ituz:
|
||||
qrz_data = self._get_qrz_data_for_callsign(call)
|
||||
qrz_data = self._get_qrz_data_for_callsign(call, credentials)
|
||||
if qrz_data and "ituz" in qrz_data:
|
||||
ituz = qrz_data["ituz"]
|
||||
# Couldn't get anything from QRZ.com database, try HamQTH
|
||||
if not ituz:
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call)
|
||||
hamqth_data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if hamqth_data and "itu" in hamqth_data:
|
||||
ituz = hamqth_data["itu"]
|
||||
# Couldn't get anything from HamQTH database, Clublog doesn't provide this, so try DXCC data
|
||||
@@ -330,31 +359,31 @@ class LookupHelper:
|
||||
|
||||
return self._dxcc_data[dxcc]["flag"] if dxcc in self._dxcc_data else None
|
||||
|
||||
def infer_name_from_callsign_online_lookup(self, call):
|
||||
def infer_name_from_callsign_online_lookup(self, call, credentials=None):
|
||||
"""Infer an operator name from a callsign (requires QRZ.com/HamQTH)"""
|
||||
|
||||
data = self._get_qrz_data_for_callsign(call)
|
||||
data = self._get_qrz_data_for_callsign(call, credentials)
|
||||
if data and "fname" in data:
|
||||
name = data["fname"]
|
||||
if "name" in data:
|
||||
name = name + " " + data["name"]
|
||||
return name
|
||||
data = self._get_hamqth_data_for_callsign(call)
|
||||
data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if data and "nick" in data:
|
||||
return data["nick"]
|
||||
else:
|
||||
return None
|
||||
|
||||
def infer_latlon_from_callsign_online_lookup(self, call):
|
||||
def infer_latlon_from_callsign_online_lookup(self, call, credentials=None):
|
||||
"""Infer a latitude and longitude from a callsign (requires QRZ.com/HamQTH)
|
||||
Coordinates that look default are rejected (apologies if your position really is 0,0, enjoy your voyage)"""
|
||||
|
||||
data = self._get_qrz_data_for_callsign(call)
|
||||
data = self._get_qrz_data_for_callsign(call, credentials)
|
||||
if data and "latitude" in data and "longitude" in data and (
|
||||
float(data["latitude"]) != 0 or float(data["longitude"]) != 0) and -89.9 < float(
|
||||
data["latitude"]) < 89.9:
|
||||
return [float(data["latitude"]), float(data["longitude"])]
|
||||
data = self._get_hamqth_data_for_callsign(call)
|
||||
data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if data and "latitude" in data and "longitude" in data and (
|
||||
float(data["latitude"]) != 0 or float(data["longitude"]) != 0) and -89.9 < float(
|
||||
data["latitude"]) < 89.9:
|
||||
@@ -362,28 +391,28 @@ class LookupHelper:
|
||||
else:
|
||||
return None
|
||||
|
||||
def infer_grid_from_callsign_online_lookup(self, call):
|
||||
def infer_grid_from_callsign_online_lookup(self, call, credentials=None):
|
||||
"""Infer a grid locator from a callsign (requires QRZ.com/HamQTH).
|
||||
Grids that look default are rejected (apologies if your grid really is AA00aa, enjoy your research)"""
|
||||
|
||||
data = self._get_qrz_data_for_callsign(call)
|
||||
data = self._get_qrz_data_for_callsign(call, credentials)
|
||||
if data and "locator" in data and data["locator"].upper() != "AA00" and data["locator"].upper() != "AA00AA" and \
|
||||
data["locator"].upper() != "AA00AA00":
|
||||
return data["locator"]
|
||||
data = self._get_hamqth_data_for_callsign(call)
|
||||
data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if data and "grid" in data and data["grid"].upper() != "AA00" and data["grid"].upper() != "AA00AA" and data[
|
||||
"grid"].upper() != "AA00AA00":
|
||||
return data["grid"]
|
||||
else:
|
||||
return None
|
||||
|
||||
def infer_qth_from_callsign_online_lookup(self, call):
|
||||
def infer_qth_from_callsign_online_lookup(self, call, credentials=None):
|
||||
"""Infer a textual QTH from a callsign (requires QRZ.com/HamQTH)"""
|
||||
|
||||
data = self._get_qrz_data_for_callsign(call)
|
||||
data = self._get_qrz_data_for_callsign(call, credentials)
|
||||
if data and "addr2" in data:
|
||||
return data["addr2"]
|
||||
data = self._get_hamqth_data_for_callsign(call)
|
||||
data = self._get_hamqth_data_for_callsign(call, credentials)
|
||||
if data and "qth" in data:
|
||||
return data["qth"]
|
||||
else:
|
||||
@@ -422,79 +451,116 @@ class LookupHelper:
|
||||
logging.debug("Invalid lat/lon received for DXCC")
|
||||
return grid
|
||||
|
||||
def _get_qrz_data_for_callsign(self, call):
|
||||
"""Utility method to get QRZ.com data from cache if possible, if not get it from the API and cache it"""
|
||||
def _get_qrz_data_for_callsign(self, call, credentials):
|
||||
"""Utility method to get QRZ.com data from cache if possible, if not get it from the API and cache it.
|
||||
Returns None immediately if no credentials are provided."""
|
||||
|
||||
# Fetch from cache if we can, otherwise fetch from the API and cache it
|
||||
# Return from cache if available (a cached None means 'not found in QRZ')
|
||||
if call in self._qrz_callsign_data_cache:
|
||||
return self._qrz_callsign_data_cache.get(call)
|
||||
elif self._qrz_available:
|
||||
|
||||
# Obtain session key from credentials
|
||||
session_key = None
|
||||
if credentials and credentials.qrz_session_key:
|
||||
session_key = credentials.qrz_session_key
|
||||
elif credentials and credentials.qrz_username and credentials.qrz_password:
|
||||
try:
|
||||
data = self._lookup_lib_qrz.lookup_callsign(callsign=call)
|
||||
self._qrz_callsign_data_cache.add(call, data, expire=604800) # 1 week in seconds
|
||||
return data
|
||||
except (KeyError, ValueError):
|
||||
# QRZ had no info for the call, but maybe it had prefixes or suffixes. Try again with the base call.
|
||||
try:
|
||||
data = self._lookup_lib_qrz.lookup_callsign(callsign=callinfo.Callinfo.get_homecall(call))
|
||||
self._qrz_callsign_data_cache.add(call, data, expire=604800) # 1 week in seconds
|
||||
return data
|
||||
except (KeyError, ValueError):
|
||||
# QRZ had no info for the call, that's OK. Cache a None so we don't try to look this up again
|
||||
self._qrz_callsign_data_cache.add(call, None, expire=604800) # 1 week in seconds
|
||||
login_response = self._qrz_session_cache.get(
|
||||
self._qrz_base_url + "?username=" + urllib.parse.quote_plus(credentials.qrz_username) +
|
||||
"&password=" + urllib.parse.quote_plus(credentials.qrz_password) + "&agent=spothole",
|
||||
headers=HTTP_HEADERS).content
|
||||
login_data = xmltodict.parse(login_response)
|
||||
session = login_data.get("QRZDatabase", {}).get("Session", {})
|
||||
if "Key" in session:
|
||||
session_key = session["Key"]
|
||||
else:
|
||||
logging.warning("QRZ.com login details incorrect, failed to look up with QRZ.")
|
||||
return None
|
||||
except Exception:
|
||||
# General exception like a timeout when communicating with QRZ. Return None this time, but don't cache
|
||||
# that, so we can try again next time.
|
||||
logging.error("Exception when looking up QRZ data")
|
||||
logging.error("Exception when getting QRZ.com session key")
|
||||
return None
|
||||
else:
|
||||
|
||||
if not session_key:
|
||||
return None
|
||||
|
||||
def _get_hamqth_data_for_callsign(self, call):
|
||||
"""Utility method to get HamQTH data from cache if possible, if not get it from the API and cache it"""
|
||||
# Try the call as given, then fall back to the base call (strips /P, /M etc.)
|
||||
calls_to_try = [call]
|
||||
home_call = callinfo.Callinfo.get_homecall(call)
|
||||
if home_call != call:
|
||||
calls_to_try.append(home_call)
|
||||
|
||||
# Fetch from cache if we can, otherwise fetch from the API and cache it
|
||||
for lookup_call in calls_to_try:
|
||||
try:
|
||||
lookup_response = requests.get(
|
||||
self._qrz_base_url + "?s=" + session_key + "&callsign=" + urllib.parse.quote_plus(lookup_call),
|
||||
headers=HTTP_HEADERS, timeout=10).content
|
||||
raw = xmltodict.parse(lookup_response).get("QRZDatabase", {}).get("Callsign")
|
||||
if raw:
|
||||
data = _normalize_qrz_data(raw)
|
||||
self._qrz_callsign_data_cache.add(call, data, expire=604800) # 1 week in seconds
|
||||
return data
|
||||
except (KeyError, ValueError):
|
||||
continue
|
||||
except Exception:
|
||||
logging.error("Exception when looking up QRZ data")
|
||||
return None
|
||||
|
||||
# Not found in QRZ; cache None so we don't keep retrying
|
||||
self._qrz_callsign_data_cache.add(call, None, expire=604800) # 1 week in seconds
|
||||
return None
|
||||
|
||||
def _get_hamqth_data_for_callsign(self, call, credentials):
|
||||
"""Utility method to get HamQTH data from cache if possible, if not get it from the API and cache it.
|
||||
Returns None immediately if no credentials are provided."""
|
||||
|
||||
# Return from cache if available
|
||||
if call in self._hamqth_callsign_data_cache:
|
||||
return self._hamqth_callsign_data_cache.get(call)
|
||||
elif self._hamqth_available:
|
||||
|
||||
# Obtain session ID from credentials
|
||||
session_id = None
|
||||
if credentials and credentials.hamqth_session_id:
|
||||
session_id = credentials.hamqth_session_id
|
||||
elif credentials and credentials.hamqth_username and credentials.hamqth_password:
|
||||
try:
|
||||
# First we need to log in and get a session token.
|
||||
session_data = self._hamqth_session_lookup_cache.get(
|
||||
self._hamqth_base_url + "?u=" + urllib.parse.quote_plus(config["hamqth-username"]) +
|
||||
"&p=" + urllib.parse.quote_plus(config["hamqth-password"]), headers=HTTP_HEADERS).content
|
||||
self._hamqth_base_url + "?u=" + urllib.parse.quote_plus(credentials.hamqth_username) +
|
||||
"&p=" + urllib.parse.quote_plus(credentials.hamqth_password), headers=HTTP_HEADERS).content
|
||||
dict_data = xmltodict.parse(session_data)
|
||||
if "session_id" in dict_data["HamQTH"]["session"]:
|
||||
session_id = dict_data["HamQTH"]["session"]["session_id"]
|
||||
|
||||
# Now look up the actual data.
|
||||
try:
|
||||
lookup_data = SEMI_STATIC_URL_DATA_CACHE.get(
|
||||
self._hamqth_base_url + "?id=" + session_id + "&callsign=" + urllib.parse.quote_plus(
|
||||
call) + "&prg=" + HAMQTH_PRG, headers=HTTP_HEADERS).content
|
||||
data = xmltodict.parse(lookup_data)["HamQTH"]["search"]
|
||||
self._hamqth_callsign_data_cache.add(call, data, expire=604800) # 1 week in seconds
|
||||
return data
|
||||
except (KeyError, ValueError):
|
||||
# HamQTH had no info for the call, but maybe it had prefixes or suffixes. Try again with the base call.
|
||||
try:
|
||||
lookup_data = SEMI_STATIC_URL_DATA_CACHE.get(
|
||||
self._hamqth_base_url + "?id=" + session_id + "&callsign=" + urllib.parse.quote_plus(
|
||||
callinfo.Callinfo.get_homecall(call)) + "&prg=" + HAMQTH_PRG,
|
||||
headers=HTTP_HEADERS).content
|
||||
data = xmltodict.parse(lookup_data)["HamQTH"]["search"]
|
||||
self._hamqth_callsign_data_cache.add(call, data, expire=604800) # 1 week in seconds
|
||||
return data
|
||||
except (KeyError, ValueError):
|
||||
# HamQTH had no info for the call, that's OK. Cache a None so we don't try to look this up again
|
||||
self._hamqth_callsign_data_cache.add(call, None, expire=604800) # 1 week in seconds
|
||||
return None
|
||||
|
||||
else:
|
||||
logging.warning("HamQTH login details incorrect, failed to look up with HamQTH.")
|
||||
except:
|
||||
return None
|
||||
except Exception:
|
||||
logging.error("Exception when getting HamQTH session ID")
|
||||
return None
|
||||
|
||||
if not session_id:
|
||||
return None
|
||||
|
||||
# Try the call as given, then fall back to the base call (strips /P, /M etc.)
|
||||
calls_to_try = [call]
|
||||
home_call = callinfo.Callinfo.get_homecall(call)
|
||||
if home_call != call:
|
||||
calls_to_try.append(home_call)
|
||||
|
||||
for lookup_call in calls_to_try:
|
||||
try:
|
||||
lookup_data = SEMI_STATIC_URL_DATA_CACHE.get(
|
||||
self._hamqth_base_url + "?id=" + session_id + "&callsign=" + urllib.parse.quote_plus(
|
||||
lookup_call) + "&prg=" + HAMQTH_PRG, headers=HTTP_HEADERS).content
|
||||
data = xmltodict.parse(lookup_data)["HamQTH"]["search"]
|
||||
self._hamqth_callsign_data_cache.add(call, data, expire=604800) # 1 week in seconds
|
||||
return data
|
||||
except (KeyError, ValueError):
|
||||
continue
|
||||
except Exception:
|
||||
logging.error("Exception when looking up HamQTH data")
|
||||
return None
|
||||
|
||||
# Not found in HamQTH; cache None so we don't keep retrying
|
||||
self._hamqth_callsign_data_cache.add(call, None, expire=604800) # 1 week in seconds
|
||||
return None
|
||||
|
||||
def _get_clublog_api_data_for_callsign(self, call):
|
||||
@@ -551,6 +617,7 @@ class LookupHelper:
|
||||
"""Shutdown method to close down any caches neatly."""
|
||||
|
||||
self._qrz_callsign_data_cache.close()
|
||||
self._hamqth_callsign_data_cache.close()
|
||||
self._clublog_callsign_data_cache.close()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user