Merge pull request #11625 from alisalama/develop

Parallelisation of iterative data downloads for speed improvement
This commit is contained in:
Matthias
2025-09-07 19:31:52 +02:00
committed by GitHub
9 changed files with 387 additions and 16 deletions

View File

@@ -4,6 +4,7 @@ usage: freqtrade download-data [-h] [-v] [--no-color] [--logfile FILE] [-V]
[-p PAIRS [PAIRS ...]] [--pairs-file FILE]
[--days INT] [--new-pairs-days INT]
[--include-inactive-pairs]
[--no-parallel-download]
[--timerange TIMERANGE] [--dl-trades]
[--convert] [--exchange EXCHANGE]
[-t TIMEFRAMES [TIMEFRAMES ...]] [--erase]
@@ -24,6 +25,9 @@ options:
Default: `None`.
--include-inactive-pairs
Also download data from inactive pairs.
--no-parallel-download
Disable parallel startup download. Only use this if
you experience issues.
--timerange TIMERANGE
Specify what timerange of data to use.
--dl-trades Download trades instead of OHLCV data.

View File

@@ -164,6 +164,7 @@ ARGS_DOWNLOAD_DATA = [
"days",
"new_pairs_days",
"include_inactive",
"no_parallel_download",
"timerange",
"download_trades",
"convert_trades",

View File

@@ -454,6 +454,11 @@ AVAILABLE_CLI_OPTIONS = {
help="Also download data from inactive pairs.",
action="store_true",
),
"no_parallel_download": Arg(
"--no-parallel-download",
help="Disable parallel startup download. Only use this if you experience issues.",
action="store_true",
),
"new_pairs_days": Arg(
"--new-pairs-days",
help="Download data of new pairs for given number of days. Default: `%(default)s`.",

View File

@@ -391,6 +391,7 @@ class Configuration:
("timeframes", "timeframes --timeframes: {}"),
("days", "Detected --days: {}"),
("include_inactive", "Detected --include-inactive-pairs: {}"),
("no_parallel_download", "Detected --no-parallel-download: {}"),
("download_trades", "Detected --dl-trades: {}"),
("convert_trades", "Detected --convert: {} - Converting Trade data to OHCV {}"),
("dataformat_ohlcv", 'Using "{}" to store OHLCV data.'),

View File

@@ -6,7 +6,14 @@ from pathlib import Path
from pandas import DataFrame, concat
from freqtrade.configuration import TimeRange
from freqtrade.constants import DATETIME_PRINT_FORMAT, DL_DATA_TIMEFRAMES, DOCS_LINK, Config
from freqtrade.constants import (
DATETIME_PRINT_FORMAT,
DL_DATA_TIMEFRAMES,
DOCS_LINK,
Config,
ListPairsWithTimeframes,
PairWithTimeframe,
)
from freqtrade.data.converter import (
clean_ohlcv_dataframe,
convert_trades_to_ohlcv,
@@ -17,6 +24,7 @@ from freqtrade.data.history.datahandlers import IDataHandler, get_datahandler
from freqtrade.enums import CandleType, TradingMode
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import Exchange
from freqtrade.exchange.exchange_utils import date_minus_candles
from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist
from freqtrade.util import dt_now, dt_ts, format_ms_time, format_ms_time_det
from freqtrade.util.migrations import migrate_data
@@ -226,6 +234,7 @@ def _download_pair_history(
candle_type: CandleType,
erase: bool = False,
prepend: bool = False,
pair_candles: DataFrame | None = None,
) -> bool:
"""
Download latest candles from the exchange for the pair and timeframe passed in parameters
@@ -238,6 +247,7 @@ def _download_pair_history(
:param timerange: range of time to download
:param candle_type: Any of the enum CandleType (must match trading mode!)
:param erase: Erase existing data
:param pair_candles: Optional with "1 call" pair candles.
:return: bool with success state
"""
data_handler = get_datahandler(datadir, data_handler=data_handler)
@@ -271,21 +281,40 @@ def _download_pair_history(
"Current End: %s",
f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None",
)
# Default since_ms to 30 days if nothing is given
new_dataframe = exchange.get_historic_ohlcv(
pair=pair,
timeframe=timeframe,
since_ms=(
since_ms
if since_ms
else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
),
is_new_pair=data.empty,
candle_type=candle_type,
until_ms=until_ms if until_ms else None,
# used to check if the passed in pair_candles (parallel downloaded) covers since_ms.
# If we need more data, we have to fall back to the standard method.
pair_candles_since_ms = (
dt_ts(pair_candles.iloc[0]["date"])
if pair_candles is not None and len(pair_candles.index) > 0
else 0
)
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
if (
pair_candles is None
or len(pair_candles.index) == 0
or data.empty
or prepend is True
or erase is True
or pair_candles_since_ms > (since_ms if since_ms else 0)
):
new_dataframe = exchange.get_historic_ohlcv(
pair=pair,
timeframe=timeframe,
since_ms=(
since_ms
if since_ms
else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
),
is_new_pair=data.empty,
candle_type=candle_type,
until_ms=until_ms if until_ms else None,
)
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
else:
new_dataframe = pair_candles
logger.info(
f"Downloaded data for {pair} with length {len(new_dataframe)}. Parallel Method."
)
if data.empty:
data = new_dataframe
else:
@@ -330,6 +359,7 @@ def refresh_backtest_ohlcv_data(
data_format: str | None = None,
prepend: bool = False,
progress_tracker: CustomProgress | None = None,
no_parallel_download: bool = False,
) -> list[str]:
"""
Refresh stored ohlcv data for backtesting and hyperopt operations.
@@ -339,6 +369,7 @@ def refresh_backtest_ohlcv_data(
progress_tracker = retrieve_progress_tracker(progress_tracker)
pairs_not_available = []
fast_candles: dict[PairWithTimeframe, DataFrame] = {}
data_handler = get_datahandler(datadir, data_format)
candle_type = CandleType.get_default(trading_mode)
with progress_tracker as progress:
@@ -355,6 +386,30 @@ def refresh_backtest_ohlcv_data(
logger.info(f"Skipping pair {pair}...")
continue
for timeframe in timeframes:
# Get fast candles via parallel method on first loop through per timeframe
# and candle type. Downloads all the pairs in the list and stores them.
if (
not no_parallel_download
and exchange.get_option("download_data_parallel_quick", True)
and (
((pair, timeframe, candle_type) not in fast_candles)
and (erase is False)
and (prepend is False)
)
):
fast_candles.update(
_download_all_pairs_history_parallel(
exchange=exchange,
pairs=pairs,
timeframe=timeframe,
candle_type=candle_type,
timerange=timerange,
)
)
# get the already downloaded pair candles if they exist
pair_candles = fast_candles.pop((pair, timeframe, candle_type), None)
progress.update(timeframe_task, description=f"Timeframe {timeframe}")
logger.debug(f"Downloading pair {pair}, {candle_type}, interval {timeframe}.")
_download_pair_history(
@@ -368,6 +423,7 @@ def refresh_backtest_ohlcv_data(
candle_type=candle_type,
erase=erase,
prepend=prepend,
pair_candles=pair_candles, # optional pass of dataframe of parallel candles
)
progress.update(timeframe_task, advance=1)
if trading_mode == "futures":
@@ -404,6 +460,41 @@ def refresh_backtest_ohlcv_data(
return pairs_not_available
def _download_all_pairs_history_parallel(
    exchange: Exchange,
    pairs: list[str],
    timeframe: str,
    candle_type: CandleType,
    timerange: TimeRange | None = None,
) -> dict[PairWithTimeframe, DataFrame]:
    """
    Use the faster parallel async download method for many coins, but only if
    the requested range is short enough to be retrieved in one call per pair.
    Used by the freqtrade download-data subcommand.
    :param exchange: Exchange instance to download from
    :param pairs: List of pairs to download
    :param timeframe: Timeframe (e.g. "5m") to download
    :param candle_type: Any of the enum CandleType (must match trading mode!)
    :param timerange: Requested range; only a start of type "date" can trigger a download
    :return: Downloaded candles keyed by (pair, timeframe, candle_type) -
        empty dict when the range cannot be covered in a single call.
    """
    candles: dict[PairWithTimeframe, DataFrame] = {}
    since = 0
    if timerange and timerange.starttype == "date":
        since = timerange.startts * 1000
    # Oldest timestamp still coverable by one exchange call at this candle limit.
    candle_limit = exchange.ohlcv_candle_limit(timeframe, candle_type)
    one_call_min_time_dt = dt_ts(date_minus_candles(timeframe, candle_limit))
    # Only parallelize when every pair fits in one call; otherwise the caller
    # falls back to the standard iterative download.
    if since > one_call_min_time_dt:
        logger.info(
            f"Downloading parallel candles for {timeframe} for all pairs "
            f"since {format_ms_time(since)}"
        )
        needed_pairs: ListPairsWithTimeframes = [(p, timeframe, candle_type) for p in pairs]
        candles = exchange.refresh_latest_ohlcv(needed_pairs, since_ms=since, cache=False)
    return candles
def _download_trades_history(
exchange: Exchange,
pair: str,
@@ -702,6 +793,7 @@ def download_data(
trading_mode=config.get("trading_mode", "spot"),
prepend=config.get("prepend_data", False),
progress_tracker=progress_tracker,
no_parallel_download=config.get("no_parallel_download", False),
)
finally:
if pairs_not_available:

View File

@@ -138,6 +138,7 @@ class Exchange:
"ohlcv_has_history": True, # Some exchanges (Kraken) don't provide history via ohlcv
"ohlcv_partial_candle": True,
"ohlcv_require_since": False,
"download_data_parallel_quick": True,
"always_require_api_keys": False, # purge API keys for Dry-run. Must default to false.
# Check https://github.com/ccxt/ccxt/issues/10767 for removal of ohlcv_volume_currency
"ohlcv_volume_currency": "base", # "base" or "quote"

View File

@@ -28,6 +28,8 @@ class FtHas(TypedDict, total=False):
ohlcv_volume_currency: str
ohlcv_candle_limit_per_timeframe: dict[str, int]
always_require_api_keys: bool
# allow disabling of parallel download-data for specific exchanges
download_data_parallel_quick: bool
# Tickers
tickers_have_quoteVolume: bool
tickers_have_percentage: bool

View File

@@ -28,6 +28,7 @@ class Hyperliquid(Exchange):
"stoploss_on_exchange": False,
"exchange_has_overrides": {"fetchTrades": False},
"marketOrderRequiresPrice": True,
"download_data_parallel_quick": False,
"ws_enabled": True,
}
_ft_has_futures: FtHas = {

View File

@@ -18,6 +18,7 @@ from freqtrade.data.converter import ohlcv_to_dataframe
from freqtrade.data.history import get_datahandler
from freqtrade.data.history.datahandlers.jsondatahandler import JsonDataHandler, JsonGzDataHandler
from freqtrade.data.history.history_utils import (
_download_all_pairs_history_parallel,
_download_pair_history,
_download_trades_history,
_load_cached_data_for_updating,
@@ -545,6 +546,14 @@ def test_refresh_backtest_ohlcv_data(
):
caplog.set_level(logging.DEBUG)
dl_mock = mocker.patch("freqtrade.data.history.history_utils._download_pair_history")
def parallel_mock(pairs, timeframe, candle_type, **kwargs):
return {(pair, timeframe, candle_type): DataFrame() for pair in pairs}
parallel_mock = mocker.patch(
"freqtrade.data.history.history_utils._download_all_pairs_history_parallel",
side_effect=parallel_mock,
)
mocker.patch(f"{EXMS}.markets", PropertyMock(return_value=markets))
mocker.patch.object(Path, "exists", MagicMock(return_value=True))
@@ -559,10 +568,12 @@ def test_refresh_backtest_ohlcv_data(
timeframes=["1m", "5m"],
datadir=testdatadir,
timerange=timerange,
erase=True,
erase=False,
trading_mode=trademode,
)
# Called once per timeframe (as we return an empty dataframe)
assert parallel_mock.call_count == 2
assert dl_mock.call_count == callcount
assert dl_mock.call_args[1]["timerange"].starttype == "date"
@@ -699,3 +710,256 @@ def test_download_trades_history(
assert ght_mock.call_count == 0
_clean_test_file(file2)
def test_download_all_pairs_history_parallel(mocker, default_conf_usdt):
    """Parallel helper: happy path, start too far in the past, and no timerange."""
    pairs = ["PAIR1/BTC", "PAIR2/USDT"]
    timeframe = "5m"
    candle_type = CandleType.SPOT

    def _ohlcv_frame(values):
        # Every column carries the same values - sufficient for equality checks here.
        return DataFrame(
            {col: values for col in ("date", "open", "close", "high", "low", "volume")}
        )

    expected = {
        ("PAIR1/BTC", timeframe, candle_type): _ohlcv_frame([1, 2]),
        ("PAIR2/USDT", timeframe, candle_type): _ohlcv_frame([3, 4]),
    }
    mocker.patch.multiple(
        EXMS,
        exchange_has=MagicMock(return_value=True),
        ohlcv_candle_limit=MagicMock(return_value=1000),
        refresh_latest_ohlcv=MagicMock(return_value=expected),
    )
    exchange = get_patched_exchange(mocker, default_conf_usdt)

    # starttype "date" with a start far in the future -> parallel download triggers
    result = _download_all_pairs_history_parallel(
        exchange=exchange,
        pairs=pairs,
        timeframe=timeframe,
        candle_type=candle_type,
        timerange=TimeRange("date", None, 9999999999, 0),
    )
    assert result == expected
    assert exchange.ohlcv_candle_limit.call_args[0] == (timeframe, candle_type)
    assert exchange.refresh_latest_ohlcv.call_count == 1

    # Start not after one_call_min_time_dt -> no refresh_latest_ohlcv call
    exchange.refresh_latest_ohlcv.reset_mock()
    result2 = _download_all_pairs_history_parallel(
        exchange=exchange,
        pairs=pairs,
        timeframe=timeframe,
        candle_type=candle_type,
        timerange=TimeRange("date", None, 0, 0),
    )
    assert result2 == {}
    assert exchange.refresh_latest_ohlcv.call_count == 0

    # No timerange at all -> also no download
    exchange.refresh_latest_ohlcv.reset_mock()
    result3 = _download_all_pairs_history_parallel(
        exchange=exchange,
        pairs=pairs,
        timeframe=timeframe,
        candle_type=candle_type,
        timerange=None,
    )
    assert result3 == {}
    assert exchange.refresh_latest_ohlcv.call_count == 0
def test_download_pair_history_with_pair_candles(mocker, default_conf, tmp_path, caplog) -> None:
    """
    _download_pair_history must use pre-downloaded pair_candles (parallel method)
    instead of querying the exchange when they cover the requested range.
    """
    exchange = get_patched_exchange(mocker, default_conf)

    def _frame(dates, opens, highs, lows, closes, volumes):
        return DataFrame(
            {
                "date": dates,
                "open": opens,
                "high": highs,
                "low": lows,
                "close": closes,
                "volume": volumes,
            }
        )

    # Candles already cached on disk, up to 10:05
    cached_candles = _frame(
        [dt_utc(2018, 1, 10, 10, 0), dt_utc(2018, 1, 10, 10, 5)],
        [1.0, 1.15],
        [1.1, 1.2],
        [0.9, 1.1],
        [1.05, 1.15],
        [100, 150],
    )
    # Parallel-downloaded candles starting at since_ms - they cover the range,
    # so the standard exchange download must be skipped.
    parallel_candles = _frame(
        [
            dt_utc(2018, 1, 10, 10, 5),
            dt_utc(2018, 1, 10, 10, 10),
            dt_utc(2018, 1, 10, 10, 15),
        ],
        [1.15, 1.2, 1.25],
        [1.25, 1.3, 1.35],
        [1.1, 1.15, 1.2],
        [1.2, 1.25, 1.3],
        [200, 250, 300],
    )
    handler_mock = MagicMock()
    handler_mock.ohlcv_load.return_value = cached_candles
    handler_mock.ohlcv_store = MagicMock()
    mocker.patch(
        "freqtrade.data.history.history_utils.get_datahandler", return_value=handler_mock
    )
    # since_ms is the timestamp of the last cached candle
    since_ms = dt_ts(dt_utc(2018, 1, 10, 10, 5))
    mocker.patch(
        "freqtrade.data.history.history_utils._load_cached_data_for_updating",
        return_value=(cached_candles, since_ms, None),
    )
    # Cached data merged with the parallel candles (overlap at 10:05 deduplicated)
    expected_result = _frame(
        [
            dt_utc(2018, 1, 10, 10, 0),
            dt_utc(2018, 1, 10, 10, 5),
            dt_utc(2018, 1, 10, 10, 10),
            dt_utc(2018, 1, 10, 10, 15),
        ],
        [1.0, 1.15, 1.2, 1.25],
        [1.1, 1.25, 1.3, 1.35],
        [0.9, 1.1, 1.15, 1.2],
        [1.05, 1.2, 1.25, 1.3],
        [100, 200, 250, 300],
    )
    get_historic_ohlcv_mock = MagicMock()
    mocker.patch.object(exchange, "get_historic_ohlcv", get_historic_ohlcv_mock)

    result = _download_pair_history(
        datadir=tmp_path,
        exchange=exchange,
        pair="TEST/BTC",
        timeframe="5m",
        candle_type=CandleType.SPOT,
        pair_candles=parallel_candles,
    )
    assert result is True
    # The exchange was never queried - the parallel candles were used instead
    assert get_historic_ohlcv_mock.call_count == 0
    assert log_has("Downloaded data for TEST/BTC with length 3. Parallel Method.", caplog)
    # Merged result was stored exactly once
    assert handler_mock.ohlcv_store.call_count == 1
    stored_data = handler_mock.ohlcv_store.call_args_list[0][1]["data"]
    assert stored_data.equals(expected_result)
    assert len(stored_data) == 4
def test_download_pair_history_with_pair_candles_no_overlap(
    mocker, default_conf, tmp_path, caplog
) -> None:
    """
    Parallel pair_candles that start AFTER since_ms would leave a gap, so
    _download_pair_history must fall back to the regular exchange download.
    """
    exchange = get_patched_exchange(mocker, default_conf)
    # Candles already cached on disk, up to 10:05
    cached_candles = DataFrame(
        {
            "date": [dt_utc(2018, 1, 10, 10, 0), dt_utc(2018, 1, 10, 10, 5)],
            "open": [1.0, 1.1],
            "high": [1.1, 1.2],
            "low": [0.9, 1.0],
            "close": [1.05, 1.15],
            "volume": [100, 150],
        }
    )
    # Parallel candles only start at 10:10 - they do NOT cover since_ms (10:05)
    parallel_candles = DataFrame(
        {
            "date": [
                dt_utc(2018, 1, 10, 10, 10),
                dt_utc(2018, 1, 10, 10, 15),
                dt_utc(2018, 1, 10, 10, 20),
            ],
            "open": [1.15, 1.2, 1.25],
            "high": [1.25, 1.3, 1.35],
            "low": [1.1, 1.15, 1.2],
            "close": [1.2, 1.25, 1.3],
            "volume": [200, 250, 300],
        }
    )
    handler_mock = MagicMock()
    handler_mock.ohlcv_load.return_value = cached_candles
    handler_mock.ohlcv_store = MagicMock()
    mocker.patch(
        "freqtrade.data.history.history_utils.get_datahandler", return_value=handler_mock
    )
    # since_ms is the timestamp of the last cached candle
    since_ms = dt_ts(dt_utc(2018, 1, 10, 10, 5))
    mocker.patch(
        "freqtrade.data.history.history_utils._load_cached_data_for_updating",
        return_value=(cached_candles, since_ms, None),
    )
    get_historic_ohlcv_mock = MagicMock(return_value=DataFrame())
    mocker.patch.object(exchange, "get_historic_ohlcv", get_historic_ohlcv_mock)

    result = _download_pair_history(
        datadir=tmp_path,
        exchange=exchange,
        pair="TEST/BTC",
        timeframe="5m",
        candle_type=CandleType.SPOT,
        pair_candles=parallel_candles,
    )
    assert result is True
    # Fallback path: the regular download WAS used (parallel candles rejected)
    assert get_historic_ohlcv_mock.call_count == 1
    assert not log_has_re(r"Downloaded .* Parallel Method.", caplog)
    # Only the existing cached data was stored (exchange mock returned an empty frame)
    assert handler_mock.ohlcv_store.call_count == 1
    stored_data = handler_mock.ohlcv_store.call_args_list[0][1]["data"]
    assert stored_data.equals(cached_candles)
    assert len(stored_data) == 2