Files
spothole/providers/rbn.py
2025-10-02 09:38:05 +01:00

92 lines
3.8 KiB
Python

import logging
import re
from datetime import datetime, timezone
from threading import Thread
from time import sleep
import pytz
import telnetlib3
from data.spot import Spot
from core.config import config
from providers.provider import Provider
# Provider for the Reverse Beacon Network. Connects to a single port, if you want both CW/RTTY (port 7000) and FT8
# (port 7001) you need to instantiate two copies of this. The port is provided as an argument to the constructor.
class RBN(Provider):
CALLSIGN_PATTERN = "([a-z|0-9|/]+)"
FREQUENCY_PATTERM = "([0-9|.]+)"
LINE_PATTERN = re.compile(
"^DX de " + CALLSIGN_PATTERN + "-.*:\\s+" + FREQUENCY_PATTERM + "\\s+" + CALLSIGN_PATTERN + "\\s+(.*)\\s+(\\d{4}Z)",
re.IGNORECASE)
# Constructor requires port number.
def __init__(self, provider_config):
super().__init__(provider_config)
self.port = provider_config["port"]
self.telnet = None
self.thread = Thread(target=self.handle)
self.thread.daemon = True
self.run = True
def start(self):
self.thread.start()
def stop(self):
self.run = False
self.telnet.close()
self.thread.join()
def handle(self):
while self.run:
connected = False
while not connected and self.run:
try:
self.status = "Connecting"
logging.info("RBN port " + str(self.port) + " connecting...")
self.telnet = telnetlib3.Telnet("telnet.reversebeacon.net", self.port)
telnet_output = self.telnet.read_until("Please enter your call: ".encode("latin-1"))
self.telnet.write((config["server-owner-callsign"] + "\n").encode("latin-1"))
connected = True
logging.info("RBN port " + str(self.port) + " connected.")
except Exception as e:
self.status = "Error"
logging.exception("Exception while connecting to RBN (port " + str(self.port) + ").")
sleep(5)
self.status = "Waiting for Data"
while connected and self.run:
try:
# Check new telnet info against regular expression
telnet_output = self.telnet.read_until("\n".encode("latin-1"))
match = self.LINE_PATTERN.match(telnet_output.decode("latin-1"))
if match:
spot_time = datetime.strptime(match.group(5), "%H%MZ")
spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC)
spot = Spot(source=self.name,
dx_call=match.group(3),
de_call=match.group(1),
freq=float(match.group(2)),
comment=match.group(4).strip(),
time=spot_datetime)
# Add to our list
self.submit(spot)
self.status = "OK"
self.last_update_time = datetime.now(timezone.utc)
logging.debug("Data received from RBN on port " + str(self.port) + ".")
except Exception as e:
connected = False
if self.run:
self.status = "Error"
logging.exception("Exception in RBN provider (port " + str(self.port) + ")")
sleep(5)
else:
logging.info("RBN provider (port " + str(self.port) + ") shutting down...")
self.status = "Shutting down"
self.status = "Disconnected"