diff --git a/freqtrade/exchange/exchange_ws.py b/freqtrade/exchange/exchange_ws.py index b304247a3..bded6ec27 100644 --- a/freqtrade/exchange/exchange_ws.py +++ b/freqtrade/exchange/exchange_ws.py @@ -18,9 +18,11 @@ class ExchangeWS(): self.ccxt_object = ccxt_object self._thread = Thread(name="ccxt_ws", target=self.start) self._background_tasks = set() + self._pairs_watching: Set[Tuple[str, str, CandleType]] = set() self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set() self.pairs_last_refresh: Dict[Tuple[str, str, CandleType], int] = {} + self.pairs_last_request: Dict[Tuple[str, str, CandleType], int] = {} self._thread.start() def start(self) -> None: @@ -34,39 +36,19 @@ class ExchangeWS(): self._thread.join() logger.debug("Stopped") -# One task per Watch - # async def schedule_schedule(self) -> None: - - # for p in self._pairs_watching: - # if p not in self._pairs_scheduled: - # self._pairs_scheduled.add(p) - # await self.schedule_one_task(p[0], p[1], p[2]) - - # async def schedule_one_task(self, pair: str, timeframe: str, candle_type: CandleType) -> None: - # task = asyncio.create_task(self._async_watch_ohlcv(pair, timeframe, candle_type)) - - # # Add task to the set. This creates a strong reference. - # self._background_tasks.add(task) - # task.add_done_callback(self.reschedule_or_stop) - - # async def _async_watch_ohlcv(self, pair: str, timeframe: str, - # candle_type: CandleType) -> Tuple[str, str, str, List]: - # start = time.time() - # data = await self.ccxt_object.watch_ohlcv(pair, timeframe, ) - # logger.info(f"watch done {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s") - # return pair, timeframe, candle_type, data - - # def reschedule_or_stop(self, task: asyncio.Task): - # # logger.info(f"Task finished {task}") - - # self._background_tasks.discard(task) - # pair, timeframe, candle_type, data = task.result() - - # # reschedule - # asyncio.run_coroutine_threadsafe(self.schedule_one_task( - # pair, timeframe, candle_type), loop=self._loop) - -# End one task epr watch + def cleanup_expired(self) -> None: + """ + Remove pairs from watchlist if they've not been requested within + the last timeframe (+ offset) + """ + from freqtrade.exchange.exchange import timeframe_to_seconds + for p in list(self._pairs_watching): + _, timeframe, _ = p + timeframe_s = timeframe_to_seconds(timeframe) + last_refresh = self.pairs_last_request.get(p, 0) + if last_refresh > 0 and time.time() - last_refresh > timeframe_s + 20: + logger.info(f"Removing {p} from watchlist") + self._pairs_watching.discard(p) async def schedule_while_true(self) -> None: @@ -99,6 +81,7 @@ class ExchangeWS(): def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None: self._pairs_watching.add((pair, timeframe, candle_type)) + self.pairs_last_request[(pair, timeframe, candle_type)] = time.time() # asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop) asyncio.run_coroutine_threadsafe(self.schedule_while_true(), loop=self._loop) - + self.cleanup_expired()