import logging import re from datetime import datetime, timezone from threading import Thread from time import sleep import pytz import telnetlib3 from data.spot import Spot from core.config import SERVER_OWNER_CALLSIGN from spotproviders.spot_provider import SpotProvider # Spot provider for the Reverse Beacon Network. Connects to a single port, if you want both CW/RTTY (port 7000) and FT8 # (port 7001) you need to instantiate two copies of this. The port is provided as an argument to the constructor. class RBN(SpotProvider): CALLSIGN_PATTERN = "([a-z|0-9|/]+)" FREQUENCY_PATTERM = "([0-9|.]+)" LINE_PATTERN = re.compile( "^DX de " + CALLSIGN_PATTERN + "-.*:\\s+" + FREQUENCY_PATTERM + "\\s+" + CALLSIGN_PATTERN + "\\s+(.*)\\s+(\\d{4}Z)", re.IGNORECASE) # Constructor requires port number. def __init__(self, provider_config): super().__init__(provider_config) self.port = provider_config["port"] self.telnet = None self.thread = Thread(target=self.handle) self.thread.daemon = True self.run = True def start(self): self.thread.start() def stop(self): self.run = False self.telnet.close() self.thread.join() def handle(self): while self.run: connected = False while not connected and self.run: try: self.status = "Connecting" logging.info("RBN port " + str(self.port) + " connecting...") self.telnet = telnetlib3.Telnet("telnet.reversebeacon.net", self.port) telnet_output = self.telnet.read_until("Please enter your call: ".encode("latin-1")) self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("latin-1")) connected = True logging.info("RBN port " + str(self.port) + " connected.") except Exception as e: self.status = "Error" logging.exception("Exception while connecting to RBN (port " + str(self.port) + ").") sleep(5) self.status = "Waiting for Data" while connected and self.run: try: # Check new telnet info against regular expression telnet_output = self.telnet.read_until("\n".encode("latin-1")) match = self.LINE_PATTERN.match(telnet_output.decode("latin-1")) if match: spot_time = datetime.strptime(match.group(5), "%H%MZ") spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC) spot = Spot(source=self.name, dx_call=match.group(3), de_call=match.group(1), freq=float(match.group(2)) * 1000, comment=match.group(4).strip(), icon="tower-cell", time=spot_datetime.timestamp()) # Add to our list self.submit(spot) self.status = "OK" self.last_update_time = datetime.now(timezone.utc) logging.debug("Data received from RBN on port " + str(self.port) + ".") except Exception as e: connected = False if self.run: self.status = "Error" logging.exception("Exception in RBN provider (port " + str(self.port) + ")") sleep(5) else: logging.info("RBN provider (port " + str(self.port) + ") shutting down...") self.status = "Shutting down" self.status = "Disconnected"