Files
spothole/spotproviders/http_spot_provider.py

62 lines
2.5 KiB
Python

import logging
from datetime import datetime
from threading import Thread, Event
import pytz
import requests
from core.constants import HTTP_HEADERS
from spotproviders.spot_provider import SpotProvider
# Generic spot provider class for providers that request data via HTTP(S). Just for convenience to avoid code
# duplication. Subclasses of this query the individual APIs for data.
class HTTPSpotProvider(SpotProvider):
def __init__(self, provider_config, url, poll_interval):
super().__init__(provider_config)
self.url = url
self.poll_interval = poll_interval
self._stop_event = Event()
def start(self):
# Fire off the polling thread. It will poll immediately on startup, then sleep for poll_interval between
# subsequent polls, so start() returns immediately and the application can continue starting.
logging.info("Set up query of " + self.name + " spot API every " + str(self.poll_interval) + " seconds.")
self._thread = Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._stop_event.set()
def _run(self):
while True:
self._poll()
if self._stop_event.wait(timeout=self.poll_interval):
break
def _poll(self):
try:
# Request data from API
logging.debug("Polling " + self.name + " spot API...")
http_response = requests.get(self.url, headers=HTTP_HEADERS)
# Pass off to the subclass for processing
new_spots = self.http_response_to_spots(http_response)
# Submit the new spots for processing. There might not be any spots for the less popular programs.
if new_spots:
self.submit_batch(new_spots)
self.status = "OK"
self.last_update_time = datetime.now(pytz.UTC)
logging.debug("Received data from " + self.name + " spot API.")
except Exception as e:
self.status = "Error"
logging.exception("Exception in HTTP JSON Spot Provider (" + self.name + ")")
self._stop_event.wait(timeout=1)
# Convert an HTTP response returned by the API into spot data. The whole response is provided here so the subclass
# implementations can check for HTTP status codes if necessary, and handle the response as JSON, XML, text, whatever
# the API actually provides.
def http_response_to_spots(self, http_response):
raise NotImplementedError("Subclasses must implement this method")