diff --git a/freqtrade/exchange/common.py b/freqtrade/exchange/common.py index 62221d0cc..251325a0c 100644 --- a/freqtrade/exchange/common.py +++ b/freqtrade/exchange/common.py @@ -93,7 +93,7 @@ EXCHANGE_HAS_OPTIONAL = [ # 'fetchOpenOrder', 'fetchClosedOrder', # replacement for fetchOrder # 'fetchOpenOrders', 'fetchClosedOrders', # 'fetchOrders', # Refinding balance... # ccxt.pro - 'watchOHLCV' + "watchOHLCV", ] diff --git a/freqtrade/exchange/exchange_ws.py b/freqtrade/exchange/exchange_ws.py index 951d8a8d5..7cc58978f 100644 --- a/freqtrade/exchange/exchange_ws.py +++ b/freqtrade/exchange/exchange_ws.py @@ -1,4 +1,3 @@ - import asyncio import logging import time @@ -46,7 +45,7 @@ class ExchangeWS: self._klines_watching.clear() for task in self._background_tasks: task.cancel() - if hasattr(self, '_loop'): + if hasattr(self, "_loop"): asyncio.run_coroutine_threadsafe(self._cleanup_async(), loop=self._loop) while not self.__cleanup_called: time.sleep(0.1) @@ -74,10 +73,7 @@ class ExchangeWS: _, timeframe, _ = p timeframe_s = timeframe_to_seconds(timeframe) last_refresh = self.klines_last_request.get(p, 0) - if ( - last_refresh > 0 - and (dt_ts() - last_refresh) > ((timeframe_s + 20) * 1000) - ): + if last_refresh > 0 and (dt_ts() - last_refresh) > ((timeframe_s + 20) * 1000): logger.info(f"Removing {p} from watchlist") self._klines_watching.discard(p) changed = True @@ -92,19 +88,21 @@ class ExchangeWS: self._klines_scheduled.add(p) pair, timeframe, candle_type = p task = asyncio.create_task( - self._continuously_async_watch_ohlcv(pair, timeframe, candle_type)) + self._continuously_async_watch_ohlcv(pair, timeframe, candle_type) + ) self._background_tasks.add(task) task.add_done_callback( partial( self._continuous_stopped, pair=pair, timeframe=timeframe, - candle_type=candle_type + candle_type=candle_type, ) ) def _continuous_stopped( - self, task: asyncio.Task, pair: str, timeframe: str, candle_type: CandleType): + self, task: asyncio.Task, pair: str, timeframe: str, candle_type: CandleType + ): self._background_tasks.discard(task) if task.cancelled(): result = "cancelled" @@ -115,7 +113,8 @@ class ExchangeWS: self._klines_scheduled.discard((pair, timeframe, candle_type)) async def _continuously_async_watch_ohlcv( - self, pair: str, timeframe: str, candle_type: CandleType) -> None: + self, pair: str, timeframe: str, candle_type: CandleType + ) -> None: try: while (pair, timeframe, candle_type) in self._klines_watching: start = dt_ts() @@ -123,10 +122,10 @@ class ExchangeWS: self.klines_last_refresh[(pair, timeframe, candle_type)] = dt_ts() logger.debug( f"watch done {pair}, {timeframe}, data {len(data)} " - f"in {dt_ts() - start:.2f}s") + f"in {dt_ts() - start:.2f}s" + ) except ccxt.BaseError: - logger.exception( - f"Exception in continuously_async_watch_ohlcv for {pair}, {timeframe}") + logger.exception(f"Exception in continuously_async_watch_ohlcv for {pair}, {timeframe}") finally: self._klines_watching.discard((pair, timeframe, candle_type)) @@ -141,11 +140,11 @@ class ExchangeWS: self.cleanup_expired() async def get_ohlcv( - self, - pair: str, - timeframe: str, - candle_type: CandleType, - candle_date: int, + self, + pair: str, + timeframe: str, + candle_type: CandleType, + candle_date: int, ) -> OHLCVResponse: """ Returns cached klines from ccxt's "watch" cache. @@ -164,5 +163,5 @@ class ExchangeWS: f"{format_ms_time(candles[-1][0])}, " f"lref={format_ms_time(refresh_date)}, " f"candle_date={format_ms_time(candle_date)}, {drop_hint=}" - ) + ) return pair, timeframe, candle_type, candles, drop_hint diff --git a/tests/exchange/test_exchange_ws.py b/tests/exchange/test_exchange_ws.py index 5e93de1f4..07819fc7a 100644 --- a/tests/exchange/test_exchange_ws.py +++ b/tests/exchange/test_exchange_ws.py @@ -8,7 +8,6 @@ from freqtrade.exchange.exchange_ws import ExchangeWS def test_exchangews_init(mocker): - config = MagicMock() ccxt_object = MagicMock() mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock()) @@ -36,6 +35,7 @@ def patch_eventloop_threading(exchange): exchange._loop = asyncio.new_event_loop() is_init = True exchange._loop.run_forever() + x = threading.Thread(target=thread_fuck, daemon=True) x.start() while not is_init: @@ -52,16 +52,15 @@ async def test_exchangews_ohlcv(mocker): exchange_ws = ExchangeWS(config, ccxt_object) patch_eventloop_threading(exchange_ws) try: - assert exchange_ws._klines_watching == set() assert exchange_ws._klines_scheduled == set() exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT) - sleep(.5) + sleep(0.5) assert exchange_ws._klines_watching == {("ETH/BTC", "1m", CandleType.SPOT)} assert exchange_ws._klines_scheduled == {("ETH/BTC", "1m", CandleType.SPOT)} - sleep(.1) + sleep(0.1) assert ccxt_object.watch_ohlcv.call_count == 1 except Exception as e: print(e) diff --git a/tests/exchange_online/test_ccxt_ws_compat.py b/tests/exchange_online/test_ccxt_ws_compat.py index c99c3db1d..32398573a 100644 --- a/tests/exchange_online/test_ccxt_ws_compat.py +++ b/tests/exchange_online/test_ccxt_ws_compat.py @@ -20,15 +20,14 @@ from tests.exchange_online.conftest import EXCHANGE_WS_FIXTURE_TYPE @pytest.mark.longrun class TestCCXTExchangeWs: - def test_ccxt_ohlcv(self, exchange_ws: EXCHANGE_WS_FIXTURE_TYPE, caplog, mocker): exch, exchangename, pair = exchange_ws assert exch._ws_async is not None - timeframe = '1m' + timeframe = "1m" pair_tf = (pair, timeframe, CandleType.SPOT) - m_hist = mocker.spy(exch, '_async_get_historic_ohlcv') - m_cand = mocker.spy(exch, '_async_get_candle_history') + m_hist = mocker.spy(exch, "_async_get_historic_ohlcv") + m_cand = mocker.spy(exch, "_async_get_candle_history") res = exch.refresh_latest_ohlcv([pair_tf]) assert m_cand.call_count == 1 @@ -45,7 +44,7 @@ class TestCCXTExchangeWs: df1 = res[pair_tf] caplog.set_level(logging.DEBUG) set_loggers(1) - assert df1.iloc[-1]['date'] == curr_candle + assert df1.iloc[-1]["date"] == curr_candle # Wait until the next candle (might be up to 1 minute). while True: @@ -53,9 +52,9 @@ class TestCCXTExchangeWs: res = exch.refresh_latest_ohlcv([pair_tf]) df2 = res[pair_tf] assert df2 is not None - if df2.iloc[-1]['date'] == next_candle: + if df2.iloc[-1]["date"] == next_candle: break - assert df2.iloc[-1]['date'] == curr_candle + assert df2.iloc[-1]["date"] == curr_candle sleep(1) assert m_hist.call_count == 0