Files
spothole/data/spot.py

409 lines
20 KiB
Python

import copy
import hashlib
import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
import pytz
from pyhamtools.locator import locator_to_latlong, latlong_to_locator
from core.config import MAX_SPOT_AGE
from core.constants import MODE_ALIASES
from core.geo_utils import lat_lon_to_cq_zone, lat_lon_to_itu_zone
from core.lookup_helper import lookup_helper
from core.sig_utils import populate_sig_ref_info, ANY_SIG_REGEX, get_ref_regex_for_sig
from data.sig_ref import SIGRef
@dataclass
class Spot:
    """Data class that defines a spot.

    All fields default to "unset". Call infer_missing() once the known fields have been set; it fills in whatever
    else can be derived from them and assigns the spot its ID.
    """

    # Unique identifier for the spot
    id: str = None

    # DX (spotted) operator info

    # Callsign of the operator that has been spotted
    dx_call: str = None
    # Name of the operator that has been spotted
    dx_name: str = None
    # QTH of the operator that has been spotted. This could be from any SIG refs or could be from online lookup of their
    # home QTH.
    dx_qth: str = None
    # Country of the DX operator
    dx_country: str = None
    # Country flag of the DX operator
    dx_flag: str = None
    # Continent of the DX operator
    dx_continent: str = None
    # DXCC ID of the DX operator
    dx_dxcc_id: int = None
    # CQ zone of the DX operator
    dx_cq_zone: int = None
    # ITU zone of the DX operator
    dx_itu_zone: int = None
    # If this is an APRS/Packet/etc spot, what SSID was the DX operator using?
    dx_ssid: str = None
    # Maidenhead grid locator for the DX. This could be from a geographical reference e.g. POTA, or just from the
    # country
    dx_grid: str = None
    # Latitude & longitude of the DX, in degrees. This could be from a geographical reference e.g. POTA, or from a QRZ
    # lookup
    dx_latitude: float = None
    dx_longitude: float = None
    # DX Location source. Indicates how accurate the location might be. Values: "SPOT", "SIG REF LOOKUP",
    # "WAB/WAI GRID", "HOME QTH", "DXCC", "NONE"
    dx_location_source: str = "NONE"
    # DX Location good. Indicates that the software thinks the location data is good enough to plot on a map. This is
    # true if the location source is "SPOT", "SIG REF LOOKUP" or "WAB/WAI GRID", or if the location source is
    # "HOME QTH" and the DX callsign doesn't have a suffix like /P.
    dx_location_good: bool = False

    # DE (Spotter) info

    # Callsign of the spotter
    de_call: str = None
    # Country of the spotter
    de_country: str = None
    # Country flag of the spotter
    de_flag: str = None
    # Continent of the spotter
    de_continent: str = None
    # DXCC ID of the spotter
    de_dxcc_id: int = None
    # If this is an APRS/Packet/etc spot, what SSID was the spotter/receiver using?
    de_ssid: str = None
    # Maidenhead grid locator for the spotter. This is not going to be from a xOTA reference so it will likely just be
    # a QRZ or DXCC lookup. If the spotter is also portable, this is probably wrong, but it's good enough for some
    # simple mapping.
    de_grid: str = None
    # Latitude & longitude of the spotter, in degrees. This is not going to be from a xOTA reference so it will likely
    # just be a QRZ or DXCC lookup. If the spotter is also portable, this is probably wrong, but it's good enough for
    # some simple mapping.
    de_latitude: float = None
    de_longitude: float = None

    # General QSO info

    # Reported mode, such as SSB, PHONE, CW, FT8...
    mode: str = None
    # Inferred mode "family". One of "CW", "PHONE" or "DIGI".
    mode_type: str = None
    # Source of the mode information. "SPOT", "COMMENT", "BANDPLAN" or "NONE"
    mode_source: str = "NONE"
    # Frequency, in Hz
    freq: float = None
    # Band, defined by the frequency, e.g. "40m" or "70cm"
    band: str = None
    # Comment left by the spotter, if any
    comment: str = None
    # QRT state. Some APIs return spots marked as QRT. Otherwise we can check the comments.
    qrt: bool = False

    # Special Interest Group info

    # Special Interest Group (SIG), e.g. outdoor activity programme such as POTA
    sig: str = None
    # SIG references. We allow multiple here for e.g. n-fer activations, unlike ADIF SIG_INFO
    sig_refs: list = None

    # Timing info

    # Time of the spot, UTC seconds since UNIX epoch
    time: float = None
    # Time of the spot, ISO 8601
    time_iso: str = None
    # Time that this software received the spot, UTC seconds since UNIX epoch. This is used with the "since_received"
    # call to our API to receive all data that is new to us, even if by a quirk of the API it might be older than the
    # last time the client polled the API.
    received_time: float = None
    # Time that this software received the spot, ISO 8601
    received_time_iso: str = None

    # Source info

    # Where we got the spot from, e.g. "POTA", "Cluster"...
    source: str = None
    # The ID the source gave it, if any.
    source_id: str = None

    def infer_missing(self):
        """Infer missing parameters where possible, then (re)generate the spot ID.

        The steps run in a fixed order because later steps depend on earlier ones: data supplied with the spot wins
        over SIG reference data, which wins over an online callsign lookup, which wins over the DXCC entity. Fields
        that are already set are generally left alone.
        """
        self._infer_times()
        self._infer_dx_identity()
        self._infer_de_identity()
        self._infer_band_and_mode()

        # If we have a latitude or grid at this point, it can only have been provided by the spot itself
        if self.dx_latitude or self.dx_grid:
            self.dx_location_source = "SPOT"

        self._infer_sigs()
        self._apply_sig_ref_info()
        self._convert_dx_grid_and_latlon()

        # QRT comment detection
        if self.comment and not self.qrt:
            self.qrt = "QRT" in self.comment.upper()

        self._lookup_dx_operator()
        self._infer_dx_fallback_location_and_zones()
        self._infer_de_location()

        # Always create an ID last, so that it reflects every inferred field.
        self.id = self._compute_id()

    def _infer_times(self):
        """Default the spot/received times and fill in their ISO 8601 equivalents."""
        # If we somehow don't have a spot time, set it to zero so it sorts off the bottom of any list but
        # clients can still reliably parse it as a number.
        if not self.time:
            self.time = 0

        # If we don't have a received time, this has just been received so set that to "now"
        if not self.received_time:
            self.received_time = datetime.now(pytz.UTC).timestamp()

        # Fill in ISO versions of times, in case the client prefers that
        if self.time and not self.time_iso:
            self.time_iso = datetime.fromtimestamp(self.time, pytz.UTC).isoformat()
        if self.received_time and not self.received_time_iso:
            self.received_time_iso = datetime.fromtimestamp(self.received_time, pytz.UTC).isoformat()

    @staticmethod
    def _split_ssid(call):
        """Split a "CALL-SSID" string into (call, ssid); ssid is None for the "-#" suffix used by RBN.

        Must only be called with a string that contains "-".
        """
        parts = call.split("-")
        return parts[0], (None if parts[1] == "#" else parts[1])

    def _infer_dx_identity(self):
        """Strip any SSID from the DX call, then derive country, continent, DXCC ID and flag from the callsign."""
        # Clean up DX call if it has an SSID or -# from RBN
        if self.dx_call and "-" in self.dx_call:
            self.dx_call, ssid = self._split_ssid(self.dx_call)
            if ssid is not None:
                self.dx_ssid = ssid

        # DX country, continent etc. from callsign
        if self.dx_call and not self.dx_country:
            self.dx_country = lookup_helper.infer_country_from_callsign(self.dx_call)
        if self.dx_call and not self.dx_continent:
            self.dx_continent = lookup_helper.infer_continent_from_callsign(self.dx_call)
        if self.dx_call and not self.dx_dxcc_id:
            self.dx_dxcc_id = lookup_helper.infer_dxcc_id_from_callsign(self.dx_call)
        if self.dx_dxcc_id and not self.dx_flag:
            self.dx_flag = lookup_helper.get_flag_for_dxcc(self.dx_dxcc_id)

    def _de_call_can_be_looked_up(self):
        """Return True if the spotter callsign is something we can look up country/location data for.

        DE calls with no digits, and APRS servers starting "T2", are not things we can look up.
        """
        if not self.de_call or not any(char.isdigit() for char in self.de_call):
            return False
        return not (self.de_call.startswith("T2") and self.source == "APRS-IS")

    def _infer_de_identity(self):
        """Clean up the spotter call (SSID, RBNHole/SOTAMAT relays), then derive its country data."""
        # Clean up spotter call if it has an SSID or -# from RBN
        if self.de_call and "-" in self.de_call:
            self.de_call, ssid = self._split_ssid(self.de_call)
            if ssid is not None:
                self.de_ssid = ssid

        # If we have a spotter of "RBNHOLE", we should have the actual spotter callsign in the comment, so extract it.
        # RBNHole posts come from a number of providers, so it's dealt with here in the generic spot handling code.
        if self.de_call == "RBNHOLE" and self.comment:
            rbnhole_call_match = re.search(r"\Wat ([a-z0-9/]+)\W", self.comment, re.IGNORECASE)
            if rbnhole_call_match:
                self.de_call = rbnhole_call_match.group(1).upper()

        # If we have a spotter of "SOTAMAT", we might have the actual spotter callsign in the comment, if so extract it.
        # SOTAMAT can do POTA as well as SOTA, so it's dealt with here in the generic spot handling code.
        if self.de_call == "SOTAMAT" and self.comment:
            sotamat_call_match = re.search(r"\Wfrom ([a-z0-9/]+)]", self.comment, re.IGNORECASE)
            if sotamat_call_match:
                self.de_call = sotamat_call_match.group(1).upper()

        # Spotter country, continent, zones etc. from callsign.
        if self._de_call_can_be_looked_up():
            if not self.de_country:
                self.de_country = lookup_helper.infer_country_from_callsign(self.de_call)
            if not self.de_continent:
                self.de_continent = lookup_helper.infer_continent_from_callsign(self.de_call)
            if not self.de_dxcc_id:
                self.de_dxcc_id = lookup_helper.infer_dxcc_id_from_callsign(self.de_call)
            if self.de_dxcc_id and not self.de_flag:
                self.de_flag = lookup_helper.get_flag_for_dxcc(self.de_dxcc_id)

    def _infer_band_and_mode(self):
        """Derive the band from the frequency, and the mode (plus its source and family) from comment or bandplan."""
        # Band from frequency
        if self.freq and not self.band:
            band = lookup_helper.infer_band_from_freq(self.freq)
            self.band = band.name

        # Mode from comments or bandplan. The source is only recorded when the corresponding inference actually
        # produced a mode, so a spot with no mode keeps whatever source it had rather than claiming one.
        if self.mode:
            self.mode_source = "SPOT"
        if self.comment and not self.mode:
            self.mode = lookup_helper.infer_mode_from_comment(self.comment)
            if self.mode:
                self.mode_source = "COMMENT"
        if self.freq and not self.mode:
            self.mode = lookup_helper.infer_mode_from_frequency(self.freq)
            if self.mode:
                self.mode_source = "BANDPLAN"

        # Normalise mode if necessary.
        if self.mode and self.mode in MODE_ALIASES:
            self.mode = MODE_ALIASES[self.mode]

        # Mode type from mode
        if self.mode and not self.mode_type:
            self.mode_type = lookup_helper.infer_mode_type_from_mode(self.mode)

    def _infer_sigs(self):
        """Work out the spot's SIG and collect any extra SIG references mentioned in the comment."""
        # Set the top-level "SIG" if it is missing but we have at least one SIG ref that names one.
        if not self.sig and self.sig_refs and self.sig_refs[0].sig:
            self.sig = self.sig_refs[0].sig.upper()

        # See if we already have a SIG reference, but the comment looks like it contains more for the same SIG. This
        # should catch e.g. POTA comments like "2-fer: GB-0001 GB-0002".
        if self.comment and self.sig_refs and self.sig_refs[0].sig:
            sig = self.sig_refs[0].sig.upper()
            regex = get_ref_regex_for_sig(sig)
            if regex:
                # Zero-width word-boundary checks are used rather than consuming the separator characters, so that
                # references separated by a single character, and a reference at the very end of the comment, all
                # match.
                all_comment_ref_matches = re.finditer(r"(?<!\w)(" + regex + r")(?!\w)", self.comment, re.IGNORECASE)
                for ref_match in all_comment_ref_matches:
                    self.append_sig_ref_if_missing(SIGRef(id=ref_match.group(1).upper(), sig=sig))

        # See if the comment looks like it contains any SIGs (and optionally SIG references) that we can
        # add to the spot. This should catch cluster spot comments like "POTA GB-0001 WWFF GFF-0001" and e.g. POTA
        # comments like "also WWFF GFF-0001".
        if self.comment:
            sig_matches = re.finditer(r"(^|\W)" + ANY_SIG_REGEX + r"($|\W)", self.comment, re.IGNORECASE)
            for sig_match in sig_matches:
                # First of all, if we haven't got a SIG for this spot set yet, now we have. This covers things like
                # cluster spots where the comment is just "POTA".
                found_sig = sig_match.group(2).upper()
                if not self.sig:
                    self.sig = found_sig

                # Now look to see if that SIG name was followed by something that looks like a reference ID for that
                # SIG. If so, add that to the sig_refs list for this spot.
                ref_regex = get_ref_regex_for_sig(found_sig)
                if ref_regex:
                    ref_matches = re.finditer(r"(^|\W)" + found_sig + r"($|\W)(" + ref_regex + r")($|\W)",
                                              self.comment, re.IGNORECASE)
                    for ref_match in ref_matches:
                        self.append_sig_ref_if_missing(SIGRef(id=ref_match.group(3).upper(), sig=found_sig))

    def _apply_sig_ref_info(self):
        """Populate details for each SIG reference and take the DX location (and SIG) from them if still missing."""
        # Fetch SIG data. In case a particular API doesn't provide a full set of name, lat, lon & grid for a reference
        # in its initial call, we use this code to populate the rest of the data. This includes working out grid refs
        # from WAB and WAI, which count as a SIG even though there's no real lookup, just maths
        if self.sig_refs:
            for sig_ref in self.sig_refs:
                sig_ref = populate_sig_ref_info(sig_ref)
                # If the spot itself doesn't have location yet, but the SIG ref does, extract it
                if sig_ref.grid and not self.dx_grid:
                    self.dx_grid = sig_ref.grid
                if sig_ref.latitude and not self.dx_latitude:
                    self.dx_latitude = sig_ref.latitude
                    self.dx_longitude = sig_ref.longitude
                    if self.sig == "WAB" or self.sig == "WAI":
                        self.dx_location_source = "WAB/WAI GRID"
                    else:
                        self.dx_location_source = "SIG REF LOOKUP"

        # If the spot itself doesn't have a SIG yet, but we have at least one SIG reference, take that reference's SIG
        # and apply it to the whole spot.
        if self.sig_refs and not self.sig:
            self.sig = self.sig_refs[0].sig

    def _convert_dx_grid_and_latlon(self):
        """Convert DX grid to lat/lon, or lat/lon to grid, in case only one of them is present."""
        if self.dx_grid and not self.dx_latitude:
            try:
                ll = locator_to_latlong(self.dx_grid)
                self.dx_latitude = ll[0]
                self.dx_longitude = ll[1]
            except Exception:
                # Best effort only: a bad grid from upstream must not stop the rest of the inference.
                logging.debug("Invalid grid received for spot")
        if self.dx_latitude and self.dx_longitude and not self.dx_grid:
            try:
                self.dx_grid = latlong_to_locator(self.dx_latitude, self.dx_longitude, 8)
            except Exception:
                logging.debug("Invalid lat/lon received for spot")

    def _lookup_dx_operator(self):
        """Fill in DX name, home location and QTH string from the online callsign lookup where still missing."""
        # DX operator details lookup, using QRZ.com. This should be the last resort compared to taking the data from
        # the actual spotting service, e.g. we don't want to accidentally use a user's QRZ.com home lat/lon instead of
        # the one from the park reference they're at.
        if self.dx_call and not self.dx_name:
            self.dx_name = lookup_helper.infer_name_from_callsign_online_lookup(self.dx_call)
        if self.dx_call and not self.dx_latitude:
            latlon = lookup_helper.infer_latlon_from_callsign_online_lookup(self.dx_call)
            if latlon:
                self.dx_latitude = latlon[0]
                self.dx_longitude = latlon[1]
                self.dx_grid = lookup_helper.infer_grid_from_callsign_online_lookup(self.dx_call)
                self.dx_location_source = "HOME QTH"

        # Determine a "QTH" string. If we have a SIG ref, pick the first one and turn it into a suitable string,
        # otherwise see what they have set on an online lookup service. The lookup is skipped when there is no
        # callsign to look up, or when a QTH has already been provided.
        if self.sig_refs:
            self.dx_qth = self.sig_refs[0].id
            if self.sig_refs[0].name:
                self.dx_qth = self.dx_qth + " " + self.sig_refs[0].name
        elif self.dx_call and not self.dx_qth:
            self.dx_qth = lookup_helper.infer_qth_from_callsign_online_lookup(self.dx_call)

    def _infer_dx_fallback_location_and_zones(self):
        """Fall back to the DXCC entity for DX position, then derive CQ/ITU zones and the "location good" flag."""
        # Last resort for getting a DX position, use the DXCC entity.
        if self.dx_call and not self.dx_latitude:
            latlon = lookup_helper.infer_latlon_from_callsign_dxcc(self.dx_call)
            if latlon:
                self.dx_latitude = latlon[0]
                self.dx_longitude = latlon[1]
                self.dx_grid = lookup_helper.infer_grid_from_callsign_dxcc(self.dx_call)
                self.dx_location_source = "DXCC"

        # CQ and ITU zone lookup, preferably from location but failing that, from callsign
        if not self.dx_cq_zone:
            if self.dx_latitude:
                self.dx_cq_zone = lat_lon_to_cq_zone(self.dx_latitude, self.dx_longitude)
            elif self.dx_call:
                self.dx_cq_zone = lookup_helper.infer_cq_zone_from_callsign(self.dx_call)
        if not self.dx_itu_zone:
            if self.dx_latitude:
                self.dx_itu_zone = lat_lon_to_itu_zone(self.dx_latitude, self.dx_longitude)
            elif self.dx_call:
                self.dx_itu_zone = lookup_helper.infer_itu_zone_from_callsign(self.dx_call)

        # DX Location is "good" if it is from a spot or SIG reference, or from QRZ if the callsign doesn't contain a
        # slash, so the operator is likely at home. Coerced to bool so the field never ends up as None or a float.
        trusted_source = (self.dx_location_source in ("SPOT", "SIG REF LOOKUP", "WAB/WAI GRID")
                          or (self.dx_location_source == "HOME QTH" and "/" not in self.dx_call))
        self.dx_location_good = bool(self.dx_latitude and self.dx_longitude and trusted_source)

    def _infer_de_location(self):
        """Look up the spotter's position, first from the online callsign lookup and then from the DXCC entity."""
        if not self._de_call_can_be_looked_up():
            return

        # DE operator position lookup, using QRZ.com.
        if not self.de_latitude:
            latlon = lookup_helper.infer_latlon_from_callsign_online_lookup(self.de_call)
            if latlon:
                self.de_latitude = latlon[0]
                self.de_longitude = latlon[1]
                self.de_grid = lookup_helper.infer_grid_from_callsign_online_lookup(self.de_call)

        # Last resort for getting a DE position, use the DXCC entity.
        if not self.de_latitude:
            latlon = lookup_helper.infer_latlon_from_callsign_dxcc(self.de_call)
            if latlon:
                self.de_latitude = latlon[0]
                self.de_longitude = latlon[1]
                self.de_grid = lookup_helper.infer_grid_from_callsign_dxcc(self.de_call)

    def _compute_id(self):
        """Return a SHA256 hex digest of every parameter *except* the received time and the ID itself.

        This is used as the index to a map, which as a byproduct avoids us having multiple duplicate copies of the
        object that are identical apart from that they were retrieved from the API at different times. Note that the
        simple Python hash() function includes a seed randomly generated at runtime; this is therefore not consistent
        between runs. But we use diskcache to store our data between runs, so we use SHA256 which does not include
        this random element. Any existing ID is blanked before hashing so that it cannot feed into its own
        replacement.
        """
        # A shallow copy is enough: only top-level attributes are reassigned, nothing nested is mutated.
        hashable = copy.copy(self)
        hashable.id = None
        hashable.received_time = 0
        hashable.received_time_iso = ""
        return hashlib.sha256(str(hashable).encode("utf-8")).hexdigest()

    def to_json(self):
        """Serialise the spot to a JSON string with sorted keys. Nested objects are serialised via their __dict__."""
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)

    def append_sig_ref_if_missing(self, new_sig_ref):
        """Append a sig_ref to the list, so long as it's not already there.

        The reference's id and sig are stripped and upper-cased in place before comparison. A reference whose id is
        missing or blank is ignored.
        """
        if not self.sig_refs:
            self.sig_refs = []

        # Tolerate a missing id/sig rather than raising on None.
        new_sig_ref.id = (new_sig_ref.id or "").strip().upper()
        if new_sig_ref.sig is not None:
            new_sig_ref.sig = new_sig_ref.sig.strip().upper()

        if new_sig_ref.id == "":
            return
        already_present = any(existing.id == new_sig_ref.id and existing.sig == new_sig_ref.sig
                              for existing in self.sig_refs)
        if not already_present:
            self.sig_refs.append(new_sig_ref)

    def expired(self):
        """Decide if this spot has expired (in which case it should not be added to the system in the first place, and
        not returned by the web server if later requested, and removed by the cleanup functions). "Expired" is defined
        as having a time more than MAX_SPOT_AGE seconds in the past. If it somehow doesn't have a time at all, it is
        also considered to be expired."""
        if not self.time:
            return True
        oldest_allowed = (datetime.now(pytz.UTC) - timedelta(seconds=MAX_SPOT_AGE)).timestamp()
        return self.time < oldest_allowed