Add data providers for most other programmes

This commit is contained in:
Ian Renton
2025-09-27 10:00:12 +01:00
parent 7bdf6cf203
commit 27a61393cf
14 changed files with 439 additions and 102 deletions

1
.gitignore vendored
View File

@@ -2,3 +2,4 @@
/.venv /.venv
__pycache__ __pycache__
*.pyc *.pyc
/sota_summit_data_cache.sqlite

View File

@@ -5,6 +5,12 @@ SOFTWARE_NAME = "Metaspot by M0TRT"
SOFTWARE_VERSION = "0.1" SOFTWARE_VERSION = "0.1"
SERVER_OWNER_CALLSIGN = "M0TRT" SERVER_OWNER_CALLSIGN = "M0TRT"
# Modes
CW_MODES = ["CW"]
PHONE_MODES = ["PHONE", "SSB", "USB", "LSB", "AM", "FM", "DV", "DMR", "DSTAR", "C4FM", "M17"]
DATA_MODES = ["DIGI", "DATA", "FT8", "FT4", "RTTY", "SSTV", "JS8", "HELL", "BPSK", "PSK", "BPSK31", "OLIVIA"]
ALL_MODES = CW_MODES + PHONE_MODES + DATA_MODES
# Band definitions # Band definitions
BANDS = [ BANDS = [
Band(name="160m", start_freq=1800, end_freq=2000, color="#7cfc00", contrast_color="black"), Band(name="160m", start_freq=1800, end_freq=2000, color="#7cfc00", contrast_color="black"),

View File

@@ -1,4 +1,4 @@
from core.constants import BANDS, UNKNOWN_BAND from core.constants import BANDS, UNKNOWN_BAND, CW_MODES, PHONE_MODES, DATA_MODES, ALL_MODES
from pyhamtools import LookupLib, Callinfo from pyhamtools import LookupLib, Callinfo
# Static lookup helpers from pyhamtools # Static lookup helpers from pyhamtools
@@ -6,14 +6,24 @@ from pyhamtools import LookupLib, Callinfo
lookuplib = LookupLib(lookuptype="countryfile") lookuplib = LookupLib(lookuptype="countryfile")
callinfo = Callinfo(lookuplib) callinfo = Callinfo(lookuplib)
# Infer a mode from the comment
def infer_mode_from_comment(comment):
for mode in ALL_MODES:
if mode in comment.upper():
return mode
return None
# Infer a "mode family" from a mode. # Infer a "mode family" from a mode.
def infer_mode_family_from_mode(mode): def infer_mode_family_from_mode(mode):
if mode.upper() == "CW": if mode.upper() in CW_MODES:
return "CW" return "CW"
elif mode.upper() in ["PHONE", "SSB", "USB", "LSB", "AM", "FM", "DMR", "DSTAR", "C4FM", "M17"]: elif mode.upper() in PHONE_MODES:
return "PHONE" return "PHONE"
elif mode.upper() in DATA_MODES:
return "DATA"
else: else:
return "DIGI" print("Found an unrecognised mode: " + mode + ". Developer should categorise this.")
return None
# Infer a band from a frequency in kHz # Infer a band from a frequency in kHz
def infer_band_from_freq(freq): def infer_band_from_freq(freq):

View File

@@ -1,11 +1,14 @@
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
import pytz
from pyhamtools.locator import locator_to_latlong, latlong_to_locator from pyhamtools.locator import locator_to_latlong, latlong_to_locator
from core.constants import DXCC_FLAGS from core.constants import DXCC_FLAGS
from core.utils import infer_mode_family_from_mode, infer_band_from_freq, infer_continent_from_callsign, \ from core.utils import infer_mode_family_from_mode, infer_band_from_freq, infer_continent_from_callsign, \
infer_country_from_callsign, infer_cq_zone_from_callsign, infer_itu_zone_from_callsign, infer_dxcc_id_from_callsign infer_country_from_callsign, infer_cq_zone_from_callsign, infer_itu_zone_from_callsign, infer_dxcc_id_from_callsign, \
infer_mode_from_comment
# Data class that defines a spot. # Data class that defines a spot.
@dataclass @dataclass
@@ -14,6 +17,8 @@ class Spot:
dx_call: str = None dx_call: str = None
# Callsign of the operator that has spotted them # Callsign of the operator that has spotted them
de_call: str = None de_call: str = None
# Name of the operator that has been spotted
dx_name: str = None
# Country of the DX operator # Country of the DX operator
dx_country: str = None dx_country: str = None
# Country of the spotter # Country of the spotter
@@ -48,6 +53,9 @@ class Spot:
band_contrast_color: str = None band_contrast_color: str = None
# Time of the spot # Time of the spot
time: datetime = None time: datetime = None
# Time that this software received the spot. This is used with the "since" call to our API to receive all data that
# is new to us, even if by a quirk of the API it might be older than the list time the client polled the API.
received_time: datetime = datetime.now(pytz.UTC),
# Comment left by the spotter, if any # Comment left by the spotter, if any
comment: str = None comment: str = None
# Special Interest Group (SIG), e.g. outdoor activity programme such as POTA # Special Interest Group (SIG), e.g. outdoor activity programme such as POTA
@@ -56,6 +64,8 @@ class Spot:
sig_refs: list = None sig_refs: list = None
# SIG reference names # SIG reference names
sig_refs_names: list = None sig_refs_names: list = None
# Activation score. SOTA only
activation_score: int = None
# Maidenhead grid locator for the spot. This could be from a geographical reference e.g. POTA, or just from the country # Maidenhead grid locator for the spot. This could be from a geographical reference e.g. POTA, or just from the country
grid: str = None grid: str = None
# Latitude & longitude, in degrees. This could be from a geographical reference e.g. POTA, or just from the country # Latitude & longitude, in degrees. This could be from a geographical reference e.g. POTA, or just from the country
@@ -70,6 +80,7 @@ class Spot:
# Infer missing parameters where possible # Infer missing parameters where possible
def infer_missing(self): def infer_missing(self):
# DX country, continent, zones etc. from callsign
if self.dx_call and not self.dx_country: if self.dx_call and not self.dx_country:
self.dx_country = infer_country_from_callsign(self.dx_call) self.dx_country = infer_country_from_callsign(self.dx_call)
if self.dx_call and not self.dx_continent: if self.dx_call and not self.dx_continent:
@@ -83,6 +94,7 @@ class Spot:
if self.dx_dxcc_id and not self.dx_flag: if self.dx_dxcc_id and not self.dx_flag:
self.dx_flag = DXCC_FLAGS[self.dx_dxcc_id] self.dx_flag = DXCC_FLAGS[self.dx_dxcc_id]
# Spotter country, continent, zones etc. from callsign
if self.de_call and not self.de_country: if self.de_call and not self.de_country:
self.de_country = infer_country_from_callsign(self.de_call) self.de_country = infer_country_from_callsign(self.de_call)
if self.de_call and not self.de_continent: if self.de_call and not self.de_continent:
@@ -92,22 +104,31 @@ class Spot:
if self.de_dxcc_id and not self.de_flag: if self.de_dxcc_id and not self.de_flag:
self.de_flag = DXCC_FLAGS[self.de_dxcc_id] self.de_flag = DXCC_FLAGS[self.de_dxcc_id]
# Band from frequency
if self.freq and not self.band: if self.freq and not self.band:
band = infer_band_from_freq(self.freq) band = infer_band_from_freq(self.freq)
self.band = band.name self.band = band.name
self.band_color = band.color self.band_color = band.color
self.band_contrast_color = band.contrast_color self.band_contrast_color = band.contrast_color
# Mode from comments, mode family from mode
if self.comment and not self.mode:
self.mode=infer_mode_from_comment(self.comment)
if self.mode and not self.mode_family: if self.mode and not self.mode_family:
self.mode_family=infer_mode_family_from_mode(self.mode) self.mode_family=infer_mode_family_from_mode(self.mode)
# Grid to lat/lon and vice versa
if self.grid and not self.latitude: if self.grid and not self.latitude:
ll = locator_to_latlong(self.grid) ll = locator_to_latlong(self.grid)
self.latitude = ll[0] self.latitude = ll[0]
self.longitude = ll[1] self.longitude = ll[1]
if self.latitude and self.longitude and not self.grid: if self.latitude and self.longitude and not self.grid:
self.grid = latlong_to_locator(self.latitude, self.longitude, 8) self.grid = latlong_to_locator(self.latitude, self.longitude, 8)
# TODO use QRZ provider to get grids, lat Lon, DX name # QRT comment detection
# TODO lat/lon from DXCC centre? if self.comment and not self.qrt:
self.qrt = "QRT" in self.comment.upper()
# TODO use QRZ/HamQTH provider to get grids, lat Lon, when missing; and DX name
# credentials in config file which is .gitignored; sample provided
# TODO lat/lon from DXCC centre as last resort?

29
main.py
View File

@@ -1,14 +1,17 @@
# Main script # Main script
import signal import signal
from time import sleep
from providers.dxcluster import DXCluster from providers.dxcluster import DXCluster
from providers.gma import GMA
from providers.pota import POTA from providers.pota import POTA
from providers.sota import SOTA
from providers.wwbota import WWBOTA
from providers.wwff import WWFF
# Shutdown function # Shutdown function
def shutdown(sig, frame): def shutdown(sig, frame):
# Start data providers print("Stopping program, this may take a few seconds...")
for p in providers: p.stop() for p in providers: p.stop()
@@ -20,8 +23,18 @@ if __name__ == '__main__':
# Create providers # Create providers
providers = [ providers = [
POTA(), POTA(),
DXCluster("hrd.wa9pie.net", 8000) SOTA(),
] # todo all other providers WWFF(),
WWBOTA(),
GMA(),
# todo HEMA
# todo PNP
# todo RBN
# todo packet?
# todo APRS?
DXCluster("hrd.wa9pie.net", 8000),
# DXCluster("dxc.w3lpl.net", 22)
]
# Set up spot list # Set up spot list
spot_list = [] spot_list = []
# Set up data providers # Set up data providers
@@ -32,12 +45,16 @@ if __name__ == '__main__':
# todo thread to clear spot list of old data # todo thread to clear spot list of old data
# Todo serve spot API # Todo serve spot API
# Todo spot API arguments e.g. "since" based on received_time of spots, sig only, dx cont, dxcc, de cont, band, mode, filter out qrt, filter pre-qsy
# Todo serve status API # Todo serve status API
# Todo serve apidocs # Todo serve apidocs
# Todo serve website # Todo serve website
# TODO NOTES FOR NGINX REVERSE PROXY
# local cache time of 15 sec to avoid over burdening python?
# NOTES FOR FIELD SPOTTER # TODO NOTES FOR FIELD SPOTTER
# Still need to de-dupe spots # Still need to de-dupe spots
# Still need to do QSY checking # Still need to do QSY checking in FS because we can enable/disable showing them and don't want to re-query the API.
# Filter comments, still do in FS or move that here?

View File

@@ -1,19 +1,23 @@
import logging
import re
from datetime import datetime, timezone from datetime import datetime, timezone
from threading import Thread from threading import Thread
from time import sleep
import pytz import pytz
import telnetlib3
from core.constants import SERVER_OWNER_CALLSIGN from core.constants import SERVER_OWNER_CALLSIGN
from data.spot import Spot from data.spot import Spot
from providers.provider import Provider from providers.provider import Provider
import telnetlib3
import re
callsign_pattern = "([a-z|0-9|/]+)"
frequency_pattern = "([0-9|.]+)"
pattern = re.compile("^DX de "+callsign_pattern+":\\s+"+frequency_pattern+"\\s+"+callsign_pattern+"\\s+(.*)\\s+(\\d{4}Z)", re.IGNORECASE)
# Provider for a DX Cluster. Hostname and port provided as parameters.
class DXCluster(Provider): class DXCluster(Provider):
CALLSIGN_PATTERN = "([a-z|0-9|/]+)"
FREQUENCY_PATTERM = "([0-9|.]+)"
LINE_PATTERN = re.compile(
"^DX de " + CALLSIGN_PATTERN + ":\\s+" + FREQUENCY_PATTERM + "\\s+" + CALLSIGN_PATTERN + "\\s+(.*)\\s+(\\d{4}Z)",
re.IGNORECASE)
# Constructor requires hostname and port # Constructor requires hostname and port
def __init__(self, hostname, port): def __init__(self, hostname, port):
@@ -25,7 +29,7 @@ class DXCluster(Provider):
self.run = True self.run = True
def name(self): def name(self):
return "DX Cluster " + self.hostname + " " + str(self.port) return "DX Cluster " + self.hostname
def start(self): def start(self):
self.thread = Thread(target=self.handle) self.thread = Thread(target=self.handle)
@@ -37,29 +41,50 @@ class DXCluster(Provider):
self.thread.join() self.thread.join()
def handle(self): def handle(self):
self.status = "Connecting"
self.telnet = telnetlib3.Telnet(self.hostname, self.port)
self.telnet.read_until("login: ".encode("ascii"))
self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("ascii"))
self.status = "Waiting for Data"
while self.run: while self.run:
# Check new telnet info against regular expression connected = False
telnet_output = self.telnet.read_until("\n".encode("ascii")) while not connected and self.run:
match = pattern.match(telnet_output.decode("ascii")) try:
if match: self.status = "Connecting"
spot_time = datetime.strptime(match.group(5), "%H%MZ") self.telnet = telnetlib3.Telnet(self.hostname, self.port)
spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC) self.telnet.read_until("login: ".encode("ascii"))
spot = Spot(source=self.name(), self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("ascii"))
dx_call=match.group(3), connected = True
de_call=match.group(1), except Exception as e:
freq=float(match.group(2)), self.status = "Error"
comment=match.group(4).strip(), logging.exception("Exception while connecting to DX Cluster Provider (" + self.hostname + ").")
time=spot_datetime) sleep(5)
# Fill in any blanks
spot.infer_missing()
# Add to our list
self.submit([spot])
self.status = "OK" self.status = "Waiting for Data"
self.last_update_time = datetime.now(timezone.utc) while connected and self.run:
try:
# Check new telnet info against regular expression
telnet_output = self.telnet.read_until("\n".encode("ascii"))
match = self.LINE_PATTERN.match(telnet_output.decode("ascii"))
if match:
spot_time = datetime.strptime(match.group(5), "%H%MZ")
spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC)
spot = Spot(source=self.name(),
dx_call=match.group(3),
de_call=match.group(1),
freq=float(match.group(2)),
comment=match.group(4).strip(),
time=spot_datetime)
# Fill in any blanks
spot.infer_missing()
# Add to our list
self.submit([spot])
self.status = "OK"
self.last_update_time = datetime.now(timezone.utc)
except Exception as e:
connected = False
if self.run:
self.status = "Error"
logging.exception("Exception in DX Cluster Provider (" + self.hostname + ")")
sleep(5)
else:
self.status = "Shutting down"
self.status = "Disconnected"

