Starting to implement alerts #17

This commit is contained in:
Ian Renton
2025-10-04 18:09:54 +01:00
parent 55893949b8
commit 74153a9d94
29 changed files with 552 additions and 109 deletions

58
spotproviders/aprsis.py Normal file
View File

@@ -0,0 +1,58 @@
import logging
from datetime import datetime, timezone
from threading import Thread
import aprslib
import pytz
from core.config import SERVER_OWNER_CALLSIGN
from data.spot import Spot
from spotproviders.spot_provider import SpotProvider
# Spot provider for the APRS-IS.
class APRSIS(SpotProvider):
def __init__(self, provider_config):
super().__init__(provider_config)
self.thread = Thread(target=self.connect)
self.thread.daemon = True
self.aprsis = None
def start(self):
self.thread.start()
def connect(self):
self.aprsis = aprslib.IS(SERVER_OWNER_CALLSIGN)
self.status = "Connecting"
logging.info("APRS-IS connecting...")
self.aprsis.connect()
self.aprsis.consumer(self.handle)
logging.info("APRS-IS connected.")
def stop(self):
self.status = "Shutting down"
self.aprsis.close()
self.thread.join()
def handle(self, data):
# Split SSID in "from" call and store separately
from_parts = data["from"].split("-")
dx_call = from_parts[0]
dx_aprs_ssid = from_parts[1] if len(from_parts) > 1 else None
spot = Spot(source="APRS-IS",
dx_call=dx_call,
dx_aprs_ssid=dx_aprs_ssid,
de_call=data["via"],
comment=data["comment"] if "comment" in data else None,
latitude=data["latitude"] if "latitude" in data else None,
longitude=data["longitude"] if "longitude" in data else None,
icon="tower-cell",
time=datetime.now(pytz.UTC).timestamp()) # APRS-IS spots are live so we can assume spot time is "now"
# Add to our list
self.submit(spot)
self.status = "OK"
self.last_update_time = datetime.now(timezone.utc)
logging.debug("Data received from APRS-IS.")

View File

@@ -0,0 +1,92 @@
import logging
import re
from datetime import datetime, timezone
from threading import Thread
from time import sleep
import pytz
import telnetlib3
from data.spot import Spot
from core.config import SERVER_OWNER_CALLSIGN
from spotproviders.spot_provider import SpotProvider
# Spot provider for a DX Cluster. Hostname and port provided as parameters.
class DXCluster(SpotProvider):
CALLSIGN_PATTERN = "([a-z|0-9|/]+)"
FREQUENCY_PATTERM = "([0-9|.]+)"
LINE_PATTERN = re.compile(
"^DX de " + CALLSIGN_PATTERN + ":\\s+" + FREQUENCY_PATTERM + "\\s+" + CALLSIGN_PATTERN + "\\s+(.*)\\s+(\\d{4}Z)",
re.IGNORECASE)
# Constructor requires hostname and port
def __init__(self, provider_config):
super().__init__(provider_config)
self.hostname = provider_config["host"]
self.port = provider_config["port"]
self.telnet = None
self.thread = Thread(target=self.handle)
self.thread.daemon = True
self.run = True
def start(self):
self.thread.start()
def stop(self):
self.run = False
self.telnet.close()
self.thread.join()
def handle(self):
while self.run:
connected = False
while not connected and self.run:
try:
self.status = "Connecting"
logging.info("DX Cluster " + self.hostname + " connecting...")
self.telnet = telnetlib3.Telnet(self.hostname, self.port)
self.telnet.read_until("login: ".encode("latin-1"))
self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("latin-1"))
connected = True
logging.info("DX Cluster " + self.hostname + " connected.")
except Exception as e:
self.status = "Error"
logging.exception("Exception while connecting to DX Cluster Provider (" + self.hostname + ").")
sleep(5)
self.status = "Waiting for Data"
while connected and self.run:
try:
# Check new telnet info against regular expression
telnet_output = self.telnet.read_until("\n".encode("latin-1"))
match = self.LINE_PATTERN.match(telnet_output.decode("latin-1"))
if match:
spot_time = datetime.strptime(match.group(5), "%H%MZ")
spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC)
spot = Spot(source=self.name,
dx_call=match.group(3),
de_call=match.group(1),
freq=float(match.group(2)) * 1000,
comment=match.group(4).strip(),
icon="desktop",
time=spot_datetime.timestamp())
# Add to our list
self.submit(spot)
self.status = "OK"
self.last_update_time = datetime.now(timezone.utc)
logging.debug("Data received from DX Cluster " + self.hostname + ".")
except Exception as e:
connected = False
if self.run:
self.status = "Error"
logging.exception("Exception in DX Cluster Provider (" + self.hostname + ")")
sleep(5)
else:
logging.info("DX Cluster " + self.hostname + " shutting down...")
self.status = "Shutting down"
self.status = "Disconnected"

