diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index a074334c5..e29419102 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -3,7 +3,6 @@ Unit test file for rpc/external_message_consumer.py """ import asyncio import functools -import json import logging from datetime import datetime, timezone from unittest.mock import MagicMock @@ -220,10 +219,11 @@ async def test_emc_create_connection_invalid(default_conf, caplog, mocker): mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', MagicMock()) + test_producer = default_conf['external_message_consumer']['producers'][0] lock = asyncio.Lock() + dp = DataProvider(default_conf, None, None, None) emc = ExternalMessageConsumer(default_conf, dp) - test_producer = default_conf['external_message_consumer']['producers'][0] try: # Test invalid URL @@ -267,13 +267,7 @@ async def test_emc_create_connection_error(default_conf, caplog, mocker): emc.shutdown() -async def test_emc_receive_messages(default_conf, caplog, mocker): - """ - Test ExternalMessageConsumer._receive_messages - - Instantiates a patched ExternalMessageConsumer, creates a dummy websocket server, - and listens to the generated messages from the server for 1 second, then checks logs - """ +async def test_emc_receive_messages_valid(default_conf, caplog, mocker): default_conf.update({ "external_message_consumer": { "enabled": True, @@ -284,9 +278,9 @@ async def test_emc_receive_messages(default_conf, caplog, mocker): "ws_token": _TEST_WS_TOKEN } ], - "wait_timeout": 60, + "wait_timeout": 1, "ping_timeout": 60, - "sleep_timeout": 60 + "sleep_time": 60 } }) @@ -299,28 +293,161 @@ async def test_emc_receive_messages(default_conf, caplog, mocker): dp = DataProvider(default_conf, None, None, None) emc = ExternalMessageConsumer(default_conf, dp) - # Dummy generator - async def generate_messages(websocket): - try: - for i in range(3): - message = json.dumps({"type": "whitelist", "data": ["BTC/USDT"]}) - await websocket.send(message) - await asyncio.sleep(1) - except websockets.exceptions.ConnectionClosedOK: - return - loop = asyncio.get_event_loop() def change_running(emc): emc._running = not emc._running + class TestChannel: + async def recv(self, *args, **kwargs): + return {"type": "whitelist", "data": ["BTC/USDT"]} + + async def ping(self, *args, **kwargs): + return asyncio.Future() + try: - # Start the dummy websocket server - async with websockets.serve(generate_messages, _TEST_WS_HOST, _TEST_WS_PORT): - # Change running to True, and call change_running in 1 second - emc._running = True - loop.call_later(1, functools.partial(change_running, emc=emc)) - # Create the connection that receives messages - await emc._create_connection(test_producer, lock) + change_running(emc) + loop.call_soon(functools.partial(change_running, emc=emc)) + await emc._receive_messages(TestChannel(), test_producer, lock) assert log_has_re(r"Received message of type `whitelist`.+", caplog) finally: emc.shutdown() + + +async def test_emc_receive_messages_invalid(default_conf, caplog, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 1, + "ping_timeout": 60, + "sleep_time": 60 + } + }) + + mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', + MagicMock()) + + lock = asyncio.Lock() + test_producer = default_conf['external_message_consumer']['producers'][0] + + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + + loop = asyncio.get_event_loop() + def change_running(emc): emc._running = not emc._running + + class TestChannel: + async def recv(self, *args, **kwargs): + return {"type": ["BTC/USDT"]} + + async def ping(self, *args, **kwargs): + return asyncio.Future() + + try: + change_running(emc) + loop.call_soon(functools.partial(change_running, emc=emc)) + await emc._receive_messages(TestChannel(), test_producer, lock) + + assert log_has_re(r"Invalid message from.+", caplog) + finally: + emc.shutdown() + + +async def test_emc_receive_messages_timeout(default_conf, caplog, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 1, + "ping_timeout": 1, + "sleep_time": 1 + } + }) + + mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', + MagicMock()) + + lock = asyncio.Lock() + test_producer = default_conf['external_message_consumer']['producers'][0] + + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + + loop = asyncio.get_event_loop() + def change_running(emc): emc._running = not emc._running + + class TestChannel: + async def recv(self, *args, **kwargs): + await asyncio.sleep(10) + + async def ping(self, *args, **kwargs): + return asyncio.Future() + + try: + change_running(emc) + loop.call_soon(functools.partial(change_running, emc=emc)) + await emc._receive_messages(TestChannel(), test_producer, lock) + + assert log_has_re(r"Ping error.+", caplog) + finally: + emc.shutdown() + + +async def test_emc_receive_messages_handle_error(default_conf, caplog, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 1, + "ping_timeout": 1, + "sleep_time": 1 + } + }) + + mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', + MagicMock()) + + lock = asyncio.Lock() + test_producer = default_conf['external_message_consumer']['producers'][0] + + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + + emc.handle_producer_message = MagicMock(side_effect=Exception) + + loop = asyncio.get_event_loop() + def change_running(emc): emc._running = not emc._running + + class TestChannel: + async def recv(self, *args, **kwargs): + return {"type": "whitelist", "data": ["BTC/USDT"]} + + async def ping(self, *args, **kwargs): + return asyncio.Future() + + try: + change_running(emc) + loop.call_soon(functools.partial(change_running, emc=emc)) + await emc._receive_messages(TestChannel(), test_producer, lock) + + assert log_has_re(r"Error handling producer message.+", caplog) + finally: + emc.shutdown()