diff --git a/docs/commands/download-data.md b/docs/commands/download-data.md index ce80e704e..2975d0717 100644 --- a/docs/commands/download-data.md +++ b/docs/commands/download-data.md @@ -4,6 +4,7 @@ usage: freqtrade download-data [-h] [-v] [--no-color] [--logfile FILE] [-V] [-p PAIRS [PAIRS ...]] [--pairs-file FILE] [--days INT] [--new-pairs-days INT] [--include-inactive-pairs] + [--no-parallel-download] [--timerange TIMERANGE] [--dl-trades] [--convert] [--exchange EXCHANGE] [-t TIMEFRAMES [TIMEFRAMES ...]] [--erase] @@ -24,6 +25,9 @@ options: Default: `None`. --include-inactive-pairs Also download data from inactive pairs. + --no-parallel-download + Disable parallel startup download. Only use this if + you experience issues. --timerange TIMERANGE Specify what timerange of data to use. --dl-trades Download trades instead of OHLCV data. diff --git a/freqtrade/commands/arguments.py b/freqtrade/commands/arguments.py index f96148e3c..17d8f43d7 100755 --- a/freqtrade/commands/arguments.py +++ b/freqtrade/commands/arguments.py @@ -164,6 +164,7 @@ ARGS_DOWNLOAD_DATA = [ "days", "new_pairs_days", "include_inactive", + "no_parallel_download", "timerange", "download_trades", "convert_trades", diff --git a/freqtrade/commands/cli_options.py b/freqtrade/commands/cli_options.py index b4641597d..62a007f59 100755 --- a/freqtrade/commands/cli_options.py +++ b/freqtrade/commands/cli_options.py @@ -454,6 +454,11 @@ AVAILABLE_CLI_OPTIONS = { help="Also download data from inactive pairs.", action="store_true", ), + "no_parallel_download": Arg( + "--no-parallel-download", + help="Disable parallel startup download. Only use this if you experience issues.", + action="store_true", + ), "new_pairs_days": Arg( "--new-pairs-days", help="Download data of new pairs for given number of days. 
Default: `%(default)s`.", diff --git a/freqtrade/configuration/configuration.py b/freqtrade/configuration/configuration.py index 5f1715d9a..7bf448335 100644 --- a/freqtrade/configuration/configuration.py +++ b/freqtrade/configuration/configuration.py @@ -391,6 +391,7 @@ class Configuration: ("timeframes", "timeframes --timeframes: {}"), ("days", "Detected --days: {}"), ("include_inactive", "Detected --include-inactive-pairs: {}"), + ("no_parallel_download", "Detected --no-parallel-download: {}"), ("download_trades", "Detected --dl-trades: {}"), ("convert_trades", "Detected --convert: {} - Converting Trade data to OHCV {}"), ("dataformat_ohlcv", 'Using "{}" to store OHLCV data.'), diff --git a/freqtrade/data/history/history_utils.py b/freqtrade/data/history/history_utils.py index 9cb14e3e0..3b93d2bdc 100644 --- a/freqtrade/data/history/history_utils.py +++ b/freqtrade/data/history/history_utils.py @@ -6,7 +6,14 @@ from pathlib import Path from pandas import DataFrame, concat from freqtrade.configuration import TimeRange -from freqtrade.constants import DATETIME_PRINT_FORMAT, DL_DATA_TIMEFRAMES, DOCS_LINK, Config +from freqtrade.constants import ( + DATETIME_PRINT_FORMAT, + DL_DATA_TIMEFRAMES, + DOCS_LINK, + Config, + ListPairsWithTimeframes, + PairWithTimeframe, +) from freqtrade.data.converter import ( clean_ohlcv_dataframe, convert_trades_to_ohlcv, @@ -17,6 +24,7 @@ from freqtrade.data.history.datahandlers import IDataHandler, get_datahandler from freqtrade.enums import CandleType, TradingMode from freqtrade.exceptions import OperationalException from freqtrade.exchange import Exchange +from freqtrade.exchange.exchange_utils import date_minus_candles from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist from freqtrade.util import dt_now, dt_ts, format_ms_time, format_ms_time_det from freqtrade.util.migrations import migrate_data @@ -226,6 +234,7 @@ def _download_pair_history( candle_type: CandleType, erase: bool = False, prepend: bool = 
False, + pair_candles: DataFrame | None = None, ) -> bool: """ Download latest candles from the exchange for the pair and timeframe passed in parameters @@ -238,6 +247,7 @@ def _download_pair_history( :param timerange: range of time to download :param candle_type: Any of the enum CandleType (must match trading mode!) :param erase: Erase existing data + :param pair_candles: Optional with "1 call" pair candles. :return: bool with success state """ data_handler = get_datahandler(datadir, data_handler=data_handler) @@ -271,21 +281,40 @@ def _download_pair_history( "Current End: %s", f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None", ) - - # Default since_ms to 30 days if nothing is given - new_dataframe = exchange.get_historic_ohlcv( - pair=pair, - timeframe=timeframe, - since_ms=( - since_ms - if since_ms - else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000 - ), - is_new_pair=data.empty, - candle_type=candle_type, - until_ms=until_ms if until_ms else None, + # used to check if the passed in pair_candles (parallel downloaded) covers since_ms. + # If we need more data, we have to fall back to the standard method. 
+ pair_candles_since_ms = ( + dt_ts(pair_candles.iloc[0]["date"]) + if pair_candles is not None and len(pair_candles.index) > 0 + else 0 ) - logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.") + if ( + pair_candles is None + or len(pair_candles.index) == 0 + or data.empty + or prepend is True + or erase is True + or pair_candles_since_ms > (since_ms if since_ms else 0) + ): + new_dataframe = exchange.get_historic_ohlcv( + pair=pair, + timeframe=timeframe, + since_ms=( + since_ms + if since_ms + else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000 + ), + is_new_pair=data.empty, + candle_type=candle_type, + until_ms=until_ms if until_ms else None, + ) + logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.") + else: + new_dataframe = pair_candles + logger.info( + f"Downloaded data for {pair} with length {len(new_dataframe)}. Parallel Method." + ) + if data.empty: data = new_dataframe else: @@ -330,6 +359,7 @@ def refresh_backtest_ohlcv_data( data_format: str | None = None, prepend: bool = False, progress_tracker: CustomProgress | None = None, + no_parallel_download: bool = False, ) -> list[str]: """ Refresh stored ohlcv data for backtesting and hyperopt operations. @@ -339,6 +369,7 @@ def refresh_backtest_ohlcv_data( progress_tracker = retrieve_progress_tracker(progress_tracker) pairs_not_available = [] + fast_candles: dict[PairWithTimeframe, DataFrame] = {} data_handler = get_datahandler(datadir, data_format) candle_type = CandleType.get_default(trading_mode) with progress_tracker as progress: @@ -355,6 +386,30 @@ def refresh_backtest_ohlcv_data( logger.info(f"Skipping pair {pair}...") continue for timeframe in timeframes: + # Get fast candles via parallel method on first loop through per timeframe + # and candle type. Downloads all the pairs in the list and stores them. 
+ if ( + not no_parallel_download + and exchange.get_option("download_data_parallel_quick", True) + and ( + ((pair, timeframe, candle_type) not in fast_candles) + and (erase is False) + and (prepend is False) + ) + ): + fast_candles.update( + _download_all_pairs_history_parallel( + exchange=exchange, + pairs=pairs, + timeframe=timeframe, + candle_type=candle_type, + timerange=timerange, + ) + ) + + # get the already downloaded pair candles if they exist + pair_candles = fast_candles.pop((pair, timeframe, candle_type), None) + progress.update(timeframe_task, description=f"Timeframe {timeframe}") logger.debug(f"Downloading pair {pair}, {candle_type}, interval {timeframe}.") _download_pair_history( @@ -368,6 +423,7 @@ def refresh_backtest_ohlcv_data( candle_type=candle_type, erase=erase, prepend=prepend, + pair_candles=pair_candles, # optional pass of dataframe of parallel candles ) progress.update(timeframe_task, advance=1) if trading_mode == "futures": @@ -404,6 +460,41 @@ def refresh_backtest_ohlcv_data( return pairs_not_available +def _download_all_pairs_history_parallel( + exchange: Exchange, + pairs: list[str], + timeframe: str, + candle_type: CandleType, + timerange: TimeRange | None = None, +) -> dict[PairWithTimeframe, DataFrame]: + """ + Allows to use the faster parallel async download method for many coins + but only if the data is short enough to be retrieved in one call. + Used by freqtrade download-data subcommand. 
+ :return: Candle pairs with timeframes + """ + candles: dict[PairWithTimeframe, DataFrame] = {} + since = 0 + if timerange: + if timerange.starttype == "date": + since = timerange.startts * 1000 + + candle_limit = exchange.ohlcv_candle_limit(timeframe, candle_type) + one_call_min_time_dt = dt_ts(date_minus_candles(timeframe, candle_limit)) + # check if we can get all candles in one go, if so then we can download them in parallel + if since > one_call_min_time_dt: + logger.info( + f"Downloading parallel candles for {timeframe} for all pairs " + f"since {format_ms_time(since)}" + ) + needed_pairs: ListPairsWithTimeframes = [ + (p, timeframe, candle_type) for p in [p for p in pairs] + ] + candles = exchange.refresh_latest_ohlcv(needed_pairs, since_ms=since, cache=False) + + return candles + + def _download_trades_history( exchange: Exchange, pair: str, @@ -702,6 +793,7 @@ def download_data( trading_mode=config.get("trading_mode", "spot"), prepend=config.get("prepend_data", False), progress_tracker=progress_tracker, + no_parallel_download=config.get("no_parallel_download", False), ) finally: if pairs_not_available: diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index 17353e8dd..e56276c36 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -138,6 +138,7 @@ class Exchange: "ohlcv_has_history": True, # Some exchanges (Kraken) don't provide history via ohlcv "ohlcv_partial_candle": True, "ohlcv_require_since": False, + "download_data_parallel_quick": True, "always_require_api_keys": False, # purge API keys for Dry-run. Must default to false. 
# Check https://github.com/ccxt/ccxt/issues/10767 for removal of ohlcv_volume_currency "ohlcv_volume_currency": "base", # "base" or "quote" diff --git a/freqtrade/exchange/exchange_types.py b/freqtrade/exchange/exchange_types.py index 4859b9cf9..cd2bd0059 100644 --- a/freqtrade/exchange/exchange_types.py +++ b/freqtrade/exchange/exchange_types.py @@ -28,6 +28,8 @@ class FtHas(TypedDict, total=False): ohlcv_volume_currency: str ohlcv_candle_limit_per_timeframe: dict[str, int] always_require_api_keys: bool + # allow disabling of parallel download-data for specific exchanges + download_data_parallel_quick: bool # Tickers tickers_have_quoteVolume: bool tickers_have_percentage: bool diff --git a/freqtrade/exchange/hyperliquid.py b/freqtrade/exchange/hyperliquid.py index a880a60df..74f7c5694 100644 --- a/freqtrade/exchange/hyperliquid.py +++ b/freqtrade/exchange/hyperliquid.py @@ -28,6 +28,7 @@ class Hyperliquid(Exchange): "stoploss_on_exchange": False, "exchange_has_overrides": {"fetchTrades": False}, "marketOrderRequiresPrice": True, + "download_data_parallel_quick": False, "ws_enabled": True, } _ft_has_futures: FtHas = { diff --git a/tests/data/test_history.py b/tests/data/test_history.py index 5554e04b3..de216eec9 100644 --- a/tests/data/test_history.py +++ b/tests/data/test_history.py @@ -18,6 +18,7 @@ from freqtrade.data.converter import ohlcv_to_dataframe from freqtrade.data.history import get_datahandler from freqtrade.data.history.datahandlers.jsondatahandler import JsonDataHandler, JsonGzDataHandler from freqtrade.data.history.history_utils import ( + _download_all_pairs_history_parallel, _download_pair_history, _download_trades_history, _load_cached_data_for_updating, @@ -545,6 +546,14 @@ def test_refresh_backtest_ohlcv_data( ): caplog.set_level(logging.DEBUG) dl_mock = mocker.patch("freqtrade.data.history.history_utils._download_pair_history") + + def parallel_mock(pairs, timeframe, candle_type, **kwargs): + return {(pair, timeframe, candle_type): DataFrame() 
for pair in pairs} + + parallel_mock = mocker.patch( + "freqtrade.data.history.history_utils._download_all_pairs_history_parallel", + side_effect=parallel_mock, + ) mocker.patch(f"{EXMS}.markets", PropertyMock(return_value=markets)) mocker.patch.object(Path, "exists", MagicMock(return_value=True)) @@ -559,10 +568,12 @@ def test_refresh_backtest_ohlcv_data( timeframes=["1m", "5m"], datadir=testdatadir, timerange=timerange, - erase=True, + erase=False, trading_mode=trademode, ) + # Called once per timeframe (as we return an empty dataframe) + assert parallel_mock.call_count == 2 assert dl_mock.call_count == callcount assert dl_mock.call_args[1]["timerange"].starttype == "date" @@ -699,3 +710,256 @@ def test_download_trades_history( assert ght_mock.call_count == 0 _clean_test_file(file2) + + +def test_download_all_pairs_history_parallel(mocker, default_conf_usdt): + pairs = ["PAIR1/BTC", "PAIR2/USDT"] + timeframe = "5m" + candle_type = CandleType.SPOT + + df1 = DataFrame( + { + "date": [1, 2], + "open": [1, 2], + "close": [1, 2], + "high": [1, 2], + "low": [1, 2], + "volume": [1, 2], + } + ) + df2 = DataFrame( + { + "date": [3, 4], + "open": [3, 4], + "close": [3, 4], + "high": [3, 4], + "low": [3, 4], + "volume": [3, 4], + } + ) + expected = { + ("PAIR1/BTC", timeframe, candle_type): df1, + ("PAIR2/USDT", timeframe, candle_type): df2, + } + # Mock exchange + mocker.patch.multiple( + EXMS, + exchange_has=MagicMock(return_value=True), + ohlcv_candle_limit=MagicMock(return_value=1000), + refresh_latest_ohlcv=MagicMock(return_value=expected), + ) + exchange = get_patched_exchange(mocker, default_conf_usdt) + # timerange with starttype 'date' and startts far in the future to trigger parallel download + + timerange = TimeRange("date", None, 9999999999, 0) + result = _download_all_pairs_history_parallel( + exchange=exchange, + pairs=pairs, + timeframe=timeframe, + candle_type=candle_type, + timerange=timerange, + ) + assert result == expected + + assert 
exchange.ohlcv_candle_limit.call_args[0] == (timeframe, candle_type) + assert exchange.refresh_latest_ohlcv.call_count == 1 + + # If since is not after one_call_min_time_dt, should not call refresh_latest_ohlcv + exchange.refresh_latest_ohlcv.reset_mock() + timerange2 = TimeRange("date", None, 0, 0) + result2 = _download_all_pairs_history_parallel( + exchange=exchange, + pairs=pairs, + timeframe=timeframe, + candle_type=candle_type, + timerange=timerange2, + ) + assert result2 == {} + assert exchange.refresh_latest_ohlcv.call_count == 0 + + exchange.refresh_latest_ohlcv.reset_mock() + + # Test without timerange + result3 = _download_all_pairs_history_parallel( + exchange=exchange, + pairs=pairs, + timeframe=timeframe, + candle_type=candle_type, + timerange=None, + ) + assert result3 == {} + assert exchange.refresh_latest_ohlcv.call_count == 0 + + +def test_download_pair_history_with_pair_candles(mocker, default_conf, tmp_path, caplog) -> None: + """ + Test _download_pair_history with pair_candles parameter (parallel method). 
+ """ + exchange = get_patched_exchange(mocker, default_conf) + + # Create test data for existing cached data + existing_data = DataFrame( + { + "date": [dt_utc(2018, 1, 10, 10, 0), dt_utc(2018, 1, 10, 10, 5)], + "open": [1.0, 1.15], + "high": [1.1, 1.2], + "low": [0.9, 1.1], + "close": [1.05, 1.15], + "volume": [100, 150], + } + ) + + # Create pair_candles data that will be used instead of exchange download + # This data should start before or at the same time as since_ms to trigger the else branch + pair_candles_data = DataFrame( + { + "date": [ + dt_utc(2018, 1, 10, 10, 5), + dt_utc(2018, 1, 10, 10, 10), + dt_utc(2018, 1, 10, 10, 15), + ], + "open": [1.15, 1.2, 1.25], + "high": [1.25, 1.3, 1.35], + "low": [1.1, 1.15, 1.2], + "close": [1.2, 1.25, 1.3], + "volume": [200, 250, 300], + } + ) + + # Mock the data handler to return existing cached data + data_handler_mock = MagicMock() + data_handler_mock.ohlcv_load.return_value = existing_data + data_handler_mock.ohlcv_store = MagicMock() + mocker.patch( + "freqtrade.data.history.history_utils.get_datahandler", return_value=data_handler_mock + ) + + # Mock _load_cached_data_for_updating to return existing data and since_ms + since_ms = dt_ts(dt_utc(2018, 1, 10, 10, 5)) # Time of last existing candle + mocker.patch( + "freqtrade.data.history.history_utils._load_cached_data_for_updating", + return_value=(existing_data, since_ms, None), + ) + + # Mock clean_ohlcv_dataframe to return concatenated data + expected_result = DataFrame( + { + "date": [ + dt_utc(2018, 1, 10, 10, 0), + dt_utc(2018, 1, 10, 10, 5), + dt_utc(2018, 1, 10, 10, 10), + dt_utc(2018, 1, 10, 10, 15), + ], + "open": [1.0, 1.15, 1.2, 1.25], + "high": [1.1, 1.25, 1.3, 1.35], + "low": [0.9, 1.1, 1.15, 1.2], + "close": [1.05, 1.2, 1.25, 1.3], + "volume": [100, 200, 250, 300], + } + ) + + get_historic_ohlcv_mock = MagicMock() + mocker.patch.object(exchange, "get_historic_ohlcv", get_historic_ohlcv_mock) + + # Call _download_pair_history with pre-loaded 
pair_candles + result = _download_pair_history( + datadir=tmp_path, + exchange=exchange, + pair="TEST/BTC", + timeframe="5m", + candle_type=CandleType.SPOT, + pair_candles=pair_candles_data, + ) + + # Verify the function succeeded + assert result is True + + # Verify that exchange.get_historic_ohlcv was NOT called (parallel method was used) + assert get_historic_ohlcv_mock.call_count == 0 + + # Verify the log message indicating parallel method was used (line 315-316) + assert log_has("Downloaded data for TEST/BTC with length 3. Parallel Method.", caplog) + + # Verify data was stored + assert data_handler_mock.ohlcv_store.call_count == 1 + stored_data = data_handler_mock.ohlcv_store.call_args_list[0][1]["data"] + assert stored_data.equals(expected_result) + assert len(stored_data) == 4 + + +def test_download_pair_history_with_pair_candles_no_overlap( + mocker, default_conf, tmp_path, caplog +) -> None: + exchange = get_patched_exchange(mocker, default_conf) + + # Create test data for existing cached data + existing_data = DataFrame( + { + "date": [dt_utc(2018, 1, 10, 10, 0), dt_utc(2018, 1, 10, 10, 5)], + "open": [1.0, 1.1], + "high": [1.1, 1.2], + "low": [0.9, 1.0], + "close": [1.05, 1.15], + "volume": [100, 150], + } + ) + + # Create pair_candles data that will be used instead of exchange download + # This data should start before or at the same time as since_ms to trigger the else branch + pair_candles_data = DataFrame( + { + "date": [ + dt_utc(2018, 1, 10, 10, 10), + dt_utc(2018, 1, 10, 10, 15), + dt_utc(2018, 1, 10, 10, 20), + ], + "open": [1.15, 1.2, 1.25], + "high": [1.25, 1.3, 1.35], + "low": [1.1, 1.15, 1.2], + "close": [1.2, 1.25, 1.3], + "volume": [200, 250, 300], + } + ) + + # Mock the data handler to return existing cached data + data_handler_mock = MagicMock() + data_handler_mock.ohlcv_load.return_value = existing_data + data_handler_mock.ohlcv_store = MagicMock() + mocker.patch( + "freqtrade.data.history.history_utils.get_datahandler", 
return_value=data_handler_mock
+    )
+
+    # Mock _load_cached_data_for_updating to return existing data and since_ms
+    since_ms = dt_ts(dt_utc(2018, 1, 10, 10, 5))  # Time of last existing candle
+    mocker.patch(
+        "freqtrade.data.history.history_utils._load_cached_data_for_updating",
+        return_value=(existing_data, since_ms, None),
+    )
+
+    get_historic_ohlcv_mock = MagicMock(return_value=DataFrame())
+    mocker.patch.object(exchange, "get_historic_ohlcv", get_historic_ohlcv_mock)
+
+    # Call _download_pair_history with pre-loaded pair_candles
+    result = _download_pair_history(
+        datadir=tmp_path,
+        exchange=exchange,
+        pair="TEST/BTC",
+        timeframe="5m",
+        candle_type=CandleType.SPOT,
+        pair_candles=pair_candles_data,
+    )
+
+    # Verify the function succeeded
+    assert result is True
+
+    # Verify that exchange.get_historic_ohlcv WAS called once (fallback to standard download)
+    assert get_historic_ohlcv_mock.call_count == 1
+
+    # Verify the parallel-method log message was NOT emitted
+    assert not log_has_re(r"Downloaded .* Parallel Method.", caplog)
+
+    # Verify data was stored
+    assert data_handler_mock.ohlcv_store.call_count == 1
+    stored_data = data_handler_mock.ohlcv_store.call_args_list[0][1]["data"]
+    assert stored_data.equals(existing_data)
+    assert len(stored_data) == 2