From f4f8b910fe05b824b5bff20dbdabaf64a4377725 Mon Sep 17 00:00:00 2001 From: Matthias Date: Fri, 11 Nov 2022 06:46:14 +0100 Subject: [PATCH] Improve exchange_ws terminology --- freqtrade/exchange/exchange.py | 2 +- freqtrade/exchange/exchange_ws.py | 43 +++++++++++++++++-------------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index 9d1a2d797..64d048b1d 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -2249,7 +2249,7 @@ class Exchange: if self._exchange_ws: candle_date = int(timeframe_to_prev_date(timeframe).timestamp()) candles = self._exchange_ws.ccxt_object.ohlcvs.get(pair, {}).get(timeframe) - x = self._exchange_ws.pairs_last_refresh[(pair, timeframe, candle_type)] + x = self._exchange_ws.klines_last_refresh.get((pair, timeframe, candle_type)) logger.info(f"{candle_date < x}, {candle_date}, {x}") if candles and candles[-1][0] > min_date and candle_date < x: # Usable result ... diff --git a/freqtrade/exchange/exchange_ws.py b/freqtrade/exchange/exchange_ws.py index b5bbb663d..1b685b116 100644 --- a/freqtrade/exchange/exchange_ws.py +++ b/freqtrade/exchange/exchange_ws.py @@ -20,22 +20,22 @@ class ExchangeWS(): def __init__(self, config: Config, ccxt_object) -> None: self.config = config self.ccxt_object = ccxt_object - self._thread = Thread(name="ccxt_ws", target=self.start) + self._thread = Thread(name="ccxt_ws", target=self.__start_forever) self._background_tasks: Set[asyncio.Task] = set() - self._pairs_watching: Set[Tuple[str, str, CandleType]] = set() - self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set() - self.pairs_last_refresh: Dict[Tuple[str, str, CandleType], float] = {} - self.pairs_last_request: Dict[Tuple[str, str, CandleType], float] = {} + self._klines_watching: Set[Tuple[str, str, CandleType]] = set() + self._klines_scheduled: Set[Tuple[str, str, CandleType]] = set() + self.klines_last_refresh: Dict[Tuple[str, str, CandleType], float] = {} + self.klines_last_request: Dict[Tuple[str, str, CandleType], float] = {} self._thread.start() - def start(self) -> None: + def __start_forever(self) -> None: self._loop = asyncio.new_event_loop() self._loop.run_forever() def cleanup(self) -> None: logger.debug("Cleanup called - stopping") - self._pairs_watching.clear() + self._klines_watching.clear() self._loop.stop() self._thread.join() logger.debug("Stopped") @@ -45,19 +45,19 @@ class ExchangeWS(): Remove pairs from watchlist if they've not been requested within the last timeframe (+ offset) """ - for p in list(self._pairs_watching): + for p in list(self._klines_watching): _, timeframe, _ = p timeframe_s = timeframe_to_seconds(timeframe) - last_refresh = self.pairs_last_request.get(p, 0) + last_refresh = self.klines_last_request.get(p, 0) if last_refresh > 0 and time.time() - last_refresh > timeframe_s + 20: logger.info(f"Removing {p} from watchlist") - self._pairs_watching.discard(p) + self._klines_watching.discard(p) async def schedule_while_true(self) -> None: - for p in self._pairs_watching: - if p not in self._pairs_scheduled: - self._pairs_scheduled.add(p) + for p in self._klines_watching: + if p not in self._klines_scheduled: + self._klines_scheduled.add(p) pair, timeframe, candle_type = p task = asyncio.create_task( self.continuously_async_watch_ohlcv(pair, timeframe, candle_type)) @@ -73,21 +73,24 @@ class ExchangeWS(): async def continuously_async_watch_ohlcv( self, pair: str, timeframe: str, candle_type: CandleType) -> None: try: - while (pair, timeframe, candle_type) in self._pairs_watching: + while (pair, timeframe, candle_type) in self._klines_watching: start = time.time() data = await self.ccxt_object.watch_ohlcv(pair, timeframe) - self.pairs_last_refresh[(pair, timeframe, candle_type)] = time.time() + self.klines_last_refresh[(pair, timeframe, candle_type)] = time.time() # logger.info( # f"watch done {pair}, {timeframe}, data {len(data)} " # f"in {time.time() - start:.2f}s") except ccxt.BaseError: logger.exception("Exception in continuously_async_watch_ohlcv") finally: - self._pairs_watching.discard((pair, timeframe, candle_type)) + self._klines_watching.discard((pair, timeframe, candle_type)) def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None: - self._pairs_watching.add((pair, timeframe, candle_type)) - self.pairs_last_request[(pair, timeframe, candle_type)] = time.time() + """ + Schedule a pair/timeframe combination to be watched + """ + self._klines_watching.add((pair, timeframe, candle_type)) + self.klines_last_request[(pair, timeframe, candle_type)] = time.time() # asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop) asyncio.run_coroutine_threadsafe(self.schedule_while_true(), loop=self._loop) self.cleanup_expired() @@ -100,10 +103,10 @@ class ExchangeWS(): candles = self.ccxt_object.ohlcvs.get(pair, {}).get(timeframe) # Fake 1 candle - which is then removed again # TODO: is this really a good idea?? - refresh_time = int(self.pairs_last_refresh[(pair, timeframe, candle_type)] * 1000) + refresh_time = int(self.klines_last_refresh[(pair, timeframe, candle_type)] * 1000) candles.append([refresh_time, 0, 0, 0, 0, 0]) logger.info( f"watch result for {pair}, {timeframe} with length {len(candles)}, " f"{datetime.fromtimestamp(candles[-1][0] // 1000)}, " - f"lref={datetime.fromtimestamp(self.pairs_last_refresh[(pair, timeframe, candle_type)])}") + f"lref={datetime.fromtimestamp(self.klines_last_refresh[(pair, timeframe, candle_type)])}") return pair, timeframe, candle_type, candles