mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2025-10-27 16:59:25 +00:00
93 lines
3.9 KiB
Python
93 lines
3.9 KiB
Python
import logging
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from threading import Thread
|
|
from time import sleep
|
|
|
|
import pytz
|
|
import telnetlib3
|
|
|
|
from data.spot import Spot
|
|
from core.config import SERVER_OWNER_CALLSIGN
|
|
from spotproviders.spot_provider import SpotProvider
|
|
|
|
|
|
# Spot provider for the Reverse Beacon Network. Each instance connects to a single port; if you want both CW/RTTY
# (port 7000) and FT8 (port 7001), you need to instantiate two copies of this. The port is read from the "port" entry
# of the provider config passed to the constructor.
|
|
class RBN(SpotProvider):
    """Spot provider for the Reverse Beacon Network (RBN) telnet feed.

    Each instance connects to one port of telnet.reversebeacon.net. To receive
    both CW/RTTY (port 7000) and FT8 (port 7001), create two instances. The
    port is read from the "port" entry of the provider config.

    A background daemon thread logs in with the server owner's callsign, reads
    the feed line by line, and submits a Spot for every line matching
    LINE_PATTERN. On connection errors it waits five seconds and reconnects
    until stop() is called.
    """

    # Callsign characters: letters, digits and "/". LINE_PATTERN is compiled
    # with IGNORECASE, so upper-case callsigns match too.
    CALLSIGN_PATTERN = "([a-z0-9/]+)"
    # Frequency characters: digits and ".". The attribute keeps its historical
    # (misspelled) name so any external reference keeps working.
    FREQUENCY_PATTERM = "([0-9.]+)"
    # Groups: 1 = spotter ("de") callsign, 2 = frequency, 3 = spotted ("dx")
    # callsign, 4 = free-text comment, 5 = time as "HHMMZ".
    LINE_PATTERN = re.compile(
        "^DX de " + CALLSIGN_PATTERN + "-.*:\\s+" + FREQUENCY_PATTERM + "\\s+" + CALLSIGN_PATTERN + "\\s+(.*)\\s+(\\d{4}Z)",
        re.IGNORECASE)

    _SECONDS_PER_DAY = 86400

    def __init__(self, provider_config):
        """Create the provider; provider_config must contain a "port" entry."""
        super().__init__(provider_config)
        self.port = provider_config["port"]
        self.telnet = None
        self.thread = Thread(target=self.handle)
        self.thread.daemon = True
        self.run = True

    def start(self):
        """Start the background thread that connects and reads the feed."""
        self.thread.start()

    def stop(self):
        """Ask the background thread to finish and wait for it.

        Safe to call before start() and before the first connection has been
        made: the connection is only closed if one exists, and the thread is
        only joined if it is running.
        """
        self.run = False
        # Closing the connection makes a blocked read in the worker thread
        # fail, so the thread notices the shutdown request promptly.
        telnet = self.telnet
        if telnet is not None:
            telnet.close()
        if self.thread.is_alive():
            self.thread.join()

    def handle(self):
        """Thread body: connect, read lines, and reconnect on failure."""
        while self.run:
            connected = self._connect()
            if connected:
                self.status = "Waiting for Data"

            while connected and self.run:
                try:
                    telnet_output = self.telnet.read_until("\n".encode("latin-1"))
                    self._process_line(telnet_output.decode("latin-1"))
                except Exception:
                    connected = False
                    # Release the broken connection before reconnecting or exiting.
                    self._close_connection()
                    if self.run:
                        self.status = "Error"
                        logging.exception("Exception in RBN provider (port " + str(self.port) + ")")
                        sleep(5)
                    else:
                        logging.info("RBN provider (port " + str(self.port) + ") shutting down...")
                        self.status = "Shutting down"

        # Covers the case where stop() was requested while a connection attempt
        # was still in progress and therefore could not be closed by stop().
        self._close_connection()
        self.status = "Disconnected"

    def _connect(self):
        """Retry until logged in to the server; return False if asked to stop first."""
        while self.run:
            try:
                self.status = "Connecting"
                logging.info("RBN port " + str(self.port) + " connecting...")
                self.telnet = telnetlib3.Telnet("telnet.reversebeacon.net", self.port)
                # The server prompts for a callsign before it starts sending spots.
                self.telnet.read_until("Please enter your call: ".encode("latin-1"))
                self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("latin-1"))
                logging.info("RBN port " + str(self.port) + " connected.")
                return True
            except Exception:
                self.status = "Error"
                logging.exception("Exception while connecting to RBN (port " + str(self.port) + ").")
                self._close_connection()
                sleep(5)
        return False

    def _close_connection(self):
        """Close the current telnet connection, if any, ignoring close errors."""
        telnet, self.telnet = self.telnet, None
        if telnet is not None:
            try:
                telnet.close()
            except Exception:
                logging.debug("Error closing RBN connection (port " + str(self.port) + ").", exc_info=True)

    def _process_line(self, line):
        """Submit a Spot for one line of telnet output if it is a spot line."""
        match = self.LINE_PATTERN.match(line)
        if not match:
            return

        try:
            # NOTE(review): the factor of 1000 presumably converts kHz to Hz - confirm against Spot.
            freq = float(match.group(2)) * 1000
            spot_timestamp = self._spot_timestamp(match.group(5))
        except ValueError:
            # A single malformed line should not tear down a healthy connection.
            logging.warning("Ignoring malformed RBN line on port " + str(self.port) + ": " + repr(line))
            return

        spot = Spot(source=self.name,
                    dx_call=match.group(3),
                    de_call=match.group(1),
                    freq=freq,
                    comment=match.group(4).strip(),
                    icon="tower-cell",
                    time=spot_timestamp)

        # Add to our list
        self.submit(spot)

        self.status = "OK"
        self.last_update_time = datetime.now(timezone.utc)
        logging.debug("Data received from RBN on port " + str(self.port) + ".")

    @classmethod
    def _spot_timestamp(cls, hhmmz, now=None):
        """Convert an "HHMMZ" spot time into a UNIX timestamp.

        The feed carries only a UTC time of day, so the date is taken from the
        current UTC date and then moved by one day if needed so the result is
        within twelve hours of now. This keeps a spot stamped 2359Z that is
        processed just after 0000Z on the previous day rather than almost a
        day in the future.

        hhmmz: time string such as "1234Z".
        now: timezone-aware reference time; defaults to the current UTC time.
        Raises ValueError if hhmmz is not a valid "HHMMZ" time.
        """
        if now is None:
            now = datetime.now(timezone.utc)
        spot_time = datetime.strptime(hhmmz, "%H%MZ").time()
        timestamp = datetime.combine(now.date(), spot_time).replace(tzinfo=timezone.utc).timestamp()

        half_day = cls._SECONDS_PER_DAY / 2
        offset = timestamp - now.timestamp()
        if offset > half_day:
            timestamp -= cls._SECONDS_PER_DAY
        elif offset < -half_day:
            timestamp += cls._SECONDS_PER_DAY
        return timestamp