ccxt.pro - move get_klines to ws_exchange

This commit is contained in:
Matthias
2022-10-29 19:19:30 +02:00
parent e2b567165c
commit 18dabd519a
2 changed files with 17 additions and 14 deletions

View File

@@ -2228,15 +2228,6 @@ class Exchange:
data = sorted(data, key=lambda x: x[0]) data = sorted(data, key=lambda x: x[0])
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
async def _async_watch_ohlcv(
self, pair: str, timeframe: str, candle_type: CandleType
) -> Tuple[str, str, str, List]:
candles = self._exchange_ws.ccxt_object.ohlcvs.get(pair, {}).get(timeframe)
# Fake 1 candle - which is then removed again
candles.append([int(datetime.now(timezone.utc).timestamp() * 1000), 0, 0, 0, 0, 0])
logger.info(f"watch result for {pair}, {timeframe} with length {len(candles)}")
return pair, timeframe, candle_type, candles
def _build_coroutine( def _build_coroutine(
self, self,
pair: str, pair: str,
@@ -2264,7 +2255,7 @@ class Exchange:
# Usable result ... # Usable result ...
logger.info(f"reuse watch result for {pair}, {timeframe}, {x}") logger.info(f"reuse watch result for {pair}, {timeframe}, {x}")
return self._async_watch_ohlcv(pair, timeframe, candle_type) return self._exchange_ws.get_ohlcv(pair, timeframe, candle_type)
# Check if 1 call can get us updated candles without hole in the data. # Check if 1 call can get us updated candles without hole in the data.
if min_date < last_refresh: if min_date < last_refresh:

View File

@@ -2,6 +2,7 @@
import asyncio import asyncio
import logging import logging
import time import time
from datetime import datetime, timezone
from threading import Thread from threading import Thread
from typing import Dict, List, Set, Tuple from typing import Dict, List, Set, Tuple
@@ -17,12 +18,12 @@ class ExchangeWS():
self.config = config self.config = config
self.ccxt_object = ccxt_object self.ccxt_object = ccxt_object
self._thread = Thread(name="ccxt_ws", target=self.start) self._thread = Thread(name="ccxt_ws", target=self.start)
self._background_tasks = set() self._background_tasks: Set[asyncio.Task] = set()
self._pairs_watching: Set[Tuple[str, str, CandleType]] = set() self._pairs_watching: Set[Tuple[str, str, CandleType]] = set()
self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set() self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set()
self.pairs_last_refresh: Dict[Tuple[str, str, CandleType], int] = {} self.pairs_last_refresh: Dict[Tuple[str, str, CandleType], float] = {}
self.pairs_last_request: Dict[Tuple[str, str, CandleType], int] = {} self.pairs_last_request: Dict[Tuple[str, str, CandleType], float] = {}
self._thread.start() self._thread.start()
def start(self) -> None: def start(self) -> None:
@@ -68,7 +69,7 @@ class ExchangeWS():
# self._pairs_scheduled.discard(pair, timeframe, candle_type) # self._pairs_scheduled.discard(pair, timeframe, candle_type)
async def continuously_async_watch_ohlcv( async def continuously_async_watch_ohlcv(
self, pair: str, timeframe: str, candle_type: CandleType) -> Tuple[str, str, str, List]: self, pair: str, timeframe: str, candle_type: CandleType) -> None:
while (pair, timeframe, candle_type) in self._pairs_watching: while (pair, timeframe, candle_type) in self._pairs_watching:
start = time.time() start = time.time()
@@ -83,3 +84,14 @@ class ExchangeWS():
# asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop) # asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop)
asyncio.run_coroutine_threadsafe(self.schedule_while_true(), loop=self._loop) asyncio.run_coroutine_threadsafe(self.schedule_while_true(), loop=self._loop)
self.cleanup_expired() self.cleanup_expired()
async def get_ohlcv(
self, pair: str, timeframe: str, candle_type: CandleType) -> Tuple[str, str, str, List]:
"""
Returns cached klines from ccxt's "watch" cache.
"""
candles = self.ccxt_object.ohlcvs.get(pair, {}).get(timeframe)
# Fake 1 candle - which is then removed again
candles.append([int(datetime.now(timezone.utc).timestamp() * 1000), 0, 0, 0, 0, 0])
logger.info(f"watch result for {pair}, {timeframe} with length {len(candles)}")
return pair, timeframe, candle_type, candles