mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2025-10-27 08:49:27 +00:00
97 lines
4.1 KiB
Python
97 lines
4.1 KiB
Python
import logging
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from threading import Thread
|
|
from time import sleep
|
|
|
|
import pytz
|
|
import telnetlib3
|
|
|
|
from data.spot import Spot
|
|
from core.config import SERVER_OWNER_CALLSIGN
|
|
from spotproviders.spot_provider import SpotProvider
|
|
|
|
|
|
# Spot provider for a DX Cluster. The hostname, port and login prompt are supplied via the provider config.
|
|
class DXCluster(SpotProvider):
    # Note the callsign pattern deliberately excludes calls ending in "-#", which are from RBN and can be enabled by
    # default on some clusters. If you want RBN spots, there is a separate provider for that.
    # ("|" is a literal inside a character class, so it is intentionally not listed here.)
    CALLSIGN_PATTERN = r"([a-z0-9/]+)"
    FREQUENCY_PATTERN = r"([0-9.]+)"
    # Backward-compatible alias for the original, misspelled attribute name.
    FREQUENCY_PATTERM = FREQUENCY_PATTERN
    # Groups: 1 = spotter (de) call, 2 = frequency, 3 = DX call, 4 = comment, 5 = time as "HHMMZ".
    LINE_PATTERN = re.compile(
        r"^DX de " + CALLSIGN_PATTERN + r":\s+" + FREQUENCY_PATTERN + r"\s+" + CALLSIGN_PATTERN
        + r"\s+(.*)\s+(\d{4}Z)",
        re.IGNORECASE)

    # Seconds to wait before retrying after a connection or read failure.
    RECONNECT_DELAY_SECONDS = 5
    # A spot whose time-of-day lands further than this many seconds ahead of "now" is assumed to belong to the
    # previous UTC day (the spot line carries no date, only HHMM).
    MAX_FUTURE_SECONDS = 3600
    SECONDS_PER_DAY = 86400

    # Constructor requires "host", "port" and "login_prompt" entries in the provider config.
    def __init__(self, provider_config):
        super().__init__(provider_config)
        self.hostname = provider_config["host"]
        self.port = provider_config["port"]
        self.login_prompt = provider_config["login_prompt"]
        self.telnet = None
        self.thread = Thread(target=self.handle)
        self.thread.daemon = True
        self.run = True

    # Start the background thread that connects to the cluster and reads spots.
    def start(self):
        self.thread.start()

    # Ask the background thread to finish, close the connection so that any blocking read is interrupted, and wait
    # for the thread to exit. Safe to call before a connection was ever made or before start() was called.
    def stop(self):
        self.run = False
        self._close_connection()
        if self.thread.is_alive():
            self.thread.join()

    # Thread body: (re)connect, then read lines until an error occurs or stop() is called.
    def handle(self):
        while self.run:
            connected = False
            while not connected and self.run:
                connected = self._connect()

            self.status = "Waiting for Data"
            while connected and self.run:
                try:
                    # Check new telnet info against regular expression
                    telnet_output = self.telnet.read_until("\n".encode("latin-1"))
                    spot = self._parse_line(telnet_output.decode("latin-1"))
                    if spot is not None:
                        # Add to our list
                        self.submit(spot)

                    self.status = "OK"
                    self.last_update_time = datetime.now(timezone.utc)
                    logging.debug("Data received from DX Cluster " + self.hostname + ".")

                except Exception:
                    connected = False
                    # Release the broken connection before the outer loop opens a new one.
                    self._close_connection()
                    if self.run:
                        self.status = "Error"
                        logging.exception("Exception in DX Cluster Provider (" + self.hostname + ")")
                        sleep(self.RECONNECT_DELAY_SECONDS)
                    else:
                        logging.info("DX Cluster " + self.hostname + " shutting down...")
                        self.status = "Shutting down"

        # Covers the case where stop() ran while a connection attempt was still in progress.
        self._close_connection()
        self.status = "Disconnected"

    # Open the telnet connection and log in with the server owner's callsign. Returns True on success; on failure
    # logs the exception, waits before the caller retries, and returns False.
    def _connect(self):
        try:
            self.status = "Connecting"
            logging.info("DX Cluster " + self.hostname + " connecting...")
            self.telnet = telnetlib3.Telnet(self.hostname, self.port)
            self.telnet.read_until(self.login_prompt.encode("latin-1"))
            self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("latin-1"))
            logging.info("DX Cluster " + self.hostname + " connected.")
            return True
        except Exception:
            self.status = "Error"
            logging.exception("Exception while connecting to DX Cluster Provider (" + self.hostname + ").")
            self._close_connection()
            sleep(self.RECONNECT_DELAY_SECONDS)
            return False

    # Close the current telnet connection, if there is one. Best effort: errors while closing are only logged.
    def _close_connection(self):
        telnet = self.telnet
        if telnet is None:
            return
        try:
            telnet.close()
        except Exception:
            logging.debug("Error while closing connection to DX Cluster " + self.hostname + ".", exc_info=True)

    # Turn one line of cluster output into a Spot. Returns None if the line is not a "DX de" spot line, or if its
    # frequency or time cannot be parsed (a bad line should not cost us the connection).
    def _parse_line(self, line):
        match = self.LINE_PATTERN.match(line)
        if not match:
            return None

        try:
            # Scaled by 1000 as in the original code (presumably kHz from the cluster to Hz; confirm against Spot).
            freq = float(match.group(2)) * 1000
            spot_timestamp = self._spot_timestamp(match.group(5))
        except ValueError:
            logging.warning("Ignoring unparseable spot from DX Cluster " + self.hostname + ": " + line.strip())
            return None

        return Spot(source=self.name,
                    dx_call=match.group(3),
                    de_call=match.group(1),
                    freq=freq,
                    comment=match.group(4).strip(),
                    icon="desktop",
                    time=spot_timestamp)

    # Convert an "HHMMZ" time-of-day into a UNIX timestamp. The date is taken from the current UTC date (not the
    # local date); if that would place the spot well into the future, e.g. a 2359Z spot received just after 0000Z,
    # the previous day is used instead. "now" may be supplied as an aware UTC datetime; defaults to the current time.
    # Raises ValueError if the text is not a valid HHMMZ time.
    @classmethod
    def _spot_timestamp(cls, time_text, now=None):
        if now is None:
            now = datetime.now(timezone.utc)
        time_of_day = datetime.strptime(time_text, "%H%MZ").time()
        timestamp = datetime.combine(now.date(), time_of_day).replace(tzinfo=timezone.utc).timestamp()
        if timestamp - now.timestamp() > cls.MAX_FUTURE_SECONDS:
            # UTC has no DST, so a day is always exactly 86400 seconds.
            timestamp -= cls.SECONDS_PER_DAY
        return timestamp