51
providers/gma.py Normal file
View File

@@ -0,0 +1,51 @@
from datetime import datetime, timedelta
import pytz
from requests_cache import CachedSession
from data.spot import Spot
from providers.http_provider import HTTPProvider
# Provider for General Mountain Activity
class GMA(HTTPProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://www.cqgma.org/api/spots/25/"
# GMA spots don't contain the details of the programme they are for, we need a separate lookup for that
REF_INFO_URL_ROOT = "https://www.cqgma.org/api/ref/?"
REF_INFO_CACHE_TIME_DAYS = 30
REF_INFO_CACHE = CachedSession("gma_ref_info_cache", expire_after=timedelta(days=REF_INFO_CACHE_TIME_DAYS))
def __init__(self):
super().__init__(self.SPOTS_URL, self.POLL_INTERVAL_SEC)
def name(self):
return "GMA"
def http_response_to_spots(self, http_response):
new_spots = []
# Iterate through source data
for source_spot in http_response.json()["RCD"]:
# Convert to our spot format
spot = Spot(source=self.name(),
dx_call=source_spot["ACTIVATOR"].upper(),
de_call=source_spot["SPOTTER"].upper(),
freq=float(source_spot["QRG"]),
mode=source_spot["MODE"].upper(),
comment=source_spot["TEXT"],
sig_refs=[source_spot["REF"]],
sig_refs_names=[source_spot["NAME"]],
time=datetime.strptime(source_spot["DATE"] + source_spot["TIME"], "%Y%m%d%H%M").replace(tzinfo=pytz.UTC),
latitude=float(source_spot["LAT"]),
longitude=float(source_spot["LON"]))
# GMA doesn't give what programme (SIG) the reference is for until we separately look it up.
ref_info = self.REF_INFO_CACHE.get(self.REF_INFO_URL_ROOT + source_spot["REF"], headers=self.HTTP_HEADERS).json()
spot.sig = ref_info["reftype"]
# Fill in any missing data
spot.infer_missing()
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots

View File

@@ -0,0 +1,59 @@
import logging
from datetime import datetime, timezone
from threading import Timer, Thread
from time import sleep
import requests
from providers.provider import Provider
# Generic data provider class for providers that request data via HTTP(S). Just for convenience to avoid code
# duplication. Subclasses of this query the individual APIs for data.
class HTTPProvider(Provider):
def __init__(self, url, poll_interval):
super().__init__()
self.url = url
self.poll_interval = poll_interval
self.poll_timer = None
def name(self):
raise NotImplementedError("Subclasses must implement this method")
def start(self):
# Fire off a one-shot thread to run poll() for the first time, just to ensure start() returns immediately and
# the application can continue starting. The thread itself will then die, and the timer will kick in on its own
# thread.
thread = Thread(target=self.poll)
thread.start()
def stop(self):
self.poll_timer.cancel()
def poll(self):
try:
# Request data from API
http_response = requests.get(self.url, headers=self.HTTP_HEADERS)
# Pass off to the subclass for processing
new_spots = self.http_response_to_spots(http_response)
# Submit the new spots for processing. There might not be any spots for the less popular programs.
if new_spots:
self.submit(new_spots)
self.status = "OK"
self.last_update_time = datetime.now(timezone.utc)
except Exception as e:
self.status = "Error"
logging.exception("Exception in HTTP JSON Provider (" + self.name() + ")")
sleep(1)
self.poll_timer = Timer(self.poll_interval, self.poll)
self.poll_timer.start()
# Convert an HTTP response returned by the API into spot data. The whole response is provided here so the subclass
# implementations can check for HTTP status codes if necessary, and handle the response as JSON, XML, text, whatever
# the API actually provides.
def http_response_to_spots(self, http_response):
raise NotImplementedError("Subclasses must implement this method")

View File

@@ -1,65 +1,44 @@
from datetime import datetime, timezone from datetime import datetime
import pytz
from data.spot import Spot
from providers.provider import Provider
from threading import Timer
import requests
class POTA(Provider): import pytz
from data.spot import Spot
from providers.http_provider import HTTPProvider
# Provider for Parks on the Air
class POTA(HTTPProvider):
POLL_INTERVAL_SEC = 120 POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://api.pota.app/spot/activator" SPOTS_URL = "https://api.pota.app/spot/activator"
def __init__(self): def __init__(self):
super().__init__() super().__init__(self.SPOTS_URL, self.POLL_INTERVAL_SEC)
self.poll_timer = None
def name(self): def name(self):
return "POTA" return "POTA"
def start(self): def http_response_to_spots(self, http_response):
self.poll() new_spots = []
# Iterate through source data
def stop(self): for source_spot in http_response.json():
self.poll_timer.cancel() # Convert to our spot format
spot = Spot(source=self.name(),
def poll(self): source_id=source_spot["spotId"],
try: dx_call=source_spot["activator"].upper(),
# Request data from API de_call=source_spot["spotter"].upper(),
source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS).json() freq=float(source_spot["frequency"]),
# Build a list of spots we haven't seen before mode=source_spot["mode"].upper(),
new_spots = [] comment=source_spot["comments"],
# Iterate through source data sig="POTA",
for source_spot in source_data: sig_refs=[source_spot["reference"]],
# Convert to our spot format sig_refs_names=[source_spot["name"]],
spot = Spot(source=self.name(), time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=pytz.UTC),
source_id=source_spot["spotId"], grid=source_spot["grid6"],
dx_call=source_spot["activator"], latitude=source_spot["latitude"],
de_call=source_spot["spotter"], longitude=source_spot["longitude"])
freq=float(source_spot["frequency"]), # Fill in any missing data
mode=source_spot["mode"], spot.infer_missing()
comment=source_spot["comments"], # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
sig="POTA", # that for us.
sig_refs=[source_spot["reference"]], new_spots.append(spot)
sig_refs_names=[source_spot["name"]], return new_spots
time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=pytz.UTC),
grid=source_spot["grid6"],
latitude=source_spot["latitude"],
longitude=source_spot["longitude"],
qrt="QRT" in source_spot["comments"].upper())
# Fill in any blanks
spot.infer_missing()
# Add to our list
new_spots.append(spot)
# Submit the new spots for processing
self.submit(new_spots)
self.status = "OK"
self.last_update_time = datetime.now(timezone.utc)
except requests.exceptions.RequestException as e:
self.status = "Error"
self.poll_timer = Timer(self.POLL_INTERVAL_SEC, self.poll)
self.poll_timer.start()

