Files
spothole/spotproviders/aprsis.py

63 lines
2.1 KiB
Python

import logging
from datetime import datetime
from threading import Thread
import aprslib
import pytz
from core.config import SERVER_OWNER_CALLSIGN
from data.spot import Spot
from spotproviders.spot_provider import SpotProvider
class APRSIS(SpotProvider):
"""Spot provider for the APRS-IS."""
def __init__(self, provider_config):
super().__init__(provider_config)
self._thread = Thread(target=self._connect)
self._thread.daemon = True
self._aprsis = None
def start(self):
self._thread.start()
def _connect(self):
self._aprsis = aprslib.IS(SERVER_OWNER_CALLSIGN)
self.status = "Connecting"
logging.info("APRS-IS connecting...")
self._aprsis.connect()
self._aprsis.consumer(self._handle)
logging.info("APRS-IS connected.")
def stop(self):
self.status = "Shutting down"
self._aprsis.close()
self._thread.join()
def _handle(self, data):
# Split SSID in "from" call and store separately
from_parts = str(data["from"]).split("-")
dx_call = from_parts[0].upper()
dx_ssid = from_parts[1].upper() if len(from_parts) > 1 else None
via_parts = str(data["via"]).split("-")
de_call = via_parts[0].upper()
de_ssid = via_parts[1].upper() if len(via_parts) > 1 else None
spot = Spot(source="APRS-IS",
dx_call=dx_call,
dx_ssid=dx_ssid,
de_call=de_call,
de_ssid=de_ssid,
comment=str(data["comment"]) if "comment" in data else None,
dx_latitude=float(data["latitude"]) if "latitude" in data else None,
dx_longitude=float(data["longitude"]) if "longitude" in data else None,
time=datetime.now(
pytz.UTC).timestamp()) # APRS-IS spots are live so we can assume spot time is "now"
# Add to our list
self._submit(spot)
self.status = "OK"
self.last_update_time = datetime.now(pytz.UTC)
logging.debug("Data received from APRS-IS.")