83
spotproviders/gma.py Normal file
View File

@@ -0,0 +1,83 @@
import logging
from datetime import datetime, timedelta
import pytz
from requests_cache import CachedSession
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
# Spot provider for General Mountain Activity
class GMA(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://www.cqgma.org/api/spots/25/"
# GMA spots don't contain the details of the programme they are for, we need a separate lookup for that
REF_INFO_URL_ROOT = "https://www.cqgma.org/api/ref/?"
REF_INFO_CACHE_TIME_DAYS = 30
REF_INFO_CACHE = CachedSession("gma_ref_info_cache", expire_after=timedelta(days=REF_INFO_CACHE_TIME_DAYS))
def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_spots(self, http_response):
new_spots = []
# Iterate through source data
for source_spot in http_response.json()["RCD"]:
# Convert to our spot format
spot = Spot(source=self.name,
dx_call=source_spot["ACTIVATOR"].upper(),
de_call=source_spot["SPOTTER"].upper(),
freq=float(source_spot["QRG"]) * 1000 if (source_spot["QRG"] != "") else None,
# Seen GMA spots with no frequency
mode=source_spot["MODE"].upper() if "<>" not in source_spot["MODE"] else None,
# Filter out some weird mode strings
comment=source_spot["TEXT"],
sig_refs=[source_spot["REF"]],
sig_refs_names=[source_spot["NAME"]],
time=datetime.strptime(source_spot["DATE"] + source_spot["TIME"], "%Y%m%d%H%M").replace(
tzinfo=pytz.UTC).timestamp(),
latitude=float(source_spot["LAT"]) if (source_spot["LAT"] != "") else None,
# Seen GMA spots with no lat/lon
longitude=float(source_spot["LON"]) if (source_spot["LON"] != "") else None)
# GMA doesn't give what programme (SIG) the reference is for until we separately look it up.
ref_response = self.REF_INFO_CACHE.get(self.REF_INFO_URL_ROOT + source_spot["REF"],
headers=self.HTTP_HEADERS)
# Sometimes this is blank, so handle that
if ref_response.text is not None and ref_response.text != "":
ref_info = ref_response.json()
# If this is POTA, SOTA or WWFF data we already have it through other means, so ignore. POTA and WWFF
# spots come through with reftype=POTA or reftype=WWFF. SOTA is harder to figure out because both SOTA
# and GMA summits come through with reftype=Summit, so we must check for the presence of a "sota" entry
# to determine if it's a SOTA summit.
if ref_info["reftype"] not in ["POTA", "WWFF"] and (ref_info["reftype"] != "Summit" or ref_info["sota"] == ""):
match ref_info["reftype"]:
case "Summit":
spot.sig = "GMA"
spot.icon = "mountain"
case "IOTA Island":
spot.sig = "IOTA"
spot.icon = "umbrella-beach"
case "Lighthouse (ILLW)":
spot.sig = "ILLW"
spot.icon = "tower-observation"
case "Lighthouse (ARLHS)":
spot.sig = "ARLHS"
spot.icon = "tower-observation"
case "Castle":
spot.sig = "WCA/COTA"
spot.icon = "chess-rook"
case "Mill":
spot.sig = "MOTA"
spot.icon = "fan"
case _:
logging.warn("GMA spot found with ref type " + ref_info[
"reftype"] + ", developer needs to figure out an icon for this!")
spot.sig = ref_info["reftype"]
spot.icon = "person-hiking"
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots

65
spotproviders/hema.py Normal file
View File

@@ -0,0 +1,65 @@
import re
from datetime import datetime, timedelta
import pytz
import requests
from requests_cache import CachedSession
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
# Spot provider for HuMPs Excluding Marilyns Award
class HEMA(HTTPSpotProvider):
POLL_INTERVAL_SEC = 300
# HEMA wants us to check for a "spot seed" from the API and see if it's actually changed before querying the main
# data API. So it's actually the SPOT_SEED_URL that we pass into the constructor and get the superclass to call on a
# timer. The actual data lookup all happens after parsing and checking the seed.
SPOT_SEED_URL = "http://www.hema.org.uk/spotSeed.jsp"
SPOTS_URL = "http://www.hema.org.uk/spotsMobile.jsp"
FREQ_MODE_PATTERN = re.compile("^([\\d.]*) \\((.*)\\)$")
SPOTTER_COMMENT_PATTERN = re.compile("^\\((.*)\\) (.*)$")
def __init__(self, provider_config):
super().__init__(provider_config, self.SPOT_SEED_URL, self.POLL_INTERVAL_SEC)
self.spot_seed = ""
def http_response_to_spots(self, http_response):
# OK, source data is actually just the spot seed at this point. We'll then go on to fetch real data if we know
# this has changed.
spot_seed_changed = http_response.text != self.spot_seed
self.spot_seed = http_response.text
new_spots = []
# OK, if the spot seed actually changed, now we make the real request for data.
if spot_seed_changed:
source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS)
source_data_items = source_data.text.split("=")
# Iterate through source data items.
for source_spot in source_data_items:
spot_items = source_spot.split(";")
# Any line with less than 9 items is not a proper spot line
if len(spot_items) >= 9:
# Fiddle with some data to extract bits we need. Freq/mode and spotter/comment come in combined fields.
freq_mode_match = re.search(self.FREQ_MODE_PATTERN, spot_items[5])
spotter_comment_match = re.search(self.SPOTTER_COMMENT_PATTERN, spot_items[6])
# Convert to our spot format
spot = Spot(source=self.name,
dx_call=spot_items[2].upper(),
de_call=spotter_comment_match.group(1).upper(),
freq=float(freq_mode_match.group(1)) * 1000000,
mode=freq_mode_match.group(2).upper(),
comment=spotter_comment_match.group(2),
sig="HEMA",
sig_refs=[spot_items[3].upper()],
sig_refs_names=[spot_items[4]],
icon="mound",
time=datetime.strptime(spot_items[0], "%d/%m/%Y %H:%M").replace(tzinfo=pytz.UTC).timestamp(),
latitude=float(spot_items[7]),
longitude=float(spot_items[8]))
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots

