Download cty.plist separately so errors can be better handled

Ian Renton
2025-10-13 19:32:29 +01:00
parent a5a4981b7e
commit dbeebe32f3
11 changed files with 53 additions and 24 deletions

View File

@@ -9,9 +9,6 @@ from core.constants import SOFTWARE_NAME, SOFTWARE_VERSION
 # Generic alert provider class. Subclasses of this query the individual APIs for alerts.
 class AlertProvider:
-    # HTTP headers used for spot providers that use HTTP
-    HTTP_HEADERS = { "User-Agent": SOFTWARE_NAME + " " + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")" }
-
     # Constructor
     def __init__(self, provider_config):
         self.name = provider_config["name"]

View File

@@ -7,6 +7,7 @@ import pytz
 import requests
 from alertproviders.alert_provider import AlertProvider
+from core.constants import HTTP_HEADERS
 # Generic alert provider class for providers that request data via HTTP(S). Just for convenience to avoid code
@@ -35,7 +36,7 @@ class HTTPAlertProvider(AlertProvider):
         try:
             # Request data from API
             logging.debug("Polling " + self.name + " alert API...")
-            http_response = requests.get(self.url, headers=self.HTTP_HEADERS)
+            http_response = requests.get(self.url, headers=HTTP_HEADERS)
             # Pass off to the subclass for processing
             new_alerts = self.http_response_to_alerts(http_response)
             # Submit the new alerts for processing. There might not be any alerts for the less popular programs.

View File

@@ -4,6 +4,9 @@ from data.band import Band
 SOFTWARE_NAME = "Spothole by M0TRT"
 SOFTWARE_VERSION = "0.1"
+# HTTP headers used for providers that fetch data over HTTP(S)
+HTTP_HEADERS = {"User-Agent": SOFTWARE_NAME + " " + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")"}
+
 # Special Interest Groups
 SIGS = ["POTA", "SOTA", "WWFF", "GMA", "WWBOTA", "HEMA", "MOTA", "ARLHS", "SiOTA", "WCA"]
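
With the constants above, and assuming a server owner callsign of, say, M0TRT (SERVER_OWNER_CALLSIGN comes from core.config, outside this diff), the shared header evaluates to roughly:

    {"User-Agent": "Spothole by M0TRT 0.1 (operated by M0TRT)"}

so every provider now identifies itself to the upstream APIs in the same way.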

View File

@@ -1,24 +1,44 @@
 import gzip
 import logging
-import urllib.request
+from datetime import timedelta
 from diskcache import Cache
 from pyhamtools import LookupLib, Callinfo
 from pyhamtools.exceptions import APIKeyMissingError
 from pyhamtools.frequency import freq_to_band
 from pyhamtools.locator import latlong_to_locator
+from requests_cache import CachedSession
 from core.config import config
-from core.constants import BANDS, UNKNOWN_BAND, CW_MODES, PHONE_MODES, DATA_MODES, ALL_MODES, QRZCQ_CALLSIGN_LOOKUP_DATA
+from core.constants import BANDS, UNKNOWN_BAND, CW_MODES, PHONE_MODES, DATA_MODES, ALL_MODES, \
+    QRZCQ_CALLSIGN_LOOKUP_DATA, HTTP_HEADERS
+CLUBLOG_API_KEY = config["clublog-api-key"]
+# Download the cty.plist file from country-files.com on first startup. The pyhamtools lib can actually download and use
+# this itself, but it's occasionally offline, which causes it to throw an error. By downloading it separately, we can
+# catch errors and handle them, falling back to a previous copy of the file in the cache, and we can use the
+# requests_cache library to prevent re-downloading too quickly if the software keeps restarting.
+def download_country_files_cty_plist():
+    try:
+        response = COUNTRY_FILES_CTY_PLIST_CACHE.get("https://www.country-files.com/cty/cty.plist",
+                                                     headers=HTTP_HEADERS).text
+        # Write the downloaded data to file
+        with open(COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION, "w") as f:
+            f.write(response)
+        return True
+    except Exception as e:
+        logging.error("Exception when downloading country-files.com cty.plist: %s", e)
+        return False
 # Download the cty.xml (gzipped) file from Clublog on first startup, so we can use it in preference to querying the
 # database live if possible.
 def download_clublog_ctyxml():
     try:
         # Read the file inside the .gz archive located at url
-        with urllib.request.urlopen("https://cdn.clublog.org/cty.php?api=" + CLUBLOG_API_KEY) as response:
+        response = CLUBLOG_CTY_XML_CACHE.get("https://cdn.clublog.org/cty.php?api=" + CLUBLOG_API_KEY,
+                                             headers=HTTP_HEADERS).raw
         with gzip.GzipFile(fileobj=response) as uncompressed:
             file_content = uncompressed.read()
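
One caveat with feeding response.raw to gzip here: plain requests only leaves .raw readable when the request was made with stream=True, and requests_cache substitutes its own buffered raw object on cache hits. If the stream ever comes back exhausted, a safe alternative (assuming an extra import io) is to wrap the buffered body instead:

    with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as uncompressed:
        file_content = uncompressed.read()
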
@@ -35,12 +55,18 @@ def download_clublog_ctyxml():
 # once on startup, and requires no login/key, but does not have the best coverage.
 # If the user provides login details/API keys, we also set up helpers for QRZ.com, Clublog (live API request), and
 # Clublog (XML download). The lookup functions iterate through these in a sensible order, looking for suitable data.
-LOOKUP_LIB_BASIC = LookupLib(lookuptype="countryfile")
+COUNTRY_FILES_CTY_PLIST_CACHE = CachedSession("cache/country_files_cty_plist_cache", expire_after=timedelta(days=10))
+COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION = "cache/cty.plist"
+LOOKUP_LIB_BASIC = LookupLib(lookuptype="countryfile", filename=COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION)
 CALL_INFO_BASIC = Callinfo(LOOKUP_LIB_BASIC)
 QRZ_AVAILABLE = config["qrz-password"] != ""
 if QRZ_AVAILABLE:
     LOOKUP_LIB_QRZ = LookupLib(lookuptype="qrz", username=config["qrz-username"], pwd=config["qrz-password"])
     QRZ_CALLSIGN_DATA_CACHE = Cache('cache/qrz_callsign_lookup_cache')
-CLUBLOG_API_KEY = config["clublog-api-key"]
+CLUBLOG_CTY_XML_CACHE = CachedSession("cache/clublog_cty_xml_cache", expire_after=timedelta(days=10))
 CLUBLOG_API_AVAILABLE = CLUBLOG_API_KEY != ""
 CLUBLOG_XML_DOWNLOAD_LOCATION = "cache/cty.xml"
 if CLUBLOG_API_AVAILABLE:
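
The call site isn't in this hunk, but given the comment above, startup code can now treat a failed download as non-fatal whenever an earlier copy of cty.plist is still on disk. A minimal sketch of that wiring (the function name bootstrap_country_data is illustrative, not part of this commit):

    import logging
    import os

    def bootstrap_country_data():
        # Try for a fresh cty.plist; the CachedSession stops this from hitting
        # the network again within the 10-day expiry, even across restarts.
        downloaded = download_country_files_cty_plist()
        if not downloaded and not os.path.isfile(COUNTRY_FILES_CTY_PLIST_DOWNLOAD_LOCATION):
            # Nothing fresh and nothing left over from a previous run.
            raise RuntimeError("No cty.plist available and country-files.com is unreachable")
        if not downloaded:
            logging.warning("country-files.com unreachable, using previously downloaded cty.plist")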

View File

@@ -4,6 +4,7 @@ from datetime import datetime, timedelta
 import pytz
 from requests_cache import CachedSession
+from core.constants import HTTP_HEADERS
 from data.spot import Spot
 from spotproviders.http_spot_provider import HTTPSpotProvider
@@ -43,7 +44,7 @@ class GMA(HTTPSpotProvider):
             # GMA doesn't give what programme (SIG) the reference is for until we separately look it up.
             ref_response = self.REF_INFO_CACHE.get(self.REF_INFO_URL_ROOT + source_spot["REF"],
-                                                   headers=self.HTTP_HEADERS)
+                                                   headers=HTTP_HEADERS)
             # Sometimes this is blank, so handle that
             if ref_response.text is not None and ref_response.text != "":
                 ref_info = ref_response.json()

View File

@@ -1,10 +1,10 @@
 import re
-from datetime import datetime, timedelta
+from datetime import datetime
 import pytz
 import requests
-from requests_cache import CachedSession
+from core.constants import HTTP_HEADERS
 from data.spot import Spot
 from spotproviders.http_spot_provider import HTTPSpotProvider
@@ -33,7 +33,7 @@ class HEMA(HTTPSpotProvider):
         new_spots = []
         # OK, if the spot seed actually changed, now we make the real request for data.
         if spot_seed_changed:
-            source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS)
+            source_data = requests.get(self.SPOTS_URL, headers=HTTP_HEADERS)
             source_data_items = source_data.text.split("=")
             # Iterate through source data items.
             for source_spot in source_data_items:

View File

@@ -6,6 +6,7 @@ from time import sleep
 import pytz
 import requests
+from core.constants import HTTP_HEADERS
 from spotproviders.spot_provider import SpotProvider
@@ -35,7 +36,7 @@ class HTTPSpotProvider(SpotProvider):
         try:
             # Request data from API
             logging.debug("Polling " + self.name + " spot API...")
-            http_response = requests.get(self.url, headers=self.HTTP_HEADERS)
+            http_response = requests.get(self.url, headers=HTTP_HEADERS)
             # Pass off to the subclass for processing
             new_spots = self.http_response_to_spots(http_response)
             # Submit the new spots for processing. There might not be any spots for the less popular programs.

View File

@@ -5,6 +5,7 @@ from datetime import datetime, timedelta
 import pytz
 from requests_cache import CachedSession
+from core.constants import HTTP_HEADERS
 from data.spot import Spot
 from spotproviders.http_spot_provider import HTTPSpotProvider
@@ -58,7 +59,7 @@ class ParksNPeaks(HTTPSpotProvider):
             # SiOTA lat/lon/grid lookup
             if spot.sig == "SiOTA":
-                siota_csv_data = self.SIOTA_LIST_CACHE.get(self.SIOTA_LIST_URL, headers=self.HTTP_HEADERS)
+                siota_csv_data = self.SIOTA_LIST_CACHE.get(self.SIOTA_LIST_URL, headers=HTTP_HEADERS)
                 siota_dr = csv.DictReader(siota_csv_data.content.decode().splitlines())
                 for row in siota_dr:
                     if row["SILO_CODE"] == spot.sig_refs[0]:
@@ -69,7 +70,7 @@ class ParksNPeaks(HTTPSpotProvider):
             # ZLOTA name/lat/lon lookup
             if spot.sig == "ZLOTA":
-                zlota_data = self.ZLOTA_LIST_CACHE.get(self.ZLOTA_LIST_URL, headers=self.HTTP_HEADERS).json()
+                zlota_data = self.ZLOTA_LIST_CACHE.get(self.ZLOTA_LIST_URL, headers=HTTP_HEADERS).json()
                 for asset in zlota_data:
                     if asset["code"] == spot.sig_refs[0]:
                         spot.sig_refs_names = [asset["name"]]

View File

@@ -4,6 +4,7 @@ from datetime import datetime, timedelta
 import requests
 from requests_cache import CachedSession
+from core.constants import HTTP_HEADERS
 from data.spot import Spot
 from spotproviders.http_spot_provider import HTTPSpotProvider
@@ -34,7 +35,7 @@ class SOTA(HTTPSpotProvider):
         new_spots = []
         # OK, if the epoch actually changed, now we make the real request for data.
         if epoch_changed:
-            source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS).json()
+            source_data = requests.get(self.SPOTS_URL, headers=HTTP_HEADERS).json()
             # Iterate through source data
             for source_spot in source_data:
                 # Convert to our spot format
@@ -55,7 +56,7 @@ class SOTA(HTTPSpotProvider):
                 # SOTA doesn't give summit lat/lon/grid in the main call, so we need another separate call for this
                 try:
-                    summit_response = self.SUMMIT_DATA_CACHE.get(self.SUMMIT_URL_ROOT + source_spot["summitCode"], headers=self.HTTP_HEADERS)
+                    summit_response = self.SUMMIT_DATA_CACHE.get(self.SUMMIT_URL_ROOT + source_spot["summitCode"], headers=HTTP_HEADERS)
                     summit_data = summit_response.json()
                     spot.grid = summit_data["locator"]
                     spot.latitude = summit_data["latitude"]

View File

@@ -9,9 +9,6 @@ from core.config import SERVER_OWNER_CALLSIGN, MAX_SPOT_AGE
 # Generic spot provider class. Subclasses of this query the individual APIs for data.
 class SpotProvider:
-    # HTTP headers used for spot providers that use HTTP
-    HTTP_HEADERS = { "User-Agent": SOFTWARE_NAME + " " + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")" }
-
     # Constructor
     def __init__(self, provider_config):
         self.name = provider_config["name"]

View File

@@ -6,6 +6,7 @@ from time import sleep
 import pytz
 from requests_sse import EventSource
+from core.constants import HTTP_HEADERS
 from spotproviders.spot_provider import SpotProvider
 # Spot provider using Server-Sent Events.
@@ -37,7 +38,7 @@ class SSESpotProvider(SpotProvider):
         while not self.stopped:
             try:
                 logging.debug("Connecting to " + self.name + " spot API...")
-                with EventSource(self.url, headers=self.HTTP_HEADERS, latest_event_id=self.last_event_id, timeout=30) as event_source:
+                with EventSource(self.url, headers=HTTP_HEADERS, latest_event_id=self.last_event_id, timeout=30) as event_source:
                     self.event_source = event_source
                     for event in self.event_source:
                         if event.type == 'message':
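
Taken together, the class-level header copies are gone and every provider that talks HTTP uses the same two lines, as the hunks above show:

    from core.constants import HTTP_HEADERS
    http_response = requests.get(self.url, headers=HTTP_HEADERS)

which keeps the User-Agent consistent across spot providers, alert providers, and the SSE client.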