mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2025-12-15 16:43:38 +00:00
86 lines
5.5 KiB
Python
86 lines
5.5 KiB
Python
import re
|
|
from datetime import datetime
|
|
|
|
import pytz
|
|
|
|
from data.spot import Spot
|
|
from spotproviders.http_spot_provider import HTTPSpotProvider
|
|
|
|
|
|
# Spot provider for UK Packet Radio network API
|
|
class UKPacketNet(HTTPSpotProvider):
    """Spot provider for the UK Packet Radio network node data API.

    Each node in the feed is treated as the spotter ("DE"). Each station in the node's "mheard" list that was heard
    on a port whose "linkType" is "RF" becomes a spot, with the heard station as the "DX".
    """

    # Passed to the HTTPSpotProvider constructor together with the URL (presumably the polling period in seconds).
    POLL_INTERVAL_SEC = 600
    SPOTS_URL = "https://nodes.ukpacketradio.network/api/nodedata"

    # A word that might be a frequency: digits and dots only.
    _FREQ_WORD_PATTERN = re.compile(r"^[0-9.]+$")
    # Numeric words that must not be mistaken for a frequency.
    _NOT_A_FREQ_WORDS = ("1200", "9600")

    def __init__(self, provider_config):
        super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)

    def http_response_to_spots(self, http_response):
        """Convert the node data HTTP response into a list of Spot objects.

        :param http_response: response object whose json() returns a dict containing a "nodes" dict.
        :return: list of Spot objects, one per (node, heard station) pair where the station was heard on an RF port.
        """
        new_spots = []
        nodes = http_response.json()["nodes"]

        # Iterate through source data. The node corresponds to the spotter here. It has an "mheard" section which
        # indicates which nodes it has recently heard, which will be our "DX". But "mheard" stations are not
        # necessarily over RF, they could be via the internet, so we also need to look up the "port" on which the
        # station was heard, and check that it is RF.
        for node in nodes.values():
            ports = node.get("ports") or []
            for heard in node.get("mheard") or []:
                if "port" not in heard:
                    continue
                rf_port = self._find_rf_port(ports, heard["port"])
                if rf_port is None:
                    continue

                # This is another packet station heard over RF, so we are good to create a Spot object.
                comment = self._build_comment(rf_port)
                freq = self._resolve_frequency(rf_port, comment)
                location = node.get("location") or {}
                coords = location.get("coords") or {}
                heard_time = datetime.strptime(heard["lastHeard"], "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC)

                # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code
                # will do that for us.
                new_spots.append(Spot(source=self.name,
                                      dx_call=heard["callsign"].upper(),
                                      de_call=node["callsign"].upper(),
                                      freq=freq,
                                      mode="PKT",
                                      comment=comment,
                                      icon="tower-cell",
                                      time=heard_time.timestamp(),
                                      de_grid=location.get("locator"),
                                      de_latitude=coords.get("lat"),
                                      de_longitude=coords.get("lon")))

        # Now we have a list of every node that heard every other node via RF, as Spothole spots. What each spot
        # doesn't yet have is a DX lat/lon/grid, because the data doesn't provide the location of the "mheard"
        # stations within the structure. However, each "heard" station should also be represented in the list
        # somewhere with its own data, and we can use that to look these up. dx_call was upper-cased above, so fall
        # back to a case-insensitive match when there is no exact key.
        nodes_by_upper_key = {key.upper(): node for key, node in nodes.items()}
        for spot in new_spots:
            dx_node = nodes.get(spot.dx_call)
            if dx_node is None:
                dx_node = nodes_by_upper_key.get(spot.dx_call)
            if not dx_node:
                continue
            dx_location = dx_node.get("location") or {}
            dx_coords = dx_location.get("coords") or {}
            spot.dx_grid = dx_location.get("locator")
            spot.dx_latitude = dx_coords.get("lat")
            spot.dx_longitude = dx_coords.get("lon")

        return new_spots

    @staticmethod
    def _find_rf_port(ports, port_id):
        """Return the first port with the given id whose "linkType" is "RF", or None if there isn't one."""
        for port in ports:
            if "id" in port and port["id"] == port_id and port.get("linkType") == "RF":
                return port
        return None

    @staticmethod
    def _positive_number(value):
        """Return value if it is a number greater than zero, otherwise None (covers missing and null fields)."""
        if isinstance(value, (int, float)) and value > 0:
            return value
        return None

    @classmethod
    def _build_comment(cls, port):
        """Build a "full" comment from the port's comment, mode, modulation and baud rate, where present."""
        comment = port.get("comment")
        comment = "" if comment is None else str(comment)
        for key in ("mode", "modulation"):
            if port.get(key) is not None:
                comment = comment + " " + str(port[key])
        baud = cls._positive_number(port.get("baud"))
        if baud:
            comment = comment + " " + str(baud) + " baud"
        return comment

    @classmethod
    def _parse_frequency_word(cls, word):
        """Interpret one word of a comment as a frequency in MHz and return it in Hz, or None if it isn't one."""
        candidate = word.upper().replace("MHZ", "")
        if not cls._FREQ_WORD_PATTERN.match(candidate) or candidate in cls._NOT_A_FREQ_WORDS:
            return None
        try:
            mhz = float(candidate)
        except ValueError:
            # The pattern also lets through things like "." or "1.2.3", which are not numbers.
            return None
        return mhz * 1000000 if mhz > 0 else None

    @classmethod
    def _resolve_frequency(cls, port, comment):
        """Return the port's frequency, falling back to the first two words of the comment, or None if not found.

        Getting the frequency from the comment is very hacky, but a lot of node comments contain their frequency as
        the first or second word of their comment, but not in the proper data structure field.
        """
        freq = cls._positive_number(port.get("freq"))
        if not freq:
            for word in comment.split(" ")[:2]:
                freq = cls._parse_frequency_word(word)
                if freq:
                    break
        # Check for a found frequency likely having been in kHz, sorry to all GHz packet folks
        if freq and freq > 1000000000:
            freq = freq / 1000
        return freq
|