View File

@@ -1,7 +1,10 @@
from datetime import datetime from datetime import datetime
import pytz import pytz
from core.constants import SOFTWARE_NAME, SOFTWARE_VERSION from core.constants import SOFTWARE_NAME, SOFTWARE_VERSION
# Generic data provider class. Subclasses of this query the individual APIs for data. # Generic data provider class. Subclasses of this query the individual APIs for data.
class Provider: class Provider:

68
providers/sota.py Normal file
View File

@@ -0,0 +1,68 @@
from datetime import datetime, timedelta
import requests
from requests_cache import CachedSession
from data.spot import Spot
from providers.http_provider import HTTPProvider
# Provider for Summits on the Air
class SOTA(HTTPProvider):
POLL_INTERVAL_SEC = 120
# SOTA wants us to check for an "epoch" from the API and see if it's actually changed before querying the main data
# APIs. So it's actually the EPOCH_URL that we pass into the constructor and get the superclass to call on a timer.
# The actual data lookup all happens after parsing and checking the epoch.
EPOCH_URL = "https://api-db2.sota.org.uk/api/spots/epoch"
SPOTS_URL = "https://api-db2.sota.org.uk/api/spots/60/all/all"
# SOTA spots don't contain lat/lon, we need a separate lookup for that
SUMMIT_URL_ROOT = "https://api-db2.sota.org.uk/api/summits/"
SUMMIT_DATA_CACHE_TIME_DAYS = 30
SUMMIT_DATA_CACHE = CachedSession("sota_summit_data_cache", expire_after=timedelta(days=SUMMIT_DATA_CACHE_TIME_DAYS))
def __init__(self):
super().__init__(self.EPOCH_URL, self.POLL_INTERVAL_SEC)
self.api_epoch = ""
def name(self):
return "SOTA"
def http_response_to_spots(self, http_response):
# OK, source data is actually just the epoch at this point. We'll then go on to fetch real data if we know this
# has changed.
epoch_changed = http_response.text != self.api_epoch
self.api_epoch = http_response.text
new_spots = []
# OK, if the epoch actually changed, now we make the real request for data.
if epoch_changed:
source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS).json()
# Iterate through source data
for source_spot in source_data:
# Convert to our spot format
spot = Spot(source=self.name(),
source_id=source_spot["id"],
dx_call=source_spot["activatorCallsign"].upper(),
dx_name=source_spot["activatorName"],
de_call=source_spot["callsign"].upper(),
freq=(float(source_spot["frequency"]) * 1000) if (source_spot["frequency"] is not None) else None, # Seen SOTA spots with no frequency!
mode=source_spot["mode"].upper(),
comment=source_spot["comments"],
sig="SOTA",
sig_refs=[source_spot["summitCode"]],
sig_refs_names=[source_spot["summitName"]],
time=datetime.fromisoformat(source_spot["timeStamp"]),
activation_score=source_spot["points"])
# SOTA doesn't give summit lat/lon/grid in the main call, so we need another separate call for this
summit_data = self.SUMMIT_DATA_CACHE.get(self.SUMMIT_URL_ROOT + source_spot["summitCode"], headers=self.HTTP_HEADERS).json()
spot.grid = summit_data["locator"]
spot.latitude = summit_data["latitude"]
spot.longitude = summit_data["longitude"]
# Fill in any missing data
spot.infer_missing()
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots

