From c04cce52eaf9b1e88ff461d54fc4f956c0a71471 Mon Sep 17 00:00:00 2001
From: Joe Schr <8218910+TheJoeSchr@users.noreply.github.com>
Date: Tue, 6 Feb 2024 20:13:23 +0100
Subject: [PATCH] Fix unnecessary deep indent

---
 freqtrade/exchange/exchange.py | 355 +++++++++++++++++----------------
 1 file changed, 181 insertions(+), 174 deletions(-)

diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py
index 8c6669fe2..b958d2f6b 100644
--- a/freqtrade/exchange/exchange.py
+++ b/freqtrade/exchange/exchange.py
@@ -1981,53 +1981,54 @@ class Exchange:
         return data
 
     async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
-                                        since_ms: int, candle_type: CandleType,
-                                        is_new_pair: bool = False, raise_: bool = False,
-                                        until_ms: Optional[int] = None
-                                        ) -> OHLCVResponse:
-            """
-            Download historic ohlcv
-            :param is_new_pair: used by binance subclass to allow "fast" new pair downloading
-            :param candle_type: Any of the enum CandleType (must match trading mode!)
-            """
-
-            one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
-                timeframe, candle_type, since_ms)
-            logger.debug(
-                "one_call: %s msecs (%s)",
-                one_call,
-                dt_humanize(dt_now() - timedelta(milliseconds=one_call), only_distance=True)
-            )
-            input_coroutines = [self._async_get_candle_history(
-                pair, timeframe, candle_type, since) for since in
-                range(since_ms, until_ms or dt_ts(), one_call)]
-
-            data: List = []
-            # Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
-            for input_coro in chunks(input_coroutines, 100):
-
-                results = await asyncio.gather(*input_coro, return_exceptions=True)
-                for res in results:
-                    if isinstance(res, Exception):
-                        logger.warning(f"Async code raised an exception: {repr(res)}")
-                        if raise_:
-                            raise
-                        continue
-                    else:
-                        # Deconstruct tuple if it's not an exception
-                        p, _, c, new_data, _ = res
-                        if p == pair and c == candle_type:
-                            data.extend(new_data)
-            # Sort data again after extending the result - above calls return in "async order"
-            data = sorted(data, key=lambda x: x[0])
-            return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
-
-
-    async def _async_get_historic_trades(self, pair: str, timeframe: str,
                                         since_ms: int, candle_type: CandleType,
                                         is_new_pair: bool = False, raise_: bool = False,
                                         until_ms: Optional[int] = None
-                                        ) -> Ticker:
+                                        ) -> OHLCVResponse:
+        """
+        Download historic ohlcv
+        :param is_new_pair: used by binance subclass to allow "fast" new pair downloading
+        :param candle_type: Any of the enum CandleType (must match trading mode!)
+        """
+
+        one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
+            timeframe, candle_type, since_ms)
+        logger.debug(
+            "one_call: %s msecs (%s)",
+            one_call,
+            dt_humanize(dt_now() - timedelta(milliseconds=one_call),
+                        only_distance=True)
+        )
+        input_coroutines = [self._async_get_candle_history(
+            pair, timeframe, candle_type, since) for since in
+            range(since_ms, until_ms or dt_ts(), one_call)]
+
+        data: List = []
+        # Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling
+        for input_coro in chunks(input_coroutines, 100):
+
+            results = await asyncio.gather(*input_coro, return_exceptions=True)
+            for res in results:
+                if isinstance(res, Exception):
+                    logger.warning(
+                        f"Async code raised an exception: {repr(res)}")
+                    if raise_:
+                        raise
+                    continue
+                else:
+                    # Deconstruct tuple if it's not an exception
+                    p, _, c, new_data, _ = res
+                    if p == pair and c == candle_type:
+                        data.extend(new_data)
+        # Sort data again after extending the result - above calls return in "async order"
+        data = sorted(data, key=lambda x: x[0])
+        return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
+
+    async def _async_get_historic_trades(self, pair: str, timeframe: str,
+                                         since_ms: int, candle_type: CandleType,
+                                         is_new_pair: bool = False, raise_: bool = False,
+                                         until_ms: Optional[int] = None
+                                         ) -> Ticker:
         """
         Download historic trades
         :param is_new_pair: used by binance subclass to allow "fast" new pair downloading
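The pagination in the hunk above hinges on `one_call`: the candle duration in milliseconds times the per-call candle limit gives the span of history one exchange request can cover, and `range(since_ms, until_ms or dt_ts(), one_call)` then yields one request start per window. A minimal standalone sketch of that windowing, assuming a 5m timeframe and a hypothetical 1000-candle per-call limit (the constants and helper below are illustrative, not freqtrade's API):

    # Sketch: slice a historic range into per-request windows.
    from datetime import datetime, timezone

    TIMEFRAME_MS = 5 * 60 * 1000            # 5m candles, in milliseconds
    CANDLE_LIMIT = 1000                     # assumed per-call exchange limit
    one_call = TIMEFRAME_MS * CANDLE_LIMIT  # history covered by one request

    def window_starts(since_ms: int, until_ms: int) -> list:
        # Mirrors range(since_ms, until_ms, one_call) in the patch above.
        return list(range(since_ms, until_ms, one_call))

    since = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp() * 1000)
    until = int(datetime(2024, 2, 1, tzinfo=timezone.utc).timestamp() * 1000)
    print(len(window_starts(since, until)), "requests")  # 9 requests for 31 days of 5m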
@@ -2060,41 +2061,41 @@ class Exchange:
                     # Deconstruct tuple if it's not an exception
                     p, _, c, new_data, _ = res
                     if p == pair and c == candle_type:
-                            data.extend(new_data)
+                        data.extend(new_data)
         # Sort data again after extending the result - above calls return in "async order"
-        data = sorted(data, key=lambda x: x['timestamp'])# TODO: sort via 'timestamp' or 'id'?
+        data = sorted(data, key=lambda x: x['timestamp'])  # TODO: sort via 'timestamp' or 'id'?
         return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
 
     def _build_coroutine_get_ohlcv(
-            self, pair: str, timeframe: str, candle_type: CandleType,
-            since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]:
-            not_all_data = cache and self.required_candle_call_count > 1
-            if cache and (pair, timeframe, candle_type) in self._klines:
-                candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
-                min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp()
-                # Check if 1 call can get us updated candles without hole in the data.
-                if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0):
-                    # Cache can be used - do one-off call.
-                    not_all_data = False
-                else:
-                    # Time jump detected, evict cache
-                    logger.info(
-                        f"Time jump detected. Evicting ohlcv cache for {pair}, {timeframe}, {candle_type}")
-                    del self._klines[(pair, timeframe, candle_type)]
-
-            if (not since_ms and (self._ft_has["ohlcv_require_since"] or not_all_data)):
-                # Multiple calls for one pair - to get more history
-                since_ms = self.needed_candle_ms(timeframe,candle_type)
-
-            # TODO: fetch_trades and return as results
-            if since_ms:
-                return self._async_get_historic_ohlcv(
-                    pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type)
-            else:
-                # One call ... "regular" refresh
-                return self._async_get_candle_history(
-                    pair, timeframe, since_ms=since_ms, candle_type=candle_type)
+            self, pair: str, timeframe: str, candle_type: CandleType,
+            since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]:
+        not_all_data = cache and self.required_candle_call_count > 1
+        if cache and (pair, timeframe, candle_type) in self._klines:
+            candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
+            min_date = date_minus_candles(
+                timeframe, candle_limit - 5).timestamp()
+            # Check if 1 call can get us updated candles without hole in the data.
+            if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0):
+                # Cache can be used - do one-off call.
+                not_all_data = False
+            else:
+                # Time jump detected, evict cache
+                logger.info(
+                    f"Time jump detected. Evicting ohlcv cache for {pair}, {timeframe}, {candle_type}")
+                del self._klines[(pair, timeframe, candle_type)]
+
+        if (not since_ms and (self._ft_has["ohlcv_require_since"] or not_all_data)):
+            # Multiple calls for one pair - to get more history
+            since_ms = self.needed_candle_ms(timeframe, candle_type)
+
+        # TODO: fetch_trades and return as results
+        if since_ms:
+            return self._async_get_historic_ohlcv(
+                pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type)
+        else:
+            # One call ... "regular" refresh
+            return self._async_get_candle_history(
+                pair, timeframe, since_ms=since_ms, candle_type=candle_type)
 
     def _build_coroutine_get_trades(
             self, pair: str, timeframe: str, candle_type: CandleType,
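Both `_async_get_historic_ohlcv` and `_async_get_historic_trades` rely on the same batching pattern: split the coroutines into groups of 100 and `asyncio.gather(..., return_exceptions=True)` each group, so one failed request cannot abort the whole download. A self-contained sketch of that pattern (the `chunks` helper is reimplemented here for illustration, and `fake_fetch` is a stand-in for `_async_get_candle_history`):

    import asyncio
    from itertools import islice

    def chunks(items, n):
        # Yield successive n-sized batches (freqtrade ships its own helper of this name).
        it = iter(items)
        while batch := list(islice(it, n)):
            yield batch

    async def fake_fetch(i: int) -> int:  # stand-in for one exchange request
        if i == 3:
            raise ValueError(f"boom {i}")
        return i * 2

    async def main() -> None:
        coros = [fake_fetch(i) for i in range(250)]
        data = []
        for batch in chunks(coros, 100):
            results = await asyncio.gather(*batch, return_exceptions=True)
            for res in results:
                if isinstance(res, Exception):
                    print(f"Async code raised an exception: {res!r}")
                    continue
                data.append(res)
        print(len(data), "results")  # 249: the failing task was skipped, not fatal

    asyncio.run(main())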
@@ -2129,38 +2130,36 @@ class Exchange:
             return self._async_get_trades_history(
                 pair, timeframe, since_ms=since_ms, candle_type=candle_type)
 
     def _build_ohlcv_dl_jobs(
-            self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
-            cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
-            """
-            Build Coroutines to execute as part of refresh_latest_ohlcv
-            """
-            input_coroutines: List[Coroutine[Any, Any, Ticker]] = []
-            cached_pairs = []
-            for pair, timeframe, candle_type in set(pair_list):
-                if (timeframe not in self.timeframes
-                        and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
-                    logger.warning(
-                        f"Cannot download ({pair}, {timeframe}) combination as this timeframe is "
-                        f"not available on {self.name}. Available timeframes are "
-                        f"{', '.join(self.timeframes)}.")
-                    continue
-
-                if ((pair, timeframe, candle_type) not in self._klines or not cache
-                        or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
-
-                    input_coroutines.append(
-                        self._build_coroutine_get_ohlcv(pair, timeframe, candle_type, since_ms, cache))
-
-                else:
-                    logger.debug(
-                        f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..."
-                    )
-                    cached_pairs.append((pair, timeframe, candle_type))
-
-            return input_coroutines, cached_pairs
+            self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
+            cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
+        """
+        Build Coroutines to execute as part of refresh_latest_ohlcv
+        """
+        input_coroutines: List[Coroutine[Any, Any, Ticker]] = []
+        cached_pairs = []
+        for pair, timeframe, candle_type in set(pair_list):
+            if (timeframe not in self.timeframes
+                    and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
+                logger.warning(
+                    f"Cannot download ({pair}, {timeframe}) combination as this timeframe is "
+                    f"not available on {self.name}. Available timeframes are "
+                    f"{', '.join(self.timeframes)}.")
+                continue
+
+            if ((pair, timeframe, candle_type) not in self._klines or not cache
+                    or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
+
+                input_coroutines.append(
+                    self._build_coroutine_get_ohlcv(pair, timeframe, candle_type, since_ms, cache))
+
+            else:
+                logger.debug(
+                    f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..."
+                )
+                cached_pairs.append((pair, timeframe, candle_type))
+        return input_coroutines, cached_pairs
 
     def _build_trades_dl_jobs(
             self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
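`_build_ohlcv_dl_jobs` schedules a download only when there is no cached frame, caching is disabled, or the pair is due for a refresh; everything else is answered from `_klines`. The same three-way condition reduced to a pure function (the names here are illustrative, not freqtrade's API):

    from typing import Dict, Tuple

    PairKey = Tuple[str, str, str]  # (pair, timeframe, candle_type)

    def needs_download(key: PairKey, klines: Dict[PairKey, object],
                       cache: bool, due_for_refresh: bool) -> bool:
        # True when no cached frame exists, caching is off, or the data is stale.
        return key not in klines or not cache or due_for_refresh

    cached = {("BTC/USDT", "5m", "spot"): object()}
    print(needs_download(("BTC/USDT", "5m", "spot"), cached, True, False))  # False
    print(needs_download(("ETH/USDT", "5m", "spot"), cached, True, False))  # True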
@@ -2209,116 +2208,124 @@ class Exchange:
         return input_coroutines, cached_pairs
 
     def _process_ohlcv_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
-                          cache: bool, drop_incomplete: bool) -> DataFrame:
-            # keeping last candle time as last refreshed time of the pair
-            if ticks and cache:
-                self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000
-            # keeping parsed dataframe in cache
-            ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
-                                          drop_incomplete=drop_incomplete)
-            if cache:
-                if (pair, timeframe, c_type) in self._klines:
-                    old = self._klines[(pair, timeframe, c_type)]
-                    # Reassign so we return the updated, combined df
-                    ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair,
-                                                     fill_missing=True, drop_incomplete=False)
-                    candle_limit = self.ohlcv_candle_limit(timeframe, self._config['candle_type_def'])
-                    # Age out old candles
-                    ohlcv_df = ohlcv_df.tail(candle_limit + self._startup_candle_count)
-                    ohlcv_df = ohlcv_df.reset_index(drop=True)
-                    self._klines[(pair, timeframe, c_type)] = ohlcv_df
-                else:
-                    self._klines[(pair, timeframe, c_type)] = ohlcv_df
-            return ohlcv_df
+                          cache: bool, drop_incomplete: bool) -> DataFrame:
+        # keeping last candle time as last refreshed time of the pair
+        if ticks and cache:
+            self._pairs_last_refresh_time[(
+                pair, timeframe, c_type)] = ticks[-1][0] // 1000
+        # keeping parsed dataframe in cache
+        ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
+                                      drop_incomplete=drop_incomplete)
+        if cache:
+            if (pair, timeframe, c_type) in self._klines:
+                old = self._klines[(pair, timeframe, c_type)]
+                # Reassign so we return the updated, combined df
+                ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair,
+                                                 fill_missing=True, drop_incomplete=False)
+                candle_limit = self.ohlcv_candle_limit(
+                    timeframe, self._config['candle_type_def'])
+                # Age out old candles
+                ohlcv_df = ohlcv_df.tail(
+                    candle_limit + self._startup_candle_count)
+                ohlcv_df = ohlcv_df.reset_index(drop=True)
+                self._klines[(pair, timeframe, c_type)] = ohlcv_df
+            else:
+                self._klines[(pair, timeframe, c_type)] = ohlcv_df
+        return ohlcv_df
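The caching branch of `_process_ohlcv_df` concatenates the cached frame with the fresh candles and then trims to `candle_limit + startup_candle_count` rows, so the in-memory frame never grows without bound. A plain-pandas sketch of that merge-and-trim (freqtrade's `clean_ohlcv_dataframe` additionally fills gaps; the tiny limits below are assumptions for illustration):

    import pandas as pd

    candle_limit = 5          # assumed exchange limit (tiny, for illustration)
    startup_candle_count = 2  # assumed strategy startup requirement

    old = pd.DataFrame({"date": range(0, 10), "close": range(0, 10)})
    new = pd.DataFrame({"date": range(8, 14), "close": range(8, 14)})

    merged = (
        pd.concat([old, new], axis=0)
        .drop_duplicates(subset="date", keep="last")  # overlap: keep the newest row
        .tail(candle_limit + startup_candle_count)    # age out old candles
        .reset_index(drop=True)
    )
    print(merged)  # 7 rows, dates 7..13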
 
     def _process_trades_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
-                           cache: bool, drop_incomplete: bool, first_required_candle_date:Optional[int]) -> DataFrame:
+                           cache: bool, drop_incomplete: bool,
+                           first_required_candle_date: Optional[int]) -> DataFrame:
         # keeping parsed dataframe in cache
         # TODO: pass last_full_candle_date to drop as incomplete
         trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False,
-                                                   drop_incomplete=drop_incomplete)
+                                               drop_incomplete=drop_incomplete)
         # keeping last candle time as last refreshed time of the pair
         if ticks and cache:
             idx = -2 if drop_incomplete and len(ticks) > 1 else -1
-            self._trades_last_refresh_time[(pair, timeframe, c_type)] = trades_df['timestamp'].iat[idx] // 1000 # NOTE: // is floor: divides and rounds to nearest int
+            self._trades_last_refresh_time[(pair, timeframe, c_type)] = trades_df['timestamp'].iat[idx] // 1000  # NOTE: // is floor division: divides and rounds down
         if cache:
             if (pair, timeframe, c_type) in self._trades:
                 old = self._trades[(pair, timeframe, c_type)]
                 # Reassign so we return the updated, combined df
-                trades_df = clean_duplicate_trades(concat([old, trades_df], axis=0), timeframe, pair, fill_missing=False, drop_incomplete=False)
+                trades_df = clean_duplicate_trades(concat(
+                    [old, trades_df], axis=0), timeframe, pair, fill_missing=False, drop_incomplete=False)
                 # warn_of_tick_duplicates(trades_df, pair)
                 # Age out old candles
                 if first_required_candle_date:
                     # slice of older dates
-                    trades_df = trades_df[first_required_candle_date < trades_df['timestamp']]
+                    trades_df = trades_df[first_required_candle_date <
+                                          trades_df['timestamp']]
                 trades_df = trades_df.reset_index(drop=True)
                 self._trades[(pair, timeframe, c_type)] = trades_df
         return trades_df
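Two details of `_process_trades_df` worth spelling out: the refresh time is taken from the millisecond `timestamp` column and floored to whole seconds with `//`, and the age-out is a boolean-mask slice that keeps only trades newer than the cutoff. A minimal sketch (the concrete timestamps and cutoff are made-up values):

    import pandas as pd

    trades = pd.DataFrame({"timestamp": [1_700_000_000_123, 1_700_000_500_456,
                                         1_700_001_000_789]})  # milliseconds
    first_required_candle_date = 1_700_000_400_000  # assumed cutoff, in ms

    kept = trades[first_required_candle_date < trades["timestamp"]].reset_index(drop=True)
    print(len(kept))                            # 2 rows survive the cutoff
    print(trades["timestamp"].iat[-1] // 1000)  # 1700001000: floored down to seconds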
 
     def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
-                             since_ms: Optional[int] = None, cache: bool = True,
-                             drop_incomplete: Optional[bool] = None
-                             ) -> Dict[PairWithTimeframe, DataFrame]:
-            """
-            Refresh in-memory OHLCV asynchronously and set `_klines` with the result
-            Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
-            Only used in the dataprovider.refresh() method.
-            :param pair_list: List of 2 element tuples containing pair, interval to refresh
-            :param since_ms: time since when to download, in milliseconds
-            :param cache: Assign result to _trades. Usefull for one-off downloads like for pairlists
-            :param drop_incomplete: Control candle dropping.
-                Specifying None defaults to _ohlcv_partial_candle
-            :return: Dict of [{(pair, timeframe): Dataframe}]
-            """
-            logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
-
-            # Gather coroutines to run
-            input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
-
-            results_df = {}
-            # Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
-            for input_coro in chunks(input_coroutines, 100):
-
-                async def gather_stuff():
-                    return await asyncio.gather(*input_coro, return_exceptions=True)
-
-                with self._loop_lock:
-                    results = self.loop.run_until_complete(gather_stuff())
-
-                for res in results:
-                    if isinstance(res, Exception):
-                        logger.warning(f"Async code raised an exception: {repr(res)}")
-                        continue
-                    # Deconstruct tuple (has 5 elements)
-                    pair, timeframe, c_type, ticks, drop_hint = res
-                    drop_incomplete = drop_hint if drop_incomplete is None else drop_incomplete
-                    # TODO: here ohlcv candles get saved into self._trades
-                    ohlcv_df = self._process_ohlcv_df(
-                        pair, timeframe, c_type, ticks, cache, drop_incomplete)
-
-                    results_df[(pair, timeframe, c_type)] = ohlcv_df
-
-            # Return cached trades
-            for pair, timeframe, c_type in cached_pairs:
-                results_df[(pair, timeframe, c_type)] = self.klines(
-                    (pair, timeframe, c_type),
-                    copy=False
-                )
-
-            return results_df
+                             since_ms: Optional[int] = None, cache: bool = True,
+                             drop_incomplete: Optional[bool] = None
+                             ) -> Dict[PairWithTimeframe, DataFrame]:
+        """
+        Refresh in-memory OHLCV asynchronously and set `_klines` with the result
+        Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
+        Only used in the dataprovider.refresh() method.
+        :param pair_list: List of 2 element tuples containing pair, interval to refresh
+        :param since_ms: time since when to download, in milliseconds
+        :param cache: Assign result to _klines. Useful for one-off downloads like for pairlists
+        :param drop_incomplete: Control candle dropping.
+            Specifying None defaults to _ohlcv_partial_candle
+        :return: Dict of [{(pair, timeframe): Dataframe}]
+        """
+        logger.debug(
+            "Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
+
+        # Gather coroutines to run
+        input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(
+            pair_list, since_ms, cache)
+
+        results_df = {}
+        # Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling
+        for input_coro in chunks(input_coroutines, 100):
+
+            async def gather_stuff():
+                return await asyncio.gather(*input_coro, return_exceptions=True)
+
+            with self._loop_lock:
+                results = self.loop.run_until_complete(gather_stuff())
+
+            for res in results:
+                if isinstance(res, Exception):
+                    logger.warning(
+                        f"Async code raised an exception: {repr(res)}")
+                    continue
+                # Deconstruct tuple (has 5 elements)
+                pair, timeframe, c_type, ticks, drop_hint = res
+                drop_incomplete = drop_hint if drop_incomplete is None else drop_incomplete
+                # TODO: here ohlcv candles get saved into self._trades
+                ohlcv_df = self._process_ohlcv_df(
+                    pair, timeframe, c_type, ticks, cache, drop_incomplete)

+                results_df[(pair, timeframe, c_type)] = ohlcv_df
+
+        # Return cached klines
+        for pair, timeframe, c_type in cached_pairs:
+            results_df[(pair, timeframe, c_type)] = self.klines(
+                (pair, timeframe, c_type),
+                copy=False
+            )
+
+        return results_df
 
-    def needed_candle_ms(self, timeframe:str, candle_type:CandleType):
+    def needed_candle_ms(self, timeframe: str, candle_type: CandleType):
         one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
             timeframe, candle_type)
         move_to = one_call * self.required_candle_call_count
         now = timeframe_to_next_date(timeframe)
         return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
 
-    def refresh_latest_trades(self,
-                              pair_list: ListPairsWithTimeframes ,
-                              data_handler: Callable,# using IDataHandler ends with circular import,
-                              *,
-                              cache: bool = True,
+    def refresh_latest_trades(self,
+                              pair_list: ListPairsWithTimeframes,
+                              data_handler: Callable,  # using IDataHandler ends in a circular import
+                              *,
+                              cache: bool = True,
                               ) -> Dict[PairWithTimeframe, DataFrame]:
         """
         Refresh in-memory TRADES asynchronously and set `_trades` with the result
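For reference, `needed_candle_ms` walks back `required_candle_call_count` full request windows from the next candle boundary, which yields the earliest `since_ms` the chained calls must reach. A standalone sketch of the same arithmetic (the 5m timeframe, 1000-candle limit, and call count of 2 are assumed values; `next_candle` stands in for `timeframe_to_next_date`):

    from datetime import datetime, timedelta, timezone

    timeframe_ms = 5 * 60 * 1000    # 5m
    candle_limit = 1000             # assumed ohlcv_candle_limit
    required_candle_call_count = 2  # assumed config value

    one_call = timeframe_ms * candle_limit           # ms covered by one request
    move_to = one_call * required_candle_call_count  # total history to reach back

    def next_candle(ts: datetime) -> datetime:
        # Round up to the next 5m boundary (stand-in for timeframe_to_next_date).
        epoch_ms = int(ts.timestamp() * 1000)
        next_ms = ((epoch_ms // timeframe_ms) + 1) * timeframe_ms
        return datetime.fromtimestamp(next_ms / 1000, tz=timezone.utc)

    now = next_candle(datetime.now(timezone.utc))
    since_ms = int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
    print(since_ms)  # earliest timestamp two chained 1000-candle calls must cover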