from datetime import datetime, timezone import pytz from data.spot import Spot from providers.provider import Provider from threading import Timer import requests class POTA(Provider): POLL_INTERVAL_SEC = 120 SPOTS_URL = "https://api.pota.app/spot/activator" def __init__(self): super().__init__() self.poll_timer = None def name(self): return "POTA" def start(self): self.poll() def stop(self): self.poll_timer.cancel() def poll(self): try: # Request data from API source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS).json() # Build a list of spots we haven't seen before new_spots = [] # Iterate through source data for source_spot in source_data: # Convert to our spot format spot = Spot(source=self.name(), source_id=source_spot["spotId"], dx_call=source_spot["activator"], de_call=source_spot["spotter"], freq=float(source_spot["frequency"]), mode=source_spot["mode"], comment=source_spot["comments"], sig="POTA", sig_refs=[source_spot["reference"]], sig_refs_names=[source_spot["name"]], time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=pytz.UTC), grid=source_spot["grid6"], latitude=source_spot["latitude"], longitude=source_spot["longitude"], qrt="QRT" in source_spot["comments"].upper()) # Fill in any blanks spot.infer_missing() # Add to our list new_spots.append(spot) # Submit the new spots for processing self.submit(new_spots) self.status = "OK" self.last_update_time = datetime.now(timezone.utc) except requests.exceptions.RequestException as e: self.status = "Error" self.poll_timer = Timer(self.POLL_INTERVAL_SEC, self.poll) self.poll_timer.start()