diff --git a/.gitignore b/.gitignore index 507bc33..a1c38ff 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /.venv __pycache__ *.pyc +/sota_summit_data_cache.sqlite diff --git a/core/constants.py b/core/constants.py index b3ed0b5..87ab3c8 100644 --- a/core/constants.py +++ b/core/constants.py @@ -5,6 +5,12 @@ SOFTWARE_NAME = "Metaspot by M0TRT" SOFTWARE_VERSION = "0.1" SERVER_OWNER_CALLSIGN = "M0TRT" +# Modes +CW_MODES = ["CW"] +PHONE_MODES = ["PHONE", "SSB", "USB", "LSB", "AM", "FM", "DV", "DMR", "DSTAR", "C4FM", "M17"] +DATA_MODES = ["DIGI", "DATA", "FT8", "FT4", "RTTY", "SSTV", "JS8", "HELL", "BPSK", "PSK", "BPSK31", "OLIVIA"] +ALL_MODES = CW_MODES + PHONE_MODES + DATA_MODES + # Band definitions BANDS = [ Band(name="160m", start_freq=1800, end_freq=2000, color="#7cfc00", contrast_color="black"), diff --git a/core/utils.py b/core/utils.py index e86d96b..541698f 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,4 +1,4 @@ -from core.constants import BANDS, UNKNOWN_BAND +from core.constants import BANDS, UNKNOWN_BAND, CW_MODES, PHONE_MODES, DATA_MODES, ALL_MODES from pyhamtools import LookupLib, Callinfo # Static lookup helpers from pyhamtools @@ -6,14 +6,24 @@ from pyhamtools import LookupLib, Callinfo lookuplib = LookupLib(lookuptype="countryfile") callinfo = Callinfo(lookuplib) +# Infer a mode from the comment +def infer_mode_from_comment(comment): + for mode in ALL_MODES: + if mode in comment.upper(): + return mode + return None + # Infer a "mode family" from a mode. def infer_mode_family_from_mode(mode): - if mode.upper() == "CW": + if mode.upper() in CW_MODES: return "CW" - elif mode.upper() in ["PHONE", "SSB", "USB", "LSB", "AM", "FM", "DMR", "DSTAR", "C4FM", "M17"]: + elif mode.upper() in PHONE_MODES: return "PHONE" + elif mode.upper() in DATA_MODES: + return "DATA" else: - return "DIGI" + print("Found an unrecognised mode: " + mode + ". Developer should categorise this.") + return None # Infer a band from a frequency in kHz def infer_band_from_freq(freq): diff --git a/data/spot.py b/data/spot.py index 2cdd359..16f7392 100644 --- a/data/spot.py +++ b/data/spot.py @@ -1,11 +1,14 @@ from dataclasses import dataclass from datetime import datetime +import pytz from pyhamtools.locator import locator_to_latlong, latlong_to_locator from core.constants import DXCC_FLAGS from core.utils import infer_mode_family_from_mode, infer_band_from_freq, infer_continent_from_callsign, \ - infer_country_from_callsign, infer_cq_zone_from_callsign, infer_itu_zone_from_callsign, infer_dxcc_id_from_callsign + infer_country_from_callsign, infer_cq_zone_from_callsign, infer_itu_zone_from_callsign, infer_dxcc_id_from_callsign, \ + infer_mode_from_comment + # Data class that defines a spot. @dataclass @@ -14,6 +17,8 @@ class Spot: dx_call: str = None # Callsign of the operator that has spotted them de_call: str = None + # Name of the operator that has been spotted + dx_name: str = None # Country of the DX operator dx_country: str = None # Country of the spotter @@ -48,6 +53,9 @@ class Spot: band_contrast_color: str = None # Time of the spot time: datetime = None + # Time that this software received the spot. This is used with the "since" call to our API to receive all data that + # is new to us, even if by a quirk of the API it might be older than the list time the client polled the API. + received_time: datetime = datetime.now(pytz.UTC), # Comment left by the spotter, if any comment: str = None # Special Interest Group (SIG), e.g. outdoor activity programme such as POTA @@ -56,6 +64,8 @@ class Spot: sig_refs: list = None # SIG reference names sig_refs_names: list = None + # Activation score. SOTA only + activation_score: int = None # Maidenhead grid locator for the spot. This could be from a geographical reference e.g. POTA, or just from the country grid: str = None # Latitude & longitude, in degrees. This could be from a geographical reference e.g. POTA, or just from the country @@ -70,6 +80,7 @@ class Spot: # Infer missing parameters where possible def infer_missing(self): + # DX country, continent, zones etc. from callsign if self.dx_call and not self.dx_country: self.dx_country = infer_country_from_callsign(self.dx_call) if self.dx_call and not self.dx_continent: @@ -83,6 +94,7 @@ class Spot: if self.dx_dxcc_id and not self.dx_flag: self.dx_flag = DXCC_FLAGS[self.dx_dxcc_id] + # Spotter country, continent, zones etc. from callsign if self.de_call and not self.de_country: self.de_country = infer_country_from_callsign(self.de_call) if self.de_call and not self.de_continent: @@ -92,22 +104,31 @@ class Spot: if self.de_dxcc_id and not self.de_flag: self.de_flag = DXCC_FLAGS[self.de_dxcc_id] + # Band from frequency if self.freq and not self.band: band = infer_band_from_freq(self.freq) self.band = band.name self.band_color = band.color self.band_contrast_color = band.contrast_color + # Mode from comments, mode family from mode + if self.comment and not self.mode: + self.mode=infer_mode_from_comment(self.comment) if self.mode and not self.mode_family: self.mode_family=infer_mode_family_from_mode(self.mode) + # Grid to lat/lon and vice versa if self.grid and not self.latitude: ll = locator_to_latlong(self.grid) self.latitude = ll[0] self.longitude = ll[1] - if self.latitude and self.longitude and not self.grid: self.grid = latlong_to_locator(self.latitude, self.longitude, 8) - # TODO use QRZ provider to get grids, lat Lon, DX name - # TODO lat/lon from DXCC centre? \ No newline at end of file + # QRT comment detection + if self.comment and not self.qrt: + self.qrt = "QRT" in self.comment.upper() + + # TODO use QRZ/HamQTH provider to get grids, lat Lon, when missing; and DX name + # credentials in config file which is .gitignored; sample provided + # TODO lat/lon from DXCC centre as last resort? \ No newline at end of file diff --git a/main.py b/main.py index bebcd71..69144cb 100644 --- a/main.py +++ b/main.py @@ -1,14 +1,17 @@ # Main script import signal -from time import sleep from providers.dxcluster import DXCluster +from providers.gma import GMA from providers.pota import POTA +from providers.sota import SOTA +from providers.wwbota import WWBOTA +from providers.wwff import WWFF # Shutdown function def shutdown(sig, frame): - # Start data providers + print("Stopping program, this may take a few seconds...") for p in providers: p.stop() @@ -20,8 +23,18 @@ if __name__ == '__main__': # Create providers providers = [ POTA(), - DXCluster("hrd.wa9pie.net", 8000) - ] # todo all other providers + SOTA(), + WWFF(), + WWBOTA(), + GMA(), + # todo HEMA + # todo PNP + # todo RBN + # todo packet? + # todo APRS? + DXCluster("hrd.wa9pie.net", 8000), + # DXCluster("dxc.w3lpl.net", 22) + ] # Set up spot list spot_list = [] # Set up data providers @@ -32,12 +45,16 @@ if __name__ == '__main__': # todo thread to clear spot list of old data # Todo serve spot API + # Todo spot API arguments e.g. "since" based on received_time of spots, sig only, dx cont, dxcc, de cont, band, mode, filter out qrt, filter pre-qsy # Todo serve status API # Todo serve apidocs # Todo serve website +# TODO NOTES FOR NGINX REVERSE PROXY +# local cache time of 15 sec to avoid over burdening python? -# NOTES FOR FIELD SPOTTER +# TODO NOTES FOR FIELD SPOTTER # Still need to de-dupe spots -# Still need to do QSY checking \ No newline at end of file +# Still need to do QSY checking in FS because we can enable/disable showing them and don't want to re-query the API. +# Filter comments, still do in FS or move that here? \ No newline at end of file diff --git a/providers/dxcluster.py b/providers/dxcluster.py index 7741b98..2420720 100644 --- a/providers/dxcluster.py +++ b/providers/dxcluster.py @@ -1,19 +1,23 @@ +import logging +import re from datetime import datetime, timezone from threading import Thread +from time import sleep import pytz +import telnetlib3 from core.constants import SERVER_OWNER_CALLSIGN from data.spot import Spot from providers.provider import Provider -import telnetlib3 -import re - -callsign_pattern = "([a-z|0-9|/]+)" -frequency_pattern = "([0-9|.]+)" -pattern = re.compile("^DX de "+callsign_pattern+":\\s+"+frequency_pattern+"\\s+"+callsign_pattern+"\\s+(.*)\\s+(\\d{4}Z)", re.IGNORECASE) +# Provider for a DX Cluster. Hostname and port provided as parameters. class DXCluster(Provider): + CALLSIGN_PATTERN = "([a-z|0-9|/]+)" + FREQUENCY_PATTERM = "([0-9|.]+)" + LINE_PATTERN = re.compile( + "^DX de " + CALLSIGN_PATTERN + ":\\s+" + FREQUENCY_PATTERM + "\\s+" + CALLSIGN_PATTERN + "\\s+(.*)\\s+(\\d{4}Z)", + re.IGNORECASE) # Constructor requires hostname and port def __init__(self, hostname, port): @@ -25,7 +29,7 @@ class DXCluster(Provider): self.run = True def name(self): - return "DX Cluster " + self.hostname + " " + str(self.port) + return "DX Cluster " + self.hostname def start(self): self.thread = Thread(target=self.handle) @@ -37,29 +41,50 @@ class DXCluster(Provider): self.thread.join() def handle(self): - self.status = "Connecting" - self.telnet = telnetlib3.Telnet(self.hostname, self.port) - self.telnet.read_until("login: ".encode("ascii")) - self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("ascii")) - self.status = "Waiting for Data" - while self.run: - # Check new telnet info against regular expression - telnet_output = self.telnet.read_until("\n".encode("ascii")) - match = pattern.match(telnet_output.decode("ascii")) - if match: - spot_time = datetime.strptime(match.group(5), "%H%MZ") - spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC) - spot = Spot(source=self.name(), - dx_call=match.group(3), - de_call=match.group(1), - freq=float(match.group(2)), - comment=match.group(4).strip(), - time=spot_datetime) - # Fill in any blanks - spot.infer_missing() - # Add to our list - self.submit([spot]) + connected = False + while not connected and self.run: + try: + self.status = "Connecting" + self.telnet = telnetlib3.Telnet(self.hostname, self.port) + self.telnet.read_until("login: ".encode("ascii")) + self.telnet.write((SERVER_OWNER_CALLSIGN + "\n").encode("ascii")) + connected = True + except Exception as e: + self.status = "Error" + logging.exception("Exception while connecting to DX Cluster Provider (" + self.hostname + ").") + sleep(5) - self.status = "OK" - self.last_update_time = datetime.now(timezone.utc) + self.status = "Waiting for Data" + while connected and self.run: + try: + # Check new telnet info against regular expression + telnet_output = self.telnet.read_until("\n".encode("ascii")) + match = self.LINE_PATTERN.match(telnet_output.decode("ascii")) + if match: + spot_time = datetime.strptime(match.group(5), "%H%MZ") + spot_datetime = datetime.combine(datetime.today(), spot_time.time()).replace(tzinfo=pytz.UTC) + spot = Spot(source=self.name(), + dx_call=match.group(3), + de_call=match.group(1), + freq=float(match.group(2)), + comment=match.group(4).strip(), + time=spot_datetime) + # Fill in any blanks + spot.infer_missing() + # Add to our list + self.submit([spot]) + + self.status = "OK" + self.last_update_time = datetime.now(timezone.utc) + + except Exception as e: + connected = False + if self.run: + self.status = "Error" + logging.exception("Exception in DX Cluster Provider (" + self.hostname + ")") + sleep(5) + else: + self.status = "Shutting down" + + self.status = "Disconnected" \ No newline at end of file diff --git a/providers/gma.py b/providers/gma.py new file mode 100644 index 0000000..5e77fd7 --- /dev/null +++ b/providers/gma.py @@ -0,0 +1,51 @@ +from datetime import datetime, timedelta + +import pytz +from requests_cache import CachedSession + +from data.spot import Spot +from providers.http_provider import HTTPProvider + + +# Provider for General Mountain Activity +class GMA(HTTPProvider): + POLL_INTERVAL_SEC = 120 + SPOTS_URL = "https://www.cqgma.org/api/spots/25/" + # GMA spots don't contain the details of the programme they are for, we need a separate lookup for that + REF_INFO_URL_ROOT = "https://www.cqgma.org/api/ref/?" + REF_INFO_CACHE_TIME_DAYS = 30 + REF_INFO_CACHE = CachedSession("gma_ref_info_cache", expire_after=timedelta(days=REF_INFO_CACHE_TIME_DAYS)) + + def __init__(self): + super().__init__(self.SPOTS_URL, self.POLL_INTERVAL_SEC) + + def name(self): + return "GMA" + + def http_response_to_spots(self, http_response): + new_spots = [] + # Iterate through source data + for source_spot in http_response.json()["RCD"]: + # Convert to our spot format + spot = Spot(source=self.name(), + dx_call=source_spot["ACTIVATOR"].upper(), + de_call=source_spot["SPOTTER"].upper(), + freq=float(source_spot["QRG"]), + mode=source_spot["MODE"].upper(), + comment=source_spot["TEXT"], + sig_refs=[source_spot["REF"]], + sig_refs_names=[source_spot["NAME"]], + time=datetime.strptime(source_spot["DATE"] + source_spot["TIME"], "%Y%m%d%H%M").replace(tzinfo=pytz.UTC), + latitude=float(source_spot["LAT"]), + longitude=float(source_spot["LON"])) + + # GMA doesn't give what programme (SIG) the reference is for until we separately look it up. + ref_info = self.REF_INFO_CACHE.get(self.REF_INFO_URL_ROOT + source_spot["REF"], headers=self.HTTP_HEADERS).json() + spot.sig = ref_info["reftype"] + + # Fill in any missing data + spot.infer_missing() + # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do + # that for us. + new_spots.append(spot) + return new_spots \ No newline at end of file diff --git a/providers/http_provider.py b/providers/http_provider.py new file mode 100644 index 0000000..4b7f41c --- /dev/null +++ b/providers/http_provider.py @@ -0,0 +1,59 @@ +import logging +from datetime import datetime, timezone +from threading import Timer, Thread +from time import sleep + +import requests + +from providers.provider import Provider + + +# Generic data provider class for providers that request data via HTTP(S). Just for convenience to avoid code +# duplication. Subclasses of this query the individual APIs for data. +class HTTPProvider(Provider): + + def __init__(self, url, poll_interval): + super().__init__() + self.url = url + self.poll_interval = poll_interval + self.poll_timer = None + + def name(self): + raise NotImplementedError("Subclasses must implement this method") + + def start(self): + # Fire off a one-shot thread to run poll() for the first time, just to ensure start() returns immediately and + # the application can continue starting. The thread itself will then die, and the timer will kick in on its own + # thread. + thread = Thread(target=self.poll) + thread.start() + + def stop(self): + self.poll_timer.cancel() + + def poll(self): + try: + # Request data from API + http_response = requests.get(self.url, headers=self.HTTP_HEADERS) + # Pass off to the subclass for processing + new_spots = self.http_response_to_spots(http_response) + # Submit the new spots for processing. There might not be any spots for the less popular programs. + if new_spots: + self.submit(new_spots) + + self.status = "OK" + self.last_update_time = datetime.now(timezone.utc) + + except Exception as e: + self.status = "Error" + logging.exception("Exception in HTTP JSON Provider (" + self.name() + ")") + sleep(1) + + self.poll_timer = Timer(self.poll_interval, self.poll) + self.poll_timer.start() + + # Convert an HTTP response returned by the API into spot data. The whole response is provided here so the subclass + # implementations can check for HTTP status codes if necessary, and handle the response as JSON, XML, text, whatever + # the API actually provides. + def http_response_to_spots(self, http_response): + raise NotImplementedError("Subclasses must implement this method") \ No newline at end of file diff --git a/providers/pota.py b/providers/pota.py index 7f63999..5d74505 100644 --- a/providers/pota.py +++ b/providers/pota.py @@ -1,65 +1,44 @@ -from datetime import datetime, timezone -import pytz -from data.spot import Spot -from providers.provider import Provider -from threading import Timer -import requests +from datetime import datetime -class POTA(Provider): +import pytz + +from data.spot import Spot +from providers.http_provider import HTTPProvider + + +# Provider for Parks on the Air +class POTA(HTTPProvider): POLL_INTERVAL_SEC = 120 SPOTS_URL = "https://api.pota.app/spot/activator" def __init__(self): - super().__init__() - self.poll_timer = None + super().__init__(self.SPOTS_URL, self.POLL_INTERVAL_SEC) def name(self): return "POTA" - def start(self): - self.poll() - - def stop(self): - self.poll_timer.cancel() - - def poll(self): - try: - # Request data from API - source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS).json() - # Build a list of spots we haven't seen before - new_spots = [] - # Iterate through source data - for source_spot in source_data: - # Convert to our spot format - spot = Spot(source=self.name(), - source_id=source_spot["spotId"], - dx_call=source_spot["activator"], - de_call=source_spot["spotter"], - freq=float(source_spot["frequency"]), - mode=source_spot["mode"], - comment=source_spot["comments"], - sig="POTA", - sig_refs=[source_spot["reference"]], - sig_refs_names=[source_spot["name"]], - time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=pytz.UTC), - grid=source_spot["grid6"], - latitude=source_spot["latitude"], - longitude=source_spot["longitude"], - qrt="QRT" in source_spot["comments"].upper()) - # Fill in any blanks - spot.infer_missing() - # Add to our list - new_spots.append(spot) - - # Submit the new spots for processing - self.submit(new_spots) - - self.status = "OK" - self.last_update_time = datetime.now(timezone.utc) - - - except requests.exceptions.RequestException as e: - self.status = "Error" - - self.poll_timer = Timer(self.POLL_INTERVAL_SEC, self.poll) - self.poll_timer.start() \ No newline at end of file + def http_response_to_spots(self, http_response): + new_spots = [] + # Iterate through source data + for source_spot in http_response.json(): + # Convert to our spot format + spot = Spot(source=self.name(), + source_id=source_spot["spotId"], + dx_call=source_spot["activator"].upper(), + de_call=source_spot["spotter"].upper(), + freq=float(source_spot["frequency"]), + mode=source_spot["mode"].upper(), + comment=source_spot["comments"], + sig="POTA", + sig_refs=[source_spot["reference"]], + sig_refs_names=[source_spot["name"]], + time=datetime.strptime(source_spot["spotTime"], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=pytz.UTC), + grid=source_spot["grid6"], + latitude=source_spot["latitude"], + longitude=source_spot["longitude"]) + # Fill in any missing data + spot.infer_missing() + # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do + # that for us. + new_spots.append(spot) + return new_spots \ No newline at end of file diff --git a/providers/provider.py b/providers/provider.py index 3e962a1..898fcea 100644 --- a/providers/provider.py +++ b/providers/provider.py @@ -1,7 +1,10 @@ from datetime import datetime + import pytz + from core.constants import SOFTWARE_NAME, SOFTWARE_VERSION + # Generic data provider class. Subclasses of this query the individual APIs for data. class Provider: diff --git a/providers/sota.py b/providers/sota.py new file mode 100644 index 0000000..aa37a8d --- /dev/null +++ b/providers/sota.py @@ -0,0 +1,68 @@ +from datetime import datetime, timedelta + +import requests +from requests_cache import CachedSession + +from data.spot import Spot +from providers.http_provider import HTTPProvider + + +# Provider for Summits on the Air +class SOTA(HTTPProvider): + POLL_INTERVAL_SEC = 120 + # SOTA wants us to check for an "epoch" from the API and see if it's actually changed before querying the main data + # APIs. So it's actually the EPOCH_URL that we pass into the constructor and get the superclass to call on a timer. + # The actual data lookup all happens after parsing and checking the epoch. + EPOCH_URL = "https://api-db2.sota.org.uk/api/spots/epoch" + SPOTS_URL = "https://api-db2.sota.org.uk/api/spots/60/all/all" + # SOTA spots don't contain lat/lon, we need a separate lookup for that + SUMMIT_URL_ROOT = "https://api-db2.sota.org.uk/api/summits/" + SUMMIT_DATA_CACHE_TIME_DAYS = 30 + SUMMIT_DATA_CACHE = CachedSession("sota_summit_data_cache", expire_after=timedelta(days=SUMMIT_DATA_CACHE_TIME_DAYS)) + + def __init__(self): + super().__init__(self.EPOCH_URL, self.POLL_INTERVAL_SEC) + self.api_epoch = "" + + def name(self): + return "SOTA" + + def http_response_to_spots(self, http_response): + # OK, source data is actually just the epoch at this point. We'll then go on to fetch real data if we know this + # has changed. + epoch_changed = http_response.text != self.api_epoch + self.api_epoch = http_response.text + + new_spots = [] + # OK, if the epoch actually changed, now we make the real request for data. + if epoch_changed: + source_data = requests.get(self.SPOTS_URL, headers=self.HTTP_HEADERS).json() + # Iterate through source data + for source_spot in source_data: + # Convert to our spot format + spot = Spot(source=self.name(), + source_id=source_spot["id"], + dx_call=source_spot["activatorCallsign"].upper(), + dx_name=source_spot["activatorName"], + de_call=source_spot["callsign"].upper(), + freq=(float(source_spot["frequency"]) * 1000) if (source_spot["frequency"] is not None) else None, # Seen SOTA spots with no frequency! + mode=source_spot["mode"].upper(), + comment=source_spot["comments"], + sig="SOTA", + sig_refs=[source_spot["summitCode"]], + sig_refs_names=[source_spot["summitName"]], + time=datetime.fromisoformat(source_spot["timeStamp"]), + activation_score=source_spot["points"]) + + # SOTA doesn't give summit lat/lon/grid in the main call, so we need another separate call for this + summit_data = self.SUMMIT_DATA_CACHE.get(self.SUMMIT_URL_ROOT + source_spot["summitCode"], headers=self.HTTP_HEADERS).json() + spot.grid = summit_data["locator"] + spot.latitude = summit_data["latitude"] + spot.longitude = summit_data["longitude"] + + # Fill in any missing data + spot.infer_missing() + # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do + # that for us. + new_spots.append(spot) + return new_spots \ No newline at end of file diff --git a/providers/wwbota.py b/providers/wwbota.py new file mode 100644 index 0000000..d788aa3 --- /dev/null +++ b/providers/wwbota.py @@ -0,0 +1,51 @@ +from datetime import datetime + +from data.spot import Spot +from providers.http_provider import HTTPProvider + + +# Provider for Worldwide Bunkers on the Air +class WWBOTA(HTTPProvider): + POLL_INTERVAL_SEC = 120 + SPOTS_URL = "https://api.wwbota.org/spots/" + + def __init__(self): + super().__init__(self.SPOTS_URL, self.POLL_INTERVAL_SEC) + + def name(self): + return "WWBOTA" + + def http_response_to_spots(self, http_response): + new_spots = [] + # Iterate through source data + for source_spot in http_response.json(): + # Convert to our spot format. First we unpack references, because WWBOTA spots can have more than one for + # n-fer activations. + refs = [] + ref_names = [] + for ref in source_spot["references"]: + refs.append(ref["reference"]) + ref_names.append(ref["name"]) + spot = Spot(source=self.name(), + dx_call=source_spot["call"].upper(), + de_call=source_spot["spotter"].upper(), + freq=float(source_spot["freq"]) * 1000, # MHz to kHz + mode=source_spot["mode"].upper(), + comment=source_spot["comment"], + sig="WWBOTA", + sig_refs=refs, + sig_refs_names=ref_names, + time=datetime.fromisoformat(source_spot["time"]), + # WWBOTA spots can contain multiple references for bunkers being activated simultaneously. For + # now, we will just pick the first one to use as our grid, latitude and longitude. + grid=source_spot["references"][0]["locator"], + latitude=source_spot["references"][0]["lat"], + longitude=source_spot["references"][0]["long"], + qrt=source_spot["type"] == "QRT") + # Fill in any missing data + spot.infer_missing() + # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do + # that for us. But WWBOTA does support a special "Test" spot type, we need to avoid adding that. + if source_spot["type"] != "Test": + new_spots.append(spot) + return new_spots \ No newline at end of file diff --git a/providers/wwff.py b/providers/wwff.py new file mode 100644 index 0000000..bfcdf98 --- /dev/null +++ b/providers/wwff.py @@ -0,0 +1,43 @@ +from datetime import datetime + +import pytz + +from data.spot import Spot +from providers.http_provider import HTTPProvider + + +# Provider for Worldwide Flora & Fauna +class WWFF(HTTPProvider): + POLL_INTERVAL_SEC = 120 + SPOTS_URL = "https://spots.wwff.co/static/spots.json" + + def __init__(self): + super().__init__(self.SPOTS_URL, self.POLL_INTERVAL_SEC) + + def name(self): + return "WWFF" + + def http_response_to_spots(self, http_response): + new_spots = [] + # Iterate through source data + for source_spot in http_response.json(): + # Convert to our spot format + spot = Spot(source=self.name(), + source_id=source_spot["id"], + dx_call=source_spot["activator"].upper(), + de_call=source_spot["spotter"].upper(), + freq=float(source_spot["frequency_khz"]), + mode=source_spot["mode"].upper(), + comment=source_spot["remarks"], + sig="WWFF", + sig_refs=[source_spot["reference"]], + sig_refs_names=[source_spot["reference_name"]], + time=datetime.fromtimestamp(source_spot["spot_time"]).replace(tzinfo=pytz.UTC), + latitude=source_spot["latitude"], + longitude=source_spot["longitude"]) + # Fill in any missing data + spot.infer_missing() + # Add to our list. Don't worry about de-duping, removing old spots etc. at this point; other code will do + # that for us. + new_spots.append(spot) + return new_spots \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e6c547d..d709997 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,5 @@ -pytz +requests-cache~=1.2.1 +pyhamtools~=0.12.0 +telnetlib3~=2.0.8 +pytz~=2025.2 requests~=2.32.5 \ No newline at end of file