mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-02-04 09:14:30 +00:00
39C3 TOTA location lookup
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
import csv
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
import pytz
|
||||
@@ -9,31 +11,45 @@ from spotproviders.websocket_spot_provider import WebsocketSpotProvider
|
||||
|
||||
|
||||
# Spot provider for servers based on the "xOTA" software at https://github.com/nischu/xOTA/
|
||||
# The provider typically doesn't give us a lat/lon or SIG explicitly, so our own config provides this information. This
|
||||
# functionality is implemented for TOTA events.
|
||||
# The provider typically doesn't give us a lat/lon or SIG explicitly, so our own config provides a SIG and a reference
|
||||
# to a local CSV file with location information. This functionality is implemented for TOTA events, of which there are
|
||||
# several - so a plain lookup of a "TOTA reference" doesn't make sense, it depends on which TOTA and hence which server
|
||||
# supplied the data, which is why the CSV location lookup is here and not in sig_utils.
|
||||
class XOTA(WebsocketSpotProvider):
|
||||
FIXED_LATITUDE = None
|
||||
FIXED_LONGITUDE = None
|
||||
LOCATION_DATA = {}
|
||||
SIG = None
|
||||
|
||||
def __init__(self, provider_config):
|
||||
super().__init__(provider_config, provider_config["url"])
|
||||
self.FIXED_LATITUDE = provider_config["latitude"] if "latitude" in provider_config else None
|
||||
self.FIXED_LONGITUDE = provider_config["longitude"] if "longitude" in provider_config else None
|
||||
locations_csv = provider_config["locations-csv"] if "locations-csv" in provider_config else None
|
||||
self.SIG = provider_config["sig"] if "sig" in provider_config else None
|
||||
|
||||
# Load location data
|
||||
if locations_csv:
|
||||
try:
|
||||
f = open(locations_csv)
|
||||
csv_data = f.read()
|
||||
dr = csv.DictReader(csv_data.splitlines())
|
||||
for row in dr:
|
||||
self.LOCATION_DATA[row["ref"]] = {"lat": row["lat"], "lon": row["lon"]}
|
||||
except:
|
||||
logging.exception("Could not look up location data for XOTA source.")
|
||||
|
||||
def ws_message_to_spot(self, bytes):
|
||||
string = bytes.decode("utf-8")
|
||||
source_spot = json.loads(string)
|
||||
ref_id = source_spot["reference"]["title"]
|
||||
lat = float(self.LOCATION_DATA[ref_id]["lat"]) if ref_id in self.LOCATION_DATA else None
|
||||
lon = float(self.LOCATION_DATA[ref_id]["lon"]) if ref_id in self.LOCATION_DATA else None
|
||||
spot = Spot(source=self.name,
|
||||
source_id=source_spot["id"],
|
||||
dx_call=source_spot["stationCallSign"].upper(),
|
||||
freq=float(source_spot["freq"]) * 1000,
|
||||
mode=source_spot["mode"].upper(),
|
||||
sig=self.SIG,
|
||||
sig_refs=[SIGRef(id=source_spot["reference"]["title"], sig=self.SIG, url=source_spot["reference"]["website"])],
|
||||
sig_refs=[SIGRef(id=ref_id, sig=self.SIG, url=source_spot["reference"]["website"], latitude=lat, longitude=lon)],
|
||||
time=datetime.now(pytz.UTC).timestamp(),
|
||||
dx_latitude=self.FIXED_LATITUDE,
|
||||
dx_longitude=self.FIXED_LONGITUDE,
|
||||
dx_latitude=lat,
|
||||
dx_longitude=lon,
|
||||
qrt=source_spot["state"] != "active")
|
||||
return spot
|
||||
|
||||
Reference in New Issue
Block a user