diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 0a11fbefe..4c32de8b5 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -127,26 +127,29 @@ class ExternalMessageConsumer: self._channel_streams = {} - if self._sub_tasks: - # Cancel sub tasks - for task in self._sub_tasks: - task.cancel() + asyncio.run_coroutine_threadsafe(self._shutdown_async(), loop=self._loop) - if self._main_task: - # Cancel the main task - self._main_task.cancel() - - self._thread.join() + self._thread.join(timeout=5) self._thread = None self._loop = None self._sub_tasks = None self._main_task = None + async def _shutdown_async(self): + """Cancel all tasks, let them finish, then stop the loop.""" + if self._sub_tasks: + for task in self._sub_tasks: + task.cancel() + await asyncio.gather(*self._sub_tasks, return_exceptions=True) + + if self._main_task: + self._main_task.cancel() + + self._loop.stop() + async def _main(self): - """ - The main task coroutine - """ + """The main task coroutine""" lock = asyncio.Lock() try: @@ -161,7 +164,8 @@ class ExternalMessageConsumer: pass finally: # Stop the loop once we are done - self._loop.stop() + if self._loop: + self._loop.stop() async def _handle_producer_connection(self, producer: Producer, lock: asyncio.Lock): """