import logging
from datetime import datetime, timezone
from threading import Thread

import aprslib
import pytz

from core.config import SERVER_OWNER_CALLSIGN
from data.spot import Spot
from spotproviders.spot_provider import SpotProvider


class APRSIS(SpotProvider):
    """Spot provider for the APRS-IS.

    Runs an aprslib client on a daemon thread and turns every received
    packet into a Spot, which is passed to self.submit().
    """

    def __init__(self, provider_config):
        """Prepare the worker thread; no connection is made until start()."""
        super().__init__(provider_config)
        self.thread = Thread(target=self.connect)
        self.thread.daemon = True
        # Created by connect() on the worker thread; None until then.
        self.aprsis = None

    def start(self):
        """Start the background thread that connects and consumes packets."""
        self.thread.start()

    def connect(self):
        """Connect to APRS-IS and process packets until the connection ends.

        This is the worker thread's target, so it is expected to block.
        """
        self.aprsis = aprslib.IS(SERVER_OWNER_CALLSIGN)
        self.status = "Connecting"
        logging.info("APRS-IS connecting...")
        self.aprsis.connect()
        # Log before entering the consumer loop: consumer() blocks while it
        # is processing packets, so anything after it only runs once the
        # connection has ended.
        logging.info("APRS-IS connected.")
        self.aprsis.consumer(self.handle)

    def stop(self):
        """Close the APRS-IS connection and wait for the worker thread."""
        self.status = "Shutting down"
        # stop() may be called before the worker thread has created the
        # client, in which case there is nothing to close.
        if self.aprsis is not None:
            self.aprsis.close()
        # join() raises RuntimeError on a thread that was never started.
        if self.thread.is_alive():
            self.thread.join()

    def handle(self, data):
        """Convert one parsed APRS packet into a Spot and submit it.

        data is the dict handed to the consumer callback; "from" and "via"
        are read unconditionally, while "comment", "latitude" and
        "longitude" are optional and default to None.
        """
        # Split SSID in "from" call and store separately
        from_parts = data["from"].split("-")
        dx_call = from_parts[0]
        dx_aprs_ssid = from_parts[1] if len(from_parts) > 1 else None

        spot = Spot(
            source="APRS-IS",
            dx_call=dx_call,
            dx_aprs_ssid=dx_aprs_ssid,
            de_call=data["via"],
            comment=data.get("comment"),
            latitude=data.get("latitude"),
            longitude=data.get("longitude"),
            icon="tower-cell",
            # APRS-IS spots are live so we can assume spot time is "now"
            time=datetime.now(pytz.UTC).timestamp(),
        )

        # Add to our list
        self.submit(spot)

        self.status = "OK"
        self.last_update_time = datetime.now(timezone.utc)
        logging.debug("Data received from APRS-IS.")