diff --git a/freqtrade/data/history/history_utils.py b/freqtrade/data/history/history_utils.py
index 9fd254057..1408d47ef 100644
--- a/freqtrade/data/history/history_utils.py
+++ b/freqtrade/data/history/history_utils.py
@@ -6,7 +6,14 @@ from pathlib import Path
 from pandas import DataFrame, concat
 
 from freqtrade.configuration import TimeRange
-from freqtrade.constants import DATETIME_PRINT_FORMAT, DL_DATA_TIMEFRAMES, DOCS_LINK, Config
+from freqtrade.constants import (
+    DATETIME_PRINT_FORMAT,
+    DL_DATA_TIMEFRAMES,
+    DOCS_LINK,
+    Config,
+    ListPairsWithTimeframes,
+    PairWithTimeframe,
+)
 from freqtrade.data.converter import (
     clean_ohlcv_dataframe,
     convert_trades_to_ohlcv,
@@ -17,6 +24,7 @@ from freqtrade.data.history.datahandlers import IDataHandler, get_datahandler
 from freqtrade.enums import CandleType, TradingMode
 from freqtrade.exceptions import OperationalException
 from freqtrade.exchange import Exchange
+from freqtrade.exchange.exchange_utils import date_minus_candles
 from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist
 from freqtrade.util import dt_now, dt_ts, format_ms_time, format_ms_time_det
 from freqtrade.util.migrations import migrate_data
@@ -226,6 +234,7 @@ def _download_pair_history(
     candle_type: CandleType,
     erase: bool = False,
     prepend: bool = False,
+    pair_candles: DataFrame | None = None,
 ) -> bool:
     """
     Download latest candles from the exchange for the pair and timeframe passed in parameters
@@ -271,21 +280,40 @@ def _download_pair_history(
             "Current End: %s",
             f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None",
         )
-
-        # Default since_ms to 30 days if nothing is given
-        new_dataframe = exchange.get_historic_ohlcv(
-            pair=pair,
-            timeframe=timeframe,
-            since_ms=(
-                since_ms
-                if since_ms
-                else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
-            ),
-            is_new_pair=data.empty,
-            candle_type=candle_type,
-            until_ms=until_ms if until_ms else None,
+        # Check whether the passed-in pair_candles reach back at least to since_ms.
+        # If they don't, we need more data and fall back to the regular download method.
+        pair_candles_since_ms = (
+            dt_ts(pair_candles.iloc[0]["date"])
+            if pair_candles is not None and len(pair_candles.index) > 0
+            else 0
         )
-        logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
+        if (
+            pair_candles is None
+            or len(pair_candles.index) == 0
+            or prepend is True
+            or erase is True
+            or pair_candles_since_ms > (since_ms if since_ms else 0)
+        ):
+            # Default since_ms to 30 days if nothing is given
+            new_dataframe = exchange.get_historic_ohlcv(
+                pair=pair,
+                timeframe=timeframe,
+                since_ms=(
+                    since_ms
+                    if since_ms
+                    else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
+                ),
+                is_new_pair=data.empty,
+                candle_type=candle_type,
+                until_ms=until_ms if until_ms else None,
+            )
+            logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
+        else:
+            new_dataframe = pair_candles  # cleaned up by clean_ohlcv_dataframe below
+            logger.info(
+                f"Downloaded data for {pair} with length {len(new_dataframe)} (parallel method)."
+            )
+
         if data.empty:
             data = new_dataframe
         else:
@@ -339,6 +367,7 @@ def refresh_backtest_ohlcv_data(
     progress_tracker = retrieve_progress_tracker(progress_tracker)
 
     pairs_not_available = []
+    fast_candles: dict[PairWithTimeframe, DataFrame] = {}
     data_handler = get_datahandler(datadir, data_format)
     candle_type = CandleType.get_default(trading_mode)
     with progress_tracker as progress:
@@ -355,6 +384,30 @@
                 logger.info(f"Skipping pair {pair}...")
                 continue
             for timeframe in timeframes:
+                # On the first pass for each timeframe and candle type, download candles for
+                # all pairs in the list via the parallel method and cache the results.
+                if (
+                    ((pair, timeframe, candle_type) not in fast_candles)
+                    and (erase is False)
+                    and (prepend is False)
+                ):
+                    fast_candles.update(
+                        _download_all_pairs_history_parallel(
+                            exchange=exchange,
+                            pairs=pairs,
+                            timeframe=timeframe,
+                            trading_mode=trading_mode,
+                            timerange=timerange,
+                        )
+                    )
+
+                # Get the already-downloaded candles for this pair, if available.
+                pair_candles = (
+                    fast_candles[(pair, timeframe, candle_type)]
+                    if (pair, timeframe, candle_type) in fast_candles
+                    else None
+                )
+
                 progress.update(timeframe_task, description=f"Timeframe {timeframe}")
                 logger.debug(f"Downloading pair {pair}, {candle_type}, interval {timeframe}.")
                 _download_pair_history(
@@ -368,6 +421,7 @@
                     candle_type=candle_type,
                     erase=erase,
                     prepend=prepend,
+                    pair_candles=pair_candles,  # candles pre-downloaded via the parallel method
                 )
                 progress.update(timeframe_task, advance=1)
             if trading_mode == "futures":
@@ -404,6 +458,38 @@ def refresh_backtest_ohlcv_data(
     return pairs_not_available
 
 
+def _download_all_pairs_history_parallel(
+    exchange: Exchange,
+    pairs: list[str],
+    timeframe: str,
+    trading_mode: str,
+    timerange: TimeRange | None = None,
+) -> dict[PairWithTimeframe, DataFrame]:
+    """
+    Use the faster parallel async download method for many pairs at once,
+    but only if the requested data is short enough to be fetched in one call per pair.
+    Used by the freqtrade download-data subcommand.
+    :return: Downloaded candles, keyed by (pair, timeframe, candle_type)
+    """
+    candles: dict[PairWithTimeframe, DataFrame] = {}
+    since = 0
+    if timerange:
+        if timerange.starttype == "date":
+            since = timerange.startts * 1000
+
+    candle_limit = exchange.ohlcv_candle_limit(timeframe, CandleType.get_default(trading_mode))
+    one_call_min_time_dt = dt_ts(date_minus_candles(timeframe, candle_limit))
+    # Only if everything fits into a single call per pair can we download in parallel.
+    if since > one_call_min_time_dt:
+        logger.info(f"Downloading parallel candles for {timeframe} since {format_ms_time(since)}")
+        needed_pairs: ListPairsWithTimeframes = [
+            (p, timeframe, CandleType.get_default(trading_mode)) for p in pairs
+        ]
+        candles = exchange.refresh_ohlcv_with_cache(needed_pairs, since)
+
+    return candles
+
+
 def _download_trades_history(
     exchange: Exchange,
     pair: str,
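
The gate in _download_all_pairs_history_parallel reduces to a time-window check: the requested start date must fall inside the most recent candle_limit candles, otherwise a single request per pair cannot cover the range and the regular per-pair download in _download_pair_history is used instead. A minimal, self-contained sketch of that arithmetic (plain Python, no freqtrade imports; the 5m timeframe and the 1000-candle request limit are illustrative assumptions, not values taken from the patch):

from datetime import datetime, timedelta, timezone

# Illustrative assumptions (not from the patch): a 5m timeframe and a
# 1000-candle-per-request exchange limit.
TIMEFRAME_MINUTES = 5
CANDLE_LIMIT = 1000


def fits_in_one_call(since_ms: int, now: datetime | None = None) -> bool:
    """Return True if candles from since_ms up to now fit into a single request."""
    now = now or datetime.now(timezone.utc)
    # Oldest point still reachable with one request of CANDLE_LIMIT candles.
    one_call_min_time = now - timedelta(minutes=TIMEFRAME_MINUTES * CANDLE_LIMIT)
    # Mirrors the `since > one_call_min_time_dt` comparison in the patch: a start
    # newer than the window boundary means one request per pair is enough.
    return since_ms > int(one_call_min_time.timestamp() * 1000)


# Example: with these assumptions the window is 1000 * 5 minutes, roughly 3.5 days.
two_days_ago = int((datetime.now(timezone.utc) - timedelta(days=2)).timestamp() * 1000)
ten_days_ago = int((datetime.now(timezone.utc) - timedelta(days=10)).timestamp() * 1000)
assert fits_in_one_call(two_days_ago) is True
assert fits_in_one_call(ten_days_ago) is False

Requests that pass this check take the parallel path (exchange.refresh_ohlcv_with_cache over all pairs at once); older start dates keep using the existing sequential get_historic_ohlcv pagination.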