mirror of
https://git.ianrenton.com/ian/spothole.git
synced 2026-02-04 09:14:30 +00:00
Re-implement xOTA using Websocket client
This commit is contained in:
74
spotproviders/websocket_spot_provider.py
Normal file
74
spotproviders/websocket_spot_provider.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
|
||||
import pytz
|
||||
from websocket import create_connection
|
||||
|
||||
from core.constants import HTTP_HEADERS
|
||||
from spotproviders.spot_provider import SpotProvider
|
||||
|
||||
|
||||
# Spot provider using websockets.
|
||||
class WebsocketSpotProvider(SpotProvider):
|
||||
|
||||
def __init__(self, provider_config, url):
|
||||
super().__init__(provider_config)
|
||||
self.url = url
|
||||
self.ws = None
|
||||
self.thread = None
|
||||
self.stopped = False
|
||||
self.last_event_id = None
|
||||
|
||||
def start(self):
|
||||
logging.info("Set up websocket connection to " + self.name + " spot API.")
|
||||
self.stopped = False
|
||||
self.thread = Thread(target=self.run)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.stopped = True
|
||||
if self.ws:
|
||||
self.ws.close()
|
||||
if self.thread:
|
||||
self.thread.join()
|
||||
|
||||
def _on_open(self):
|
||||
self.status = "Waiting for Data"
|
||||
|
||||
def _on_error(self):
|
||||
self.status = "Connecting"
|
||||
|
||||
def run(self):
|
||||
while not self.stopped:
|
||||
try:
|
||||
logging.debug("Connecting to " + self.name + " spot API...")
|
||||
self.status = "Connecting"
|
||||
self.ws = create_connection(self.url, header=HTTP_HEADERS)
|
||||
data = self.ws.recv()
|
||||
if data:
|
||||
try:
|
||||
new_spot = self.ws_message_to_spot(data)
|
||||
if new_spot:
|
||||
self.submit(new_spot)
|
||||
|
||||
self.status = "OK"
|
||||
self.last_update_time = datetime.now(pytz.UTC)
|
||||
logging.debug("Received data from " + self.name + " spot API.")
|
||||
|
||||
except Exception as e:
|
||||
logging.exception("Exception processing message from Websocket Spot Provider (" + self.name + ")")
|
||||
|
||||
except Exception as e:
|
||||
self.status = "Error"
|
||||
logging.exception("Exception in Websocket Spot Provider (" + self.name + ")", e)
|
||||
else:
|
||||
self.status = "Disconnected"
|
||||
sleep(5) # Wait before trying to reconnect
|
||||
|
||||
# Convert a WS message received from the API into a spot. The exact message data (in bytes) is provided here so the
|
||||
# subclass implementations can handle the message as string, JSON, XML, whatever the API actually provides.
|
||||
def ws_message_to_spot(self, bytes):
|
||||
raise NotImplementedError("Subclasses must implement this method")
|
||||
Reference in New Issue
Block a user