diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index 39cc9a0b3..55c981007 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -58,7 +58,6 @@ from freqtrade.exchange.exchange_utils import ( ROUND, ROUND_DOWN, ROUND_UP, - CcxtModuleType, amount_to_contract_precision, amount_to_contracts, amount_to_precision, @@ -75,6 +74,7 @@ from freqtrade.exchange.exchange_utils_timeframe import ( timeframe_to_prev_date, timeframe_to_seconds, ) +from freqtrade.exchange.exchange_ws import ExchangeWS from freqtrade.exchange.types import OHLCVResponse, OrderBook, Ticker, Tickers from freqtrade.misc import ( chunks, @@ -230,7 +230,11 @@ class Exchange: exchange_conf.get("ccxt_async_config", {}), ccxt_async_config ) self._api_async = self._init_ccxt(exchange_conf, False, ccxt_async_config) - self._has_watch_ohlcv = self.exchange_has('watchOHLCV') + self._ws_async = self._init_ccxt(exchange_conf, False, ccxt_async_config) + self._has_watch_ohlcv = self.exchange_has("watchOHLCV") + self._exchange_ws: Optional[ExchangeWS] = None + if self._has_watch_ohlcv: + self._exchange_ws = ExchangeWS(self._config, self._ws_async) logger.info(f'Using Exchange "{self.name}"') self.required_candle_call_count = 1 @@ -2222,10 +2226,14 @@ class Exchange: data = sorted(data, key=lambda x: x[0]) return pair, timeframe, candle_type, data, self._ohlcv_partial_candle - async def _async_watch_ohlcv(self, pair: str, timeframe: str, - candle_type: CandleType) -> Tuple[str, str, str, List]: + async def _async_watch_ohlcv( + self, pair: str, timeframe: str, candle_type: CandleType + ) -> Tuple[str, str, str, List]: start = time.time() - data = await self._api_async.watch_ohlcv(pair, timeframe, ) + data = await self._api_async.watch_ohlcv( + pair, + timeframe, + ) logger.info(f"watch {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s") return pair, timeframe, candle_type, data @@ -2239,19 +2247,28 @@ class Exchange: cache: bool, ) -> Coroutine[Any, Any, OHLCVResponse]: not_all_data = cache and self.required_candle_call_count > 1 + if cache: + if self._exchange_ws: + # Subscribe to websocket + self._exchange_ws.schedule_ohlcv(pair, timeframe, candle_type) if cache and (pair, timeframe, candle_type) in self._klines: candle_limit = self.ohlcv_candle_limit(timeframe, candle_type) min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp() - one_date = date_minus_candles(timeframe, 1).timestamp() + date_minus_candles(timeframe, 1).timestamp() last_refresh = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0) - if (self._has_watch_ohlcv - and candle_type in (CandleType.SPOT, CandleType.FUTURES) - and one_date <= last_refresh): - logger.info(f"Using watch {pair}, {timeframe}, {candle_type}") - return self._async_watch_ohlcv(pair, timeframe, candle_type) - pass + # if self._exchange_ws: + # self._exchange_ws.schedule_ohlcv(pair, timeframe, candle_type) + # if ( + # self._has_watch_ohlcv + # and candle_type in (CandleType.SPOT, CandleType.FUTURES) + # and one_date <= last_refresh + # ): + # logger.info(f"Using watch {pair}, {timeframe}, {candle_type}") + # return self._async_watch_ohlcv(pair, timeframe, candle_type) + # pass # Check if 1 call can get us updated candles without hole in the data. - elif min_date < last_refresh: + # el + if min_date < last_refresh: # Cache can be used - do one-off call. not_all_data = False else: diff --git a/freqtrade/exchange/exchange_ws.py b/freqtrade/exchange/exchange_ws.py new file mode 100644 index 000000000..55b0fb372 --- /dev/null +++ b/freqtrade/exchange/exchange_ws.py @@ -0,0 +1,94 @@ + +import asyncio +import logging +import time +from threading import Thread +from typing import List, Set, Tuple + +from freqtrade.constants import Config +from freqtrade.enums.candletype import CandleType + + +logger = logging.getLogger(__name__) + + +class ExchangeWS(): + def __init__(self, config: Config, ccxt_object) -> None: + self.config = config + self.ccxt_object = ccxt_object + self._thread = Thread(name="ccxt_ws", target=self.start) + self._background_tasks = set() + self._pairs_watching: Set[Tuple[str, str, CandleType]] = set() + self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set() + self._thread.start() + + def start(self) -> None: + self._loop = asyncio.new_event_loop() + self._loop.run_forever() + +## One task per Watch + # async def schedule_schedule(self) -> None: + + # for p in self._pairs_watching: + # if p not in self._pairs_scheduled: + # self._pairs_scheduled.add(p) + # await self.schedule_one_task(p[0], p[1], p[2]) + + # async def schedule_one_task(self, pair: str, timeframe: str, candle_type: CandleType) -> None: + # task = asyncio.create_task(self._async_watch_ohlcv(pair, timeframe, candle_type)) + + # # Add task to the set. This creates a strong reference. + # self._background_tasks.add(task) + # task.add_done_callback(self.reschedule_or_stop) + + # async def _async_watch_ohlcv(self, pair: str, timeframe: str, + # candle_type: CandleType) -> Tuple[str, str, str, List]: + # start = time.time() + # data = await self.ccxt_object.watch_ohlcv(pair, timeframe, ) + # logger.info(f"watch done {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s") + # return pair, timeframe, candle_type, data + + # def reschedule_or_stop(self, task: asyncio.Task): + # # logger.info(f"Task finished {task}") + + # self._background_tasks.discard(task) + # pair, timeframe, candle_type, data = task.result() + + # # reschedule + # asyncio.run_coroutine_threadsafe(self.schedule_one_task( + # pair, timeframe, candle_type), loop=self._loop) + +## End one task epr watch + + async def schedule_while_true(self) -> None: + + for p in self._pairs_watching: + if p not in self._pairs_scheduled: + self._pairs_scheduled.add(p) + pair, timeframe, candle_type = p + task = asyncio.create_task( + self.continuously_async_watch_ohlcv(pair, timeframe, candle_type)) + self._background_tasks.add(task) + task.add_done_callback(self.continuous_stopped) + + def continuous_stopped(self, task: asyncio.Task): + self._background_tasks.discard(task) + pair, timeframe, candle_type, data = task.result() + self._pairs_scheduled.discard(p) + + logger.info(f"Task finished {task}") + + async def continuously_async_watch_ohlcv( + self, pair: str, timeframe: str, candle_type: CandleType) -> Tuple[str, str, str, List]: + + while (pair, timeframe, candle_type) in self._pairs_watching: + start = time.time() + data = await self.ccxt_object.watch_ohlcv(pair, timeframe, ) + logger.info( + f"watch1 done {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s") + + def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None: + self._pairs_watching.add((pair, timeframe, candle_type)) + # asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop) + asyncio.run_coroutine_threadsafe(self.schedule_while_true(), loop=self._loop) +