mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2025-10-27 08:49:27 +00:00
Get ZLOTA spots from its own API rather than PnP. Closes #37
This commit is contained in:
59
spotproviders/zlota.py
Normal file
59
spotproviders/zlota.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import csv
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import pytz
|
||||
from requests_cache import CachedSession
|
||||
|
||||
from core.constants import HTTP_HEADERS
|
||||
from core.sig_utils import get_icon_for_sig
|
||||
from data.spot import Spot
|
||||
from spotproviders.http_spot_provider import HTTPSpotProvider
|
||||
|
||||
|
||||
# Spot provider for ZLOTA
|
||||
class ZLOTA(HTTPSpotProvider):
|
||||
POLL_INTERVAL_SEC = 120
|
||||
SPOTS_URL = "https://ontheair.nz/api/spots?zlota_only=true"
|
||||
LIST_URL = "https://ontheair.nz/assets/assets.json"
|
||||
LIST_CACHE_TIME_DAYS = 30
|
||||
LIST_CACHE = CachedSession("cache/zlota_data_cache", expire_after=timedelta(days=LIST_CACHE_TIME_DAYS))
|
||||
|
||||
def __init__(self, provider_config):
|
||||
super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)
|
||||
|
||||
def http_response_to_spots(self, http_response):
|
||||
new_spots = []
|
||||
# Iterate through source data
|
||||
for source_spot in http_response.json():
|
||||
# Frequency is often inconsistent as to whether it's in Hz or kHz. Make a guess.
|
||||
freq_hz = float(source_spot["frequency"])
|
||||
if freq_hz < 1000000:
|
||||
freq_hz = freq_hz * 1000
|
||||
|
||||
# Convert to our spot format
|
||||
spot = Spot(source=self.name,
|
||||
source_id=source_spot["id"],
|
||||
dx_call=source_spot["activator"].upper(),
|
||||
de_call=source_spot["spotter"].upper(),
|
||||
freq=freq_hz,
|
||||
mode=source_spot["mode"].upper().strip(),
|
||||
comment=source_spot["comments"],
|
||||
sig="ZLOTA",
|
||||
sig_refs=[source_spot["reference"]],
|
||||
sig_refs_names=[source_spot["name"]],
|
||||
icon=get_icon_for_sig("ZLOTA"),
|
||||
time=datetime.fromisoformat(source_spot["referenced_time"]).astimezone(pytz.UTC).timestamp())
|
||||
|
||||
# ZLOTA name/lat/lon lookup
|
||||
zlota_data = self.LIST_CACHE.get(self.LIST_URL, headers=HTTP_HEADERS).json()
|
||||
for asset in zlota_data:
|
||||
if asset["code"] == spot.sig_refs[0]:
|
||||
spot.sig_refs_names = [asset["name"]]
|
||||
spot.dx_latitude = asset["y"]
|
||||
spot.dx_longitude = asset["x"]
|
||||
break
|
||||
|
||||
new_spots.append(spot)
|
||||
return new_spots
|
||||
Reference in New Issue
Block a user