diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index ad66d50e0..45fad6aa6 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -263,6 +263,8 @@ class Exchange: self.close() def close(self): + if self._exchange_ws: + self._exchange_ws.cleanup() logger.debug("Exchange object destroyed, closing async loop") if ( self._api_async @@ -2229,14 +2231,11 @@ class Exchange: async def _async_watch_ohlcv( self, pair: str, timeframe: str, candle_type: CandleType ) -> Tuple[str, str, str, List]: - start = time.time() - data = await self._api_async.watch_ohlcv( - pair, - timeframe, - ) - - logger.info(f"watch {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s") - return pair, timeframe, candle_type, data + candles = self._exchange_ws.ccxt_object.ohlcvs.get(pair, {}).get(timeframe) + # Fake 1 candle - which is then removed again + candles.append([int(datetime.now(timezone.utc).timestamp() * 1000), 0, 0, 0, 0, 0]) + logger.info(f"watch result for {pair}, {timeframe} with length {len(candles)}") + return pair, timeframe, candle_type, candles def _build_coroutine( self, @@ -2247,27 +2246,27 @@ class Exchange: cache: bool, ) -> Coroutine[Any, Any, OHLCVResponse]: not_all_data = cache and self.required_candle_call_count > 1 - if cache: - if self._exchange_ws: + if cache and candle_type in (CandleType.SPOT, CandleType.FUTURES): + if self._has_watch_ohlcv and self._exchange_ws: # Subscribe to websocket self._exchange_ws.schedule_ohlcv(pair, timeframe, candle_type) if cache and (pair, timeframe, candle_type) in self._klines: candle_limit = self.ohlcv_candle_limit(timeframe, candle_type) - min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp() - date_minus_candles(timeframe, 1).timestamp() + min_date = int(date_minus_candles(timeframe, candle_limit - 5).timestamp()) + last_refresh = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0) - # if self._exchange_ws: - # self._exchange_ws.schedule_ohlcv(pair, timeframe, candle_type) - # if ( - # self._has_watch_ohlcv - # and candle_type in (CandleType.SPOT, CandleType.FUTURES) - # and one_date <= last_refresh - # ): - # logger.info(f"Using watch {pair}, {timeframe}, {candle_type}") - # return self._async_watch_ohlcv(pair, timeframe, candle_type) - # pass + if self._exchange_ws: + candle_date = int(timeframe_to_prev_date(timeframe).timestamp()) + candles = self._exchange_ws.ccxt_object.ohlcvs.get(pair, {}).get(timeframe) + x = self._exchange_ws.pairs_last_refresh[(pair, timeframe, candle_type)] + logger.info(f"{candle_date < x}, {candle_date}, {x}") + if candles and candles[-1][0] > min_date and candle_date < x: + # Usable result ... + logger.info(f"reuse watch result for {pair}, {timeframe}, {x}") + + return self._async_watch_ohlcv(pair, timeframe, candle_type) + # Check if 1 call can get us updated candles without hole in the data. - # el if min_date < last_refresh: # Cache can be used - do one-off call. not_all_data = False diff --git a/freqtrade/exchange/exchange_ws.py b/freqtrade/exchange/exchange_ws.py index 1805aedef..b304247a3 100644 --- a/freqtrade/exchange/exchange_ws.py +++ b/freqtrade/exchange/exchange_ws.py @@ -91,7 +91,6 @@ class ExchangeWS(): self, pair: str, timeframe: str, candle_type: CandleType) -> Tuple[str, str, str, List]: while (pair, timeframe, candle_type) in self._pairs_watching: - logger.info(self._pairs_watching) start = time.time() data = await self.ccxt_object.watch_ohlcv(pair, timeframe) self.pairs_last_refresh[(pair, timeframe, candle_type)] = time.time()