import logging import re from datetime import datetime, timezone from threading import Thread from time import sleep import pytz import telnetlib3 from data.spot import Spot from core.config import config from providers.provider import Provider # Provider for the Reverse Beacon Network. Connects to a single port, if you want both CW/RTTY (port 7000) and FT8 # (port 7001) you need to instantiate two copies of this. The port is provided as an argument to the constructor. class RBN(Provider): CALLSIGN_PATTERN = "([a-z|0-9|/]+)" FREQUENCY_PATTERM = "([0-9|.]+)" LINE_PATTERN = re.compile( "^DX de " + CALLSIGN_PATTERN + "-.*:\\s+" + FREQUENCY_PATTERM + "\\s+" + CALLSIGN_PATTERN + "\\s+(.*)\\s+(\\d{4}Z)", re.IGNORECASE) # Constructor requires port number. def __init__(self, port): super().__init__() self.port = port self.telnet = None self.thread = Thread(target=self.handle) self.thread.daemon = True self.run = True def name(self): return "RBN port " + str(self.port) def start(self): self.thread.start() def stop(self): self.run = False self.telnet.close() self.thread.join() def handle(self): while self.run: connected = False while not connected and self.run: try: self.status = "Connecting" logging.info("RBN port " + str(self.port) + " connecting...") self.telnet = telnetlib3.Telnet("telnet.reversebeacon.net", self.port) telnet_output = self.telnet.read_until("Please enter your call: ".encode("latin-1")) self.telnet.write((config["server-owner-callsign"] + "\n").encode("latin-1")) connected = True logging.info("RBN port " + str(self.port) + " connected.") except Exception as e: self.status = "Error" logging.exception("Exception while connecting to RBN (port " + str(self.port) + ").") sleep(5) self.status = "Waiting for Data" while connected and self.run: try: # Check new telnet info against regular expression telnet_output = self.telnet.read_until("\n".encode("latin-1")) match = self.LINE_PATTERN.match(telnet_output.decode("latin-1")) if match: spot_time = datetime.strptime(match.group(5), "%H%MZ") spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC) spot = Spot(source="RBN", dx_call=match.group(3), de_call=match.group(1), freq=float(match.group(2)), comment=match.group(4).strip(), time=spot_datetime) # Add to our list self.submit(spot) self.status = "OK" self.last_update_time = datetime.now(timezone.utc) logging.debug("Data received from RBN on port " + str(self.port) + ".") except Exception as e: connected = False if self.run: self.status = "Error" logging.exception("Exception in RBN provider (port " + str(self.port) + ")") sleep(5) else: logging.info("RBN provider (port " + str(self.port) + ") shutting down...") self.status = "Shutting down" self.status = "Disconnected"