import re
from datetime import datetime, timedelta

import pytz
from requests_cache import CachedSession

from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig, get_ref_regex_for_sig
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider


# Spot provider for UK Packet Radio network API
class UKPacketNet(HTTPSpotProvider):
    POLL_INTERVAL_SEC = 600
    SPOTS_URL = "https://nodes.ukpacketradio.network/api/nodedata"

    # Numeric-looking comment words that are far more likely to be a baud rate than a frequency, so they are never
    # interpreted as a frequency.
    _BAUD_RATE_WORDS = ("1200", "9600")
    # A comment word made up only of digits and dots. This is deliberately permissive (it also matches things like
    # "1.2.3"), so the float() conversion that follows a match is still guarded.
    _NUMERIC_WORD = re.compile(r"[0-9.]+")

    def __init__(self, provider_config):
        super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)

    def http_response_to_spots(self, http_response):
        """Convert the node data API response into a list of Spot objects.

        One spot is produced for every "mheard" entry of every node whose port is listed on that node with a
        linkType of "RF". The hearing node is the spotter (DE) and the heard station is the DX.

        :param http_response: response object whose json() returns a dict with a "nodes" mapping.
        :return: list of Spot objects. No de-duplication or expiry is done here.
        """
        nodes = http_response.json()["nodes"]
        new_spots = []

        # Iterate through source data
        for node in nodes.values():
            # The node corresponds to the spotter here. It has an "mheard" section which indicates which nodes it has
            # recently heard, which will be our "DX". But "mheard" stations are not necessarily over RF, they could be
            # via the internet, so we also need to look up the "port" on which the station was heard, and check that it
            # is RF.
            de_grid, de_latitude, de_longitude = self._node_location(node)
            for heard in node.get("mheard") or []:
                port = self._find_rf_port(node, heard["port"])
                if port is None:
                    continue

                # This is another packet station heard over RF, so we are good to create a Spot object.
                spot = Spot(source=self.name,
                            dx_call=heard["callsign"].upper(),
                            de_call=node["callsign"].upper(),
                            freq=self._resolve_frequency(port),
                            mode="PKT",
                            comment=self._build_comment(port),
                            icon="tower-cell",
                            time=datetime.strptime(heard["lastHeard"], "%Y-%m-%d %H:%M:%S").replace(
                                tzinfo=pytz.UTC).timestamp(),
                            de_grid=de_grid,
                            de_latitude=de_latitude,
                            de_longitude=de_longitude)

                # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code
                # will do that for us.
                new_spots.append(spot)

        # Now we have a list of every node that heard every other node via RF, as Spothole spots. What each spot
        # doesn't yet have is a DX lat/lon/grid, because the data doesn't provide the location of the "mheard"
        # stations within the structure. However, each "heard" station should also be represented in the list
        # somewhere with its own data, and we can use that to look these up.
        for spot in new_spots:
            if spot.dx_call in nodes:
                spot.dx_grid, spot.dx_latitude, spot.dx_longitude = self._node_location(nodes[spot.dx_call])

        return new_spots

    @staticmethod
    def _node_location(node):
        """Return (locator, lat, lon) for a node, with None for any piece that is missing or null."""
        location = node.get("location") or {}
        coords = location.get("coords") or {}
        return location.get("locator"), coords.get("lat"), coords.get("lon")

    @staticmethod
    def _find_rf_port(node, port_id):
        """Return the first port of the node with the given id and an "RF" linkType, or None if there is none."""
        for port in node.get("ports") or []:
            if "id" in port and port["id"] == port_id and port.get("linkType") == "RF":
                return port
        return None

    @staticmethod
    def _build_comment(port):
        """Build a "full" comment from the port's comment, mode, modulation and baud rate.

        Missing, null or blank pieces are skipped, so the result never starts with a stray space.
        """
        pieces = [port.get("comment"), port.get("mode"), port.get("modulation")]
        baud = port.get("baud")
        if isinstance(baud, (int, float)) and baud > 0:
            pieces.append(str(baud) + " baud")
        cleaned = (str(piece).strip() for piece in pieces if piece)
        return " ".join(text for text in cleaned if text)

    @classmethod
    def _resolve_frequency(cls, port):
        """Return the port frequency in Hz, or None if it cannot be determined.

        The structured "freq" field is used when it holds a positive number; otherwise the port comment is searched.
        """
        freq = port.get("freq")
        if not (isinstance(freq, (int, float)) and freq > 0):
            freq = cls._frequency_from_comment(port.get("comment"))

        # Check for a found frequency likely having been in kHz, sorry to all GHz packet folks
        if freq and freq > 1000000000:
            freq = freq / 1000
        return freq

    @classmethod
    def _frequency_from_comment(cls, comment):
        """Guess a frequency in Hz from the first or second word of a port comment, treating the number as MHz.

        This is very hacky, but a lot of node comments contain their frequency as the first or second word of their
        comment, but not in the proper data structure field. Only the port's own comment is examined, never the
        mode/modulation/baud text, so that e.g. "300 baud" cannot be mistaken for 300 MHz.
        """
        if not comment:
            return None
        for word in str(comment).split()[:2]:
            candidate = word.upper().replace("MHZ", "")
            if candidate in cls._BAUD_RATE_WORDS or not cls._NUMERIC_WORD.fullmatch(candidate):
                continue
            try:
                value = float(candidate)
            except ValueError:
                # Digits and dots, but not a number (e.g. "1.2.3" or "..."); try the next word instead of failing.
                continue
            if value > 0:
                return value * 1000000
        return None