diff --git a/tests/exchange/test_exchange_ws.py b/tests/exchange/test_exchange_ws.py index 371021609..cde15a2c3 100644 --- a/tests/exchange/test_exchange_ws.py +++ b/tests/exchange/test_exchange_ws.py @@ -69,14 +69,30 @@ async def test_exchangews_ohlcv(mocker, time_machine, caplog): ccxt_object = MagicMock() caplog.set_level(logging.DEBUG) - async def sleeper(*args, **kwargs): - # pass - await asyncio.sleep(0.12) + # Create synchronization events for deterministic testing + watch_call_event = asyncio.Event() + watch_call_count = 0 + + async def controlled_sleeper(*args, **kwargs): + """Controlled async function that signals when called.""" + nonlocal watch_call_count + watch_call_count += 1 + # Signal that a watch call happened + watch_call_event.set() + await asyncio.sleep(0.01) # Minimal delay for realism return MagicMock() - ccxt_object.un_watch_ohlcv_for_symbols = AsyncMock(side_effect=NotSupported) + async def wait_for_condition(condition_func, timeout=5.0, check_interval=0.01): + """Wait for a condition to be true with timeout.""" + start_time = asyncio.get_event_loop().time() + while asyncio.get_event_loop().time() - start_time < timeout: + if condition_func(): + return True + await asyncio.sleep(check_interval) + return False - ccxt_object.watch_ohlcv = AsyncMock(side_effect=sleeper) + ccxt_object.un_watch_ohlcv_for_symbols = AsyncMock(side_effect=NotSupported) + ccxt_object.watch_ohlcv = AsyncMock(side_effect=controlled_sleeper) ccxt_object.close = AsyncMock() time_machine.move_to("2024-11-01 01:00:02 +00:00") @@ -90,7 +106,14 @@ async def test_exchangews_ohlcv(mocker, time_machine, caplog): exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT) exchange_ws.schedule_ohlcv("XRP/BTC", "1m", CandleType.SPOT) - await asyncio.sleep(0.2) + + # Wait for both pairs to be properly scheduled and watching + await wait_for_condition( + lambda: ( + len(exchange_ws._klines_watching) == 2 and len(exchange_ws._klines_scheduled) == 2 + ), + timeout=2.0, + ) assert exchange_ws._klines_watching == { ("ETH/BTC", "1m", CandleType.SPOT), @@ -100,22 +123,31 @@ async def test_exchangews_ohlcv(mocker, time_machine, caplog): ("ETH/BTC", "1m", CandleType.SPOT), ("XRP/BTC", "1m", CandleType.SPOT), } - await asyncio.sleep(0.1) + + # Wait for the expected number of watch calls (should be 6 based on original test logic) + await wait_for_condition(lambda: ccxt_object.watch_ohlcv.call_count >= 6, timeout=3.0) assert ccxt_object.watch_ohlcv.call_count == 6 ccxt_object.watch_ohlcv.reset_mock() time_machine.shift(timedelta(minutes=5)) exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT) - await asyncio.sleep(1) - assert log_has_re("un_watch_ohlcv_for_symbols not supported: ", caplog) - # XRP/BTC should be cleaned up. - assert exchange_ws._klines_watching == { - ("ETH/BTC", "1m", CandleType.SPOT), - } + + # Wait for log message and state changes with timeout + await wait_for_condition( + lambda: log_has_re("un_watch_ohlcv_for_symbols not supported: ", caplog), timeout=2.0 + ) + + # Wait for XRP/BTC cleanup + await wait_for_condition( + lambda: exchange_ws._klines_watching == {("ETH/BTC", "1m", CandleType.SPOT)}, + timeout=2.0, + ) # Cleanup happened. ccxt_object.un_watch_ohlcv_for_symbols = AsyncMock(side_effect=ValueError) exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT) + + # Verify final state assert exchange_ws._klines_watching == { ("ETH/BTC", "1m", CandleType.SPOT), }