import csv
import logging
import re
from datetime import datetime, timedelta

import pytz
from requests_cache import CachedSession

from core.constants import HTTP_HEADERS
from core.sig_utils import get_icon_for_sig
from data.spot import Spot
from spotproviders.http_spot_provider import HTTPSpotProvider


# Spot provider for ZLOTA
class ZLOTA(HTTPSpotProvider):
    # Interval, in seconds, passed to the base provider for polling SPOTS_URL.
    POLL_INTERVAL_SEC = 120
    SPOTS_URL = "https://ontheair.nz/api/spots?zlota_only=true"
    # Reference list used to look up a name and x/y position for each spot's reference.
    LIST_URL = "https://ontheair.nz/assets/assets.json"
    LIST_CACHE_TIME_DAYS = 30
    # Class-level cached HTTP session, so the reference list is re-downloaded at most once per expiry period.
    LIST_CACHE = CachedSession("cache/zlota_data_cache", expire_after=timedelta(days=LIST_CACHE_TIME_DAYS))

    def __init__(self, provider_config):
        """Set up the provider with the ZLOTA spots URL and poll interval."""
        super().__init__(provider_config, self.SPOTS_URL, self.POLL_INTERVAL_SEC)

    def _load_assets_by_code(self):
        """Fetch the ZLOTA reference list (via the cache) and index it by its "code" field.

        Returns a dict mapping each code to its asset entry. If a code appears more than once,
        the first entry in the list wins, matching a first-match linear search.
        """
        zlota_data = self.LIST_CACHE.get(self.LIST_URL, headers=HTTP_HEADERS).json()
        assets_by_code = {}
        for asset in zlota_data:
            code = asset.get("code")
            if code is not None:
                # setdefault keeps the first entry seen for a given code.
                assets_by_code.setdefault(code, asset)
        return assets_by_code

    def http_response_to_spots(self, http_response):
        """Convert the JSON body of a ZLOTA spots response into a list of Spot objects.

        Each spot is enriched with the name and position from the ZLOTA reference list when
        its reference code is found there. Any exception raised while parsing the response or
        fetching the reference list propagates to the caller.
        """
        new_spots = []
        source_spots = http_response.json()
        if not source_spots:
            # Nothing to convert, so don't touch the reference list at all.
            return new_spots

        # The reference list is the same for every spot in this response, so load and index it
        # once here rather than re-fetching and re-parsing it per spot.
        assets_by_code = self._load_assets_by_code()

        # Iterate through source data
        for source_spot in source_spots:
            # Frequency is often inconsistent as to whether it's in Hz or kHz. Make a guess.
            freq_hz = float(source_spot["frequency"])
            if freq_hz < 1000000:
                freq_hz = freq_hz * 1000

            # Convert to our spot format
            spot = Spot(source=self.name,
                        source_id=source_spot["id"],
                        dx_call=source_spot["activator"].upper(),
                        de_call=source_spot["spotter"].upper(),
                        freq=freq_hz,
                        mode=source_spot["mode"].upper().strip(),
                        comment=source_spot["comments"],
                        sig="ZLOTA",
                        sig_refs=[source_spot["reference"]],
                        sig_refs_names=[source_spot["name"]],
                        icon=get_icon_for_sig("ZLOTA"),
                        time=datetime.fromisoformat(source_spot["referenced_time"]).astimezone(pytz.UTC).timestamp())

            # ZLOTA name/lat/lon lookup. If the reference isn't in the list, the spot keeps the
            # name supplied with the spot itself and gets no position.
            asset = assets_by_code.get(spot.sig_refs[0])
            if asset is not None:
                spot.sig_refs_names = [asset["name"]]
                spot.dx_latitude = asset["y"]
                spot.dx_longitude = asset["x"]

            new_spots.append(spot)
        return new_spots