51
providers/wwbota.py Normal file
View File

@@ -0,0 +1,51 @@
from datetime import datetime
from data.spot import Spot
from providers.http_provider import HTTPProvider
# Provider for Worldwide Bunkers on the Air
class WWBOTA(HTTPProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://api.wwbota.org/spots/"
def __init__(self):
super().__init__(self.SPOTS_URL, self.POLL_INTERVAL_SEC)
def name(self):
return "WWBOTA"
def http_response_to_spots(self, http_response):
new_spots = []
# Iterate through source data
for source_spot in http_response.json():
# Convert to our spot format. First we unpack references, because WWBOTA spots can have more than one for
# n-fer activations.
refs = []
ref_names = []
for ref in source_spot["references"]:
refs.append(ref["reference"])
ref_names.append(ref["name"])
spot = Spot(source=self.name(),
dx_call=source_spot["call"].upper(),
de_call=source_spot["spotter"].upper(),
freq=float(source_spot["freq"]) * 1000, # MHz to kHz
mode=source_spot["mode"].upper(),
comment=source_spot["comment"],
sig="WWBOTA",
sig_refs=refs,
sig_refs_names=ref_names,
time=datetime.fromisoformat(source_spot["time"]),
# WWBOTA spots can contain multiple references for bunkers being activated simultaneously. For
# now, we will just pick the first one to use as our grid, latitude and longitude.
grid=source_spot["references"][0]["locator"],
latitude=source_spot["references"][0]["lat"],
longitude=source_spot["references"][0]["long"],
qrt=source_spot["type"] == "QRT")
# Fill in any missing data
spot.infer_missing()
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us. But WWBOTA does support a special "Test" spot type, we need to avoid adding that.
if source_spot["type"] != "Test":
new_spots.append(spot)
return new_spots

43
providers/wwff.py Normal file
View File

@@ -0,0 +1,43 @@
from datetime import datetime
import pytz
from data.spot import Spot
from providers.http_provider import HTTPProvider
# Provider for Worldwide Flora & Fauna
class WWFF(HTTPProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://spots.wwff.co/static/spots.json"
def __init__(self):
super().__init__(self.SPOTS_URL, self.POLL_INTERVAL_SEC)
def name(self):
return "WWFF"
def http_response_to_spots(self, http_response):
new_spots = []
# Iterate through source data
for source_spot in http_response.json():
# Convert to our spot format
spot = Spot(source=self.name(),
source_id=source_spot["id"],
dx_call=source_spot["activator"].upper(),
de_call=source_spot["spotter"].upper(),
freq=float(source_spot["frequency_khz"]),
mode=source_spot["mode"].upper(),
comment=source_spot["remarks"],
sig="WWFF",
sig_refs=[source_spot["reference"]],
sig_refs_names=[source_spot["reference_name"]],
time=datetime.fromtimestamp(source_spot["spot_time"]).replace(tzinfo=pytz.UTC),
latitude=source_spot["latitude"],
longitude=source_spot["longitude"])
# Fill in any missing data
spot.infer_missing()
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots

View File

@@ -1,2 +1,5 @@
pytz requests-cache~=1.2.1
pyhamtools~=0.12.0
telnetlib3~=2.0.8
pytz~=2025.2
requests~=2.32.5 requests~=2.32.5