View File

@@ -0,0 +1,61 @@
import logging
from datetime import datetime
from threading import Timer, Thread
from time import sleep
import pytz
import requests
from spotproviders.spot_provider import SpotProvider
# Generic spot provider class for providers that request data via HTTP(S). Just for convenience to avoid code
# duplication. Subclasses of this query the individual APIs for data.
class HTTPSpotProvider(SpotProvider):
def __init__(self, provider_config, url, poll_interval):
super().__init__(provider_config)
self.url = url
self.poll_interval = poll_interval
self.poll_timer = None
def start(self):
# Fire off a one-shot thread to run poll() for the first time, just to ensure start() returns immediately and
# the application can continue starting. The thread itself will then die, and the timer will kick in on its own
# thread.
logging.info("Set up query of " + self.name + " spot API every " + str(self.poll_interval) + " seconds.")
thread = Thread(target=self.poll)
thread.daemon = True
thread.start()
def stop(self):
self.poll_timer.cancel()
def poll(self):
try:
# Request data from API
logging.debug("Polling " + self.name + " spot API...")
http_response = requests.get(self.url, headers=self.HTTP_HEADERS)
# Pass off to the subclass for processing
new_spots = self.http_response_to_spots(http_response)
# Submit the new spots for processing. There might not be any spots for the less popular programs.
if new_spots:
self.submit_batch(new_spots)
self.status = "OK"
self.last_update_time = datetime.now(pytz.UTC)
logging.debug("Received data from " + self.name + " spot API.")
except Exception as e:
self.status = "Error"
logging.exception("Exception in HTTP JSON Spot Provider (" + self.name + ")")
sleep(1)
self.poll_timer = Timer(self.poll_interval, self.poll)
self.poll_timer.start()
# Convert an HTTP response returned by the API into spot data. The whole response is provided here so the subclass
# implementations can check for HTTP status codes if necessary, and handle the response as JSON, XML, text, whatever
# the API actually provides.
def http_response_to_spots(self, http_response):
raise NotImplementedError("Subclasses must implement this method")

View File

