diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 91875abfb..7cee95e6d 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -51,12 +51,18 @@ class WebSocketChannel: def remote_addr(self): return self._websocket.remote_addr - async def send(self, data): + async def _send(self, data): """ Send data on the wrapped websocket """ await self._wrapped_ws.send(data) + async def send(self, data): + """ + Add the data to the queue to be sent + """ + self.queue.put_nowait(data) + async def recv(self): """ Receive data on the wrapped websocket @@ -107,7 +113,7 @@ class WebSocketChannel: while True: message = await self.queue.get() try: - await self.send(message) + await self._send(message) self.queue.task_done() except RuntimeError: # The connection was closed, just exit the task @@ -175,7 +181,7 @@ class ChannelManager: for websocket, channel in self.channels.copy().items(): if channel.subscribed_to(message_type): if not channel.queue.full(): - channel.queue.put_nowait(data) + await channel.send(data) else: logger.info(f"Channel {channel} is too far behind, disconnecting") await self.on_disconnect(websocket) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index a182ddc57..88ade185e 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -174,6 +174,7 @@ class ExternalMessageConsumer: :param producer: Dictionary containing producer info :param lock: An asyncio Lock """ + channel = None while self._running: try: host, port = producer['host'], producer['port'] @@ -224,6 +225,10 @@ class ExternalMessageConsumer: logger.exception(e) continue + finally: + if channel: + await channel.close() + async def _receive_messages( self, channel: WebSocketChannel,