Files
spothole/spotproviders/websocket_spot_provider.py
2025-12-23 21:58:32 +00:00

75 lines
2.6 KiB
Python

import logging
from datetime import datetime
from threading import Thread
from time import sleep
import pytz
from websocket import create_connection
from core.constants import HTTP_HEADERS
from spotproviders.spot_provider import SpotProvider
# Spot provider using websockets. A background thread holds a websocket connection open to the API, hands every
# message it receives to the subclass's ws_message_to_spot() implementation, and submits any resulting spot. If the
# connection fails or ends, the thread waits a short time and then connects again, until stop() is called.
class WebsocketSpotProvider(SpotProvider):

    # Seconds to wait after a connection ends (cleanly or with an error) before trying to connect again.
    RECONNECT_DELAY_SEC = 5

    # Constructor. provider_config is passed straight through to the SpotProvider constructor; url is the websocket
    # URL that run() will connect to.
    def __init__(self, provider_config, url):
        super().__init__(provider_config)
        self.url = url
        # Most recently opened websocket connection, or None before the first connection attempt succeeds.
        self.ws = None
        # Background thread executing run(), or None until start() has been called.
        self.thread = None
        # Set by stop() to ask the background thread to exit.
        self.stopped = False
        # Not read or written anywhere else in this class.
        self.last_event_id = None

    # Start the background thread that connects to the API and receives messages.
    def start(self):
        logging.info("Set up websocket connection to %s spot API.", self.name)
        self.stopped = False
        self.thread = Thread(target=self.run, daemon=True)
        self.thread.start()

    # Ask the background thread to exit, close the current connection so that a blocked recv() returns, and wait for
    # the thread to finish.
    def stop(self):
        self.stopped = True
        # Take a local reference so the check and the close() call act on the same object even if run() replaces
        # self.ws in the meantime.
        ws = self.ws
        if ws:
            ws.close()
        if self.thread:
            self.thread.join()

    # Status callback. Nothing in this class calls it; it is kept so that any external caller continues to work.
    def _on_open(self):
        self.status = "Waiting for Data"

    # Status callback. Nothing in this class calls it; it is kept so that any external caller continues to work.
    def _on_error(self):
        self.status = "Connecting"

    # Body of the background thread. Connects to the API, then receives and processes messages on that connection
    # until it ends or stop() is called, reconnecting after RECONNECT_DELAY_SEC whenever the connection is lost.
    def run(self):
        while not self.stopped:
            ws = None
            try:
                logging.debug("Connecting to %s spot API...", self.name)
                self.status = "Connecting"
                ws = create_connection(self.url, header=HTTP_HEADERS)
                self.ws = ws
                self.status = "Connected"

                # Keep reading from the same connection rather than reconnecting for every message.
                while not self.stopped:
                    data = ws.recv()
                    if not data:
                        # NOTE(review): recv() is expected to return an empty value only when no data frame was
                        # delivered (e.g. the peer sent a close frame) - confirm against the websocket-client
                        # version in use. Either way there is nothing to process, so drop out and reconnect.
                        break
                    self._handle_message(data)
            except Exception:
                if self.stopped:
                    # stop() closes the socket underneath a blocked recv(), so an exception here is the expected
                    # way for the thread to wake up during shutdown, not an error worth reporting.
                    self.status = "Disconnected"
                else:
                    self.status = "Error"
                    logging.exception("Exception in Websocket Spot Provider (%s)", self.name)
            else:
                self.status = "Disconnected"
            finally:
                # Always release the socket before the next connection attempt so connections do not accumulate.
                if ws is not None:
                    try:
                        ws.close()
                    except Exception:
                        logging.debug("Exception closing websocket for %s spot API.", self.name, exc_info=True)

            # Wait before trying to reconnect, but do not hold up stop() if we are shutting down.
            if not self.stopped:
                sleep(self.RECONNECT_DELAY_SEC)

    # Convert one received message into a spot and submit it. Any exception raised while doing so is logged and
    # swallowed so that a single bad message does not drop the connection.
    def _handle_message(self, data):
        try:
            new_spot = self.ws_message_to_spot(data)
            # A falsy result means the message did not produce a spot, so there is nothing to submit.
            if new_spot:
                self.submit(new_spot)

            self.status = "OK"
            self.last_update_time = datetime.now(pytz.UTC)
            logging.debug("Received data from %s spot API.", self.name)
        except Exception:
            logging.exception("Exception processing message from Websocket Spot Provider (%s)", self.name)

    # Convert a WS message received from the API into a spot. The message data is passed exactly as returned by the
    # websocket's recv() call, so the subclass implementations can handle the message as string, JSON, XML, whatever
    # the API actually provides. Return the spot to submit, or a falsy value if the message should not produce one.
    # Raises NotImplementedError unless overridden. (The parameter name shadows the built-in "bytes" type; it is left
    # unchanged so that existing overrides and keyword callers keep working.)
    def ws_message_to_spot(self, bytes):
        raise NotImplementedError("Subclasses must implement this method")