diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 15312ba10..743698b24 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -224,20 +224,21 @@ class ExternalMessageConsumer: websockets.exceptions.InvalidMessage ) as e: logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + continue except ( websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK ): # Just keep trying to connect again indefinitely - pass + await asyncio.sleep(self.sleep_time) + continue except Exception as e: # An unforseen error has occurred, log and continue logger.error("Unexpected error has occurred:") logger.exception(e) - - finally: await asyncio.sleep(self.sleep_time) continue diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index 93ae829d5..155239e94 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -94,7 +94,7 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): assert log_has( f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`", caplog) - # Test handle analyzed_df message + # Test handle analyzed_df single candle message df_message = { "type": "analyzed_df", "data": { @@ -106,8 +106,7 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): patched_emc.handle_producer_message(test_producer, df_message) assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) - assert log_has( - f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`", caplog) + assert log_has_re(r"Holes in data or no existing df,.+", caplog) # Test unhandled message unhandled_message = {"type": "status", "data": "RUNNING"} @@ -183,7 +182,7 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker): async with websockets.serve(eat, _TEST_WS_HOST, _TEST_WS_PORT): await emc._create_connection(test_producer, lock) - assert log_has_re(r"Producer connection success.+", caplog) + assert log_has_re(r"Connected to channel.+", caplog) finally: emc.shutdown() @@ -212,7 +211,8 @@ async def test_emc_create_connection_invalid_url(default_conf, caplog, mocker, h dp = DataProvider(default_conf, None, None, None) # Handle start explicitly to avoid messing with threading in tests - mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start",) + mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start") + mocker.patch("freqtrade.rpc.api_server.ws.channel.create_channel") emc = ExternalMessageConsumer(default_conf, dp) try: @@ -390,7 +390,9 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker): try: change_running(emc) loop.call_soon(functools.partial(change_running, emc=emc)) - await emc._receive_messages(TestChannel(), test_producer, lock) + + with pytest.raises(asyncio.TimeoutError): + await emc._receive_messages(TestChannel(), test_producer, lock) assert log_has_re(r"Ping error.+", caplog) finally: