mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2025-10-27 08:49:27 +00:00
44 lines
1.7 KiB
Python
44 lines
1.7 KiB
Python
from datetime import datetime
|
|
|
|
import pytz
|
|
|
|
from core.constants import SOFTWARE_NAME, SOFTWARE_VERSION, SERVER_OWNER_CALLSIGN
|
|
|
|
|
|
# Generic data provider class. Subclasses of this query the individual APIs for data.
#
# This base class holds the state shared by every provider (status, timestamps and a reference to the spot list) and
# implements submit(), which filters incoming spots by time before adding them to the spot list. name(), start() and
# stop() must be implemented by subclasses.
class Provider:

    # HTTP headers used for providers that use HTTP. The User-Agent is built from the software name, the software
    # version and the server owner's callsign.
    HTTP_HEADERS = { "User-Agent": SOFTWARE_NAME + " " + SOFTWARE_VERSION + " (operated by " + SERVER_OWNER_CALLSIGN + ")" }

    # Constructor. Both timestamps start at the earliest representable datetime, made timezone-aware (UTC), so that
    # any timezone-aware spot time compares as newer on the first submit().
    def __init__(self):
        # Not modified anywhere else in this class; presumably maintained by subclasses when they poll.
        self.last_update_time = datetime.min.replace(tzinfo=pytz.UTC)
        # Time of the newest spot seen so far by submit().
        self.last_spot_time = datetime.min.replace(tzinfo=pytz.UTC)
        self.status = "Not Started"
        # Populated by setup(); submit() must not be called before then.
        self.spot_list = None

    # Return the name of the provider. Must be overridden; the base implementation raises NotImplementedError.
    def name(self):
        raise NotImplementedError("Subclasses must implement this method")

    # Set up the provider, e.g. giving it the spot list to work from. The given object is stored as-is and submit()
    # appends to it.
    def setup(self, spot_list):
        self.spot_list = spot_list

    # Start the provider. This should return immediately after spawning threads to access the remote resources.
    # Must be overridden; the base implementation raises NotImplementedError.
    def start(self):
        raise NotImplementedError("Subclasses must implement this method")

    # Submit one or more new spots retrieved from the provider. Only spots that are newer than the last spot retrieved
    # by this provider will be added to the spot list, to prevent duplications. This is called by the subclasses on
    # receiving spots.
    #
    # "spots" may be any iterable of objects with a "time" attribute, including an empty one or a generator; it is
    # iterated exactly once. Every spot is compared against last_spot_time as it stood when submit() was called, so
    # the order of spots within a batch does not affect which of them are added.
    def submit(self, spots):
        newest_time = self.last_spot_time
        for spot in spots:
            if spot.time > self.last_spot_time:
                self.spot_list.append(spot)
                # Track the newest accepted time separately so the threshold used above stays fixed for the
                # whole batch.
                if spot.time > newest_time:
                    newest_time = spot.time
        # Only ever move the mark forwards. An empty batch, or one containing only older spots, leaves it unchanged
        # rather than raising or rewinding it (which would let already-added spots be added again later).
        self.last_spot_time = newest_time

    # Stop any threads and prepare for application shutdown. Must be overridden; the base implementation raises
    # NotImplementedError.
    def stop(self):
        raise NotImplementedError("Subclasses must implement this method")