@@ -0,0 +1,41 @@
import logging
from datetime import datetime
import pytz
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
# Spot provider for Parks n Peaks
class ParksNPeaks(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://www.parksnpeaks.org/api/ALL"
def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_spots(self, http_response):
new_spots = []
# Iterate through source data
for source_spot in http_response.json():
# Convert to our spot format
spot = Spot(source=self.name,
source_id=source_spot["actID"],
dx_call=source_spot["actCallsign"].upper(),
de_call=source_spot["actSpoter"].upper(), # typo exists in API
freq=float(source_spot["actFreq"].replace(",", "")) * 1000000 if (source_spot["actFreq"] != "") else None, # Seen PNP spots with empty frequency, and with comma-separated thousands digits
mode=source_spot["actMode"].upper(),
comment=source_spot["actComments"],
sig=source_spot["actClass"],
sig_refs=[source_spot["actSiteID"]],
icon="question", # todo determine from actClass
time=datetime.strptime(source_spot["actTime"], "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC).timestamp())
# If this is POTA, SOTA or WWFF data we already have it through other means, so ignore.
if spot.sig not in ["POTA", "SOTA", "WWFF"]:
logging.warn("PNP spot found with sig " + spot.sig + ", developer needs to figure out how to look this up for grid/lat/lon!")
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots

41
spotproviders/pota.py Normal file
View File

@@ -0,0 +1,41 @@
from datetime import datetime
import pytz
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
# Spot provider for Parks on the Air
class POTA(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://api.pota.app/spot/activator"
def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_spots(self, http_response):
new_spots = []
# Iterate through source data
for source_spot in http_response.json():
# Convert to our spot format
spot = Spot(source=self.name,
source_id=source_spot["spotId"],
dx_call=source_spot["activator"].upper(),
de_call=source_spot["spotter"].upper(),
freq=float(source_spot["frequency"]) * 1000,
mode=source_spot["mode"].upper(),
comment=source_spot["comments"],
sig="POTA",
sig_refs=[source_spot["reference"]],
sig_refs_names=[source_spot["name"]],
icon="tree",
time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=pytz.UTC).timestamp(),
grid=source_spot["grid6"],
latitude=source_spot["latitude"],
longitude=source_spot["longitude"])
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots

93
spotproviders/rbn.py Normal file
View File

@@ -0,0 +1,93 @@
import logging
import re
from datetime import datetime, timezone
from threading import Thread
from time import sleep
import pytz
import telnetlib3
from data.spot import Spot
from core.config import SERVER_OWNER_CALLSIGN
from spotproviders.spot_provider import SpotProvider
# Spot provider for the Reverse Beacon Network. Connects to a single port, if you want both CW/RTTY (port 7000) and FT8
# (port 7001) you need to instantiate two copies of this. The port is provided as an argument to the constructor.
class RBN(SpotProvider):
CALLSIGN_PATTERN = "([a-z|0-9|/]+)"
FREQUENCY_PATTERM = "([0-9|.]+)"
LINE_PATTERN = re.compile(
"^DX de " + CALLSIGN_PATTERN + "-.*:\\s+" + FREQUENCY_PATTERM + "\\s+" + CALLSIGN_PATTERN + "\\s+(.*)\\s+(\\d{4}Z)",
re.IGNORECASE)
# Constructor requires port number.
def __init__(self, provider_config):
super().__init__(provider_config)
self.port = provider_config["port"]
self.telnet = None
self.thread = Thread(target=self.handle)
self.thread.daemon = True
self.run = True
def start(self):
self.thread.start()
def stop(self):
self.run = False
self.telnet.close()
self.thread.join()
def handle(self):
while self.run:
connected = False
while not connected and self.run:
try:
self.status = "Connecting"
logging.info("RBN port " + str(self.port) + " connecting...")
self.telnet = telnetlib3.Telnet("telnet.reversebeacon.net", self.port)
telnet_output = self.telnet.read_until("Please enter your call: ".encode("latin-1"))
self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("latin-1"))
connected = True
logging.info("RBN port " + str(self.port) + " connected.")
except Exception as e:
self.status = "Error"
logging.exception("Exception while connecting to RBN (port " + str(self.port) + ").")
sleep(5)
self.status = "Waiting for Data"
while connected and self.run:
try:
# Check new telnet info against regular expression
telnet_output = self.telnet.read_until("\n".encode("latin-1"))
match = self.LINE_PATTERN.match(telnet_output.decode("latin-1"))
if match:
spot_time = datetime.strptime(match.group(5), "%H%MZ")
spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC)
spot = Spot(source=self.name,
dx_call=match.group(3),
de_call=match.group(1),
freq=float(match.group(2)) * 1000,
comment=match.group(4).strip(),
icon="tower-cell",
time=spot_datetime.timestamp())
# Add to our list
self.submit(spot)
self.status = "OK"
self.last_update_time = datetime.now(timezone.utc)
logging.debug("Data received from RBN on port " + str(self.port) + ".")
except Exception as e:
connected = False
if self.run:
self.status = "Error"
logging.exception("Exception in RBN provider (port " + str(self.port) + ")")
sleep(5)
else:
logging.info("RBN provider (port " + str(self.port) + ") shutting down...")
self.status = "Shutting down"
self.status = "Disconnected"

64
spotproviders/sota.py Normal file
View File

@@ -0,0 +1,64 @@
from datetime import datetime, timedelta
import requests
from requests_cache import CachedSession
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
# Spot provider for Summits on the Air
class SOTA(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120
# SOTA wants us to check for an "epoch" from the API and see if it's actually changed before querying the main data
# APIs. So it's actually the EPOCH_URL that we pass into the constructor and get the superclass to call on a timer.
# The actual data lookup all happens after parsing and checking the epoch.
EPOCH_URL = "https://api-db2.sota.org.uk/api/spots/epoch"
SPOTS_URL = "https://api-db2.sota.org.uk/api/spots/60/all/all"
# SOTA spots don't contain lat/lon, we need a separate lookup for that
SUMMIT_URL_ROOT = "https://api-db2.sota.org.uk/api/summits/"
SUMMIT_DATA_CACHE_TIME_DAYS = 30
SUMMIT_DATA_CACHE = CachedSession("sota_summit_data_cache", expire_after=timedelta(days=SUMMIT_DATA_CACHE_TIME_DAYS))
def __init__(self, provider_config):
super().__init__(provider_config, self.EPOCH_URL, self.POLL_INTERVAL_SEC)
self.api_epoch = ""
def http_response_to_spots(self, http_response):
# OK, source data is actually just the epoch at this point. We'll then go on to fetch real data if we know this
# has changed.
epoch_changed = http_response.text != self.api_epoch
self.api_epoch = http_response.text
new_spots = []
# OK, if the epoch actually changed, now we make the real request for data.
if epoch_changed:
source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS).json()
# Iterate through source data
for source_spot in source_data:
# Convert to our spot format
spot = Spot(source=self.name,
source_id=source_spot["id"],
dx_call=source_spot["activatorCallsign"].upper(),
dx_name=source_spot["activatorName"],
de_call=source_spot["callsign"].upper(),
freq=(float(source_spot["frequency"]) * 1000000) if (source_spot["frequency"] is not None) else None, # Seen SOTA spots with no frequency!
mode=source_spot["mode"].upper(),
comment=source_spot["comments"],
sig="SOTA",
sig_refs=[source_spot["summitCode"]],
sig_refs_names=[source_spot["summitName"]],
icon="mountain-sun",
time=datetime.fromisoformat(source_spot["timeStamp"]).timestamp(),
activation_score=source_spot["points"])
# SOTA doesn't give summit lat/lon/grid in the main call, so we need another separate call for this
summit_data = self.SUMMIT_DATA_CACHE.get(self.SUMMIT_URL_ROOT + source_spot["summitCode"], headers=self.HTTP_HEADERS).json()
spot.grid = summit_data["locator"]
spot.latitude = summit_data["latitude"]
spot.longitude = summit_data["longitude"]
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots

View File

@@ -0,0 +1,57 @@
from datetime import datetime
import pytz
from core.constants import SOFTWARE_NAME, SOFTWARE_VERSION
from core.config import SERVER_OWNER_CALLSIGN, MAX_SPOT_AGE
# Generic spot provider class. Subclasses of this query the individual APIs for data.
class SpotProvider:
# HTTP headers used for spot providers that use HTTP
HTTP_HEADERS = { "User-Agent": SOFTWARE_NAME + " " + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")" }
# Constructor
def __init__(self, provider_config):
self.name = provider_config["name"]
self.enabled = provider_config["enabled"]
self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
self.last_spot_time = datetime.min.replace(tzinfo=pytz.UTC)
self.status = "Not Started" if self.enabled else "Disabled"
self.spots = None
# Set up the provider, e.g. giving it the spot list to work from
def setup(self, spots):
self.spots = spots
# Start the provider. This should return immediately after spawning threads to access the remote resources
def start(self):
raise NotImplementedError("Subclasses must implement this method")
# Submit a batch of spots retrieved from the provider. Only spots that are newer than the last spot retrieved
# by this provider will be added to the spot list, to prevent duplications. Spots passing the check will also have
# their infer_missing() method called to complete their data set. This is called by the API-querying
# subclasses on receiving spots.
def submit_batch(self, spots):
for spot in spots:
if datetime.fromtimestamp(spot.time, pytz.UTC) > self.last_spot_time:
# Fill in any blanks
spot.infer_missing()
# Add to the list
self.spots.add(spot.id, spot, expire=MAX_SPOT_AGE)
self.last_spot_time = datetime.fromtimestamp(max(map(lambda s: s.time, spots)), pytz.UTC)
# Submit a single spot retrieved from the provider. This will be added to the list regardless of its age. Spots
# passing the check will also have their infer_missing() method called to complete their data set. This is called by
# the data streaming subclasses, which can be relied upon not to re-provide old spots.
def submit(self, spot):
# Fill in any blanks
spot.infer_missing()
# Add to the list
self.spots.add(spot.id, spot, expire=MAX_SPOT_AGE)
self.last_spot_time = datetime.fromtimestamp(spot.time, pytz.UTC)
# Stop any threads and prepare for application shutdown
def stop(self):
raise NotImplementedError("Subclasses must implement this method")

48
spotproviders/wwbota.py Normal file
View File

@@ -0,0 +1,48 @@
from datetime import datetime
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
# Spot provider for Worldwide Bunkers on the Air
class WWBOTA(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://api.wwbota.org/spots/"
def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_spots(self, http_response):
new_spots = []
# Iterate through source data
for source_spot in http_response.json():
# Convert to our spot format. First we unpack references, because WWBOTA spots can have more than one for
# n-fer activations.
refs = []
ref_names = []
for ref in source_spot["references"]:
refs.append(ref["reference"])
ref_names.append(ref["name"])
spot = Spot(source=self.name,
dx_call=source_spot["call"].upper(),
de_call=source_spot["spotter"].upper(),
freq=float(source_spot["freq"]) * 1000000,
mode=source_spot["mode"].upper(),
comment=source_spot["comment"],
sig="WWBOTA",
sig_refs=refs,
sig_refs_names=ref_names,
icon="radiation",
time=datetime.fromisoformat(source_spot["time"]).timestamp(),
# WWBOTA spots can contain multiple references for bunkers being activated simultaneously. For
# now, we will just pick the first one to use as our grid, latitude and longitude.
grid=source_spot["references"][0]["locator"],
latitude=source_spot["references"][0]["lat"],
longitude=source_spot["references"][0]["long"],
qrt=source_spot["type"] == "QRT")
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us. But WWBOTA does support a special "Test" spot type, we need to avoid adding that.
if source_spot["type"] != "Test":
new_spots.append(spot)
return new_spots

40
spotproviders/wwff.py Normal file
View File

@@ -0,0 +1,40 @@
from datetime import datetime
import pytz
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider
# Spot provider for Worldwide Flora & Fauna
class WWFF(HTTPSpotProvider):
POLL_INTERVAL_SEC = 120
SPOTS_URL = "https://spots.wwff.co/static/spots.json"
def __init__(self, provider_config):
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
def http_response_to_spots(self, http_response):
new_spots = []
# Iterate through source data
for source_spot in http_response.json():
# Convert to our spot format
spot = Spot(source=self.name,
source_id=source_spot["id"],
dx_call=source_spot["activator"].upper(),
de_call=source_spot["spotter"].upper(),
freq=float(source_spot["frequency_khz"]) * 1000,
mode=source_spot["mode"].upper(),
comment=source_spot["remarks"],
sig="WWFF",
sig_refs=[source_spot["reference"]],
sig_refs_names=[source_spot["reference_name"]],
icon="seedling",
time=datetime.fromtimestamp(source_spot["spot_time"], tz=pytz.UTC).timestamp(),
latitude=source_spot["latitude"],
longitude=source_spot["longitude"])
# Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do
# that for us.
new_spots.append(spot)
return new_spots