Merge pull request #11625 from alisalama/develop

Parallelisation of iterative data downloads for speed improvement
Matthias, 2025-09-07 19:31:52 +02:00, committed by GitHub
9 changed files with 387 additions and 16 deletions
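
For orientation before the diff: the new tests cover a helper that fetches recent candles for every pair in a single batched exchange call, plus a new pair_candles argument on _download_pair_history that consumes the pre-fetched frames. The following is a minimal sketch of that batched helper; the gating is inferred from the tests below, not copied from the PR, so treat the details as assumptions.

# Sketch only; the real helper lives in freqtrade.data.history.history_utils
# and may differ in detail.
from datetime import timedelta

from freqtrade.exchange import timeframe_to_minutes
from freqtrade.util import dt_now, dt_ts


def _download_all_pairs_history_parallel(exchange, pairs, timeframe, candle_type, timerange):
    if timerange is None or timerange.starttype != "date":
        # Without a usable start date the caller falls back to the iterative per-pair download.
        return {}
    candle_limit = exchange.ohlcv_candle_limit(timeframe, candle_type)
    one_call_min_time = dt_now() - timedelta(
        minutes=timeframe_to_minutes(timeframe) * candle_limit
    )
    if timerange.startdt <= one_call_min_time:
        # The requested window does not fit into one candle-limit request, so fall back as well.
        return {}
    # One batched call fetches candles for all pairs concurrently and returns a dict
    # keyed by (pair, timeframe, candle_type).
    return exchange.refresh_latest_ohlcv(
        [(pair, timeframe, candle_type) for pair in pairs],
        since_ms=dt_ts(timerange.startdt),
        cache=False,
        drop_incomplete=False,
    )

The first new test asserts exactly this behaviour: a recent start date leads to one refresh_latest_ohlcv call, while an old or missing start returns an empty dict without touching the exchange.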


@@ -18,6 +18,7 @@ from freqtrade.data.converter import ohlcv_to_dataframe
from freqtrade.data.history import get_datahandler
from freqtrade.data.history.datahandlers.jsondatahandler import JsonDataHandler, JsonGzDataHandler
from freqtrade.data.history.history_utils import (
    _download_all_pairs_history_parallel,
    _download_pair_history,
    _download_trades_history,
    _load_cached_data_for_updating,
@@ -545,6 +546,14 @@ def test_refresh_backtest_ohlcv_data(
):
    caplog.set_level(logging.DEBUG)
    dl_mock = mocker.patch("freqtrade.data.history.history_utils._download_pair_history")

    def parallel_mock(pairs, timeframe, candle_type, **kwargs):
        return {(pair, timeframe, candle_type): DataFrame() for pair in pairs}

    parallel_mock = mocker.patch(
        "freqtrade.data.history.history_utils._download_all_pairs_history_parallel",
        side_effect=parallel_mock,
    )
    mocker.patch(f"{EXMS}.markets", PropertyMock(return_value=markets))
    mocker.patch.object(Path, "exists", MagicMock(return_value=True))
@@ -559,10 +568,12 @@ def test_refresh_backtest_ohlcv_data(
        timeframes=["1m", "5m"],
        datadir=testdatadir,
        timerange=timerange,
        erase=True,
        erase=False,
        trading_mode=trademode,
    )
    # Called once per timeframe (as we return an empty dataframe)
    assert parallel_mock.call_count == 2
    assert dl_mock.call_count == callcount
    assert dl_mock.call_args[1]["timerange"].starttype == "date"
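
The two new assertions above (the parallel helper is called once per timeframe, while _download_pair_history still runs once per pair) suggest a wiring roughly like the sketch below. The function name and parameters are illustrative, not the PR's actual refresh_backtest_ohlcv_data code:

from freqtrade.data.history.history_utils import (
    _download_all_pairs_history_parallel,
    _download_pair_history,
)


def _refresh_ohlcv_sketch(exchange, pairs, timeframes, datadir, timerange, candle_type):
    for timeframe in timeframes:
        # One batched fetch per timeframe (hence parallel_mock.call_count == 2 above).
        pair_candles = _download_all_pairs_history_parallel(
            exchange=exchange,
            pairs=pairs,
            timeframe=timeframe,
            candle_type=candle_type,
            timerange=timerange,
        )
        for pair in pairs:
            # Every pair still goes through _download_pair_history; the pre-fetched frame
            # is handed over via pair_candles and only used when it overlaps the cache.
            _download_pair_history(
                datadir=datadir,
                exchange=exchange,
                pair=pair,
                timeframe=timeframe,
                candle_type=candle_type,
                timerange=timerange,
                pair_candles=pair_candles.get((pair, timeframe, candle_type)),
            )

In the test above both helpers are mocked, so the assertions only pin down the call pattern: two timeframes give two batched calls, and the per-pair download count stays at callcount.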
@@ -699,3 +710,256 @@ def test_download_trades_history(
    assert ght_mock.call_count == 0
    _clean_test_file(file2)

def test_download_all_pairs_history_parallel(mocker, default_conf_usdt):
    pairs = ["PAIR1/BTC", "PAIR2/USDT"]
    timeframe = "5m"
    candle_type = CandleType.SPOT

    df1 = DataFrame(
        {
            "date": [1, 2],
            "open": [1, 2],
            "close": [1, 2],
            "high": [1, 2],
            "low": [1, 2],
            "volume": [1, 2],
        }
    )
    df2 = DataFrame(
        {
            "date": [3, 4],
            "open": [3, 4],
            "close": [3, 4],
            "high": [3, 4],
            "low": [3, 4],
            "volume": [3, 4],
        }
    )
    expected = {
        ("PAIR1/BTC", timeframe, candle_type): df1,
        ("PAIR2/USDT", timeframe, candle_type): df2,
    }
    # Mock exchange
    mocker.patch.multiple(
        EXMS,
        exchange_has=MagicMock(return_value=True),
        ohlcv_candle_limit=MagicMock(return_value=1000),
        refresh_latest_ohlcv=MagicMock(return_value=expected),
    )
    exchange = get_patched_exchange(mocker, default_conf_usdt)

    # timerange with starttype 'date' and startts far in the future to trigger parallel download
    timerange = TimeRange("date", None, 9999999999, 0)
    result = _download_all_pairs_history_parallel(
        exchange=exchange,
        pairs=pairs,
        timeframe=timeframe,
        candle_type=candle_type,
        timerange=timerange,
    )
    assert result == expected
    assert exchange.ohlcv_candle_limit.call_args[0] == (timeframe, candle_type)
    assert exchange.refresh_latest_ohlcv.call_count == 1

    # If since is not after one_call_min_time_dt, should not call refresh_latest_ohlcv
    exchange.refresh_latest_ohlcv.reset_mock()
    timerange2 = TimeRange("date", None, 0, 0)
    result2 = _download_all_pairs_history_parallel(
        exchange=exchange,
        pairs=pairs,
        timeframe=timeframe,
        candle_type=candle_type,
        timerange=timerange2,
    )
    assert result2 == {}
    assert exchange.refresh_latest_ohlcv.call_count == 0

    exchange.refresh_latest_ohlcv.reset_mock()
    # Test without timerange
    result3 = _download_all_pairs_history_parallel(
        exchange=exchange,
        pairs=pairs,
        timeframe=timeframe,
        candle_type=candle_type,
        timerange=None,
    )
    assert result3 == {}
    assert exchange.refresh_latest_ohlcv.call_count == 0

def test_download_pair_history_with_pair_candles(mocker, default_conf, tmp_path, caplog) -> None:
    """
    Test _download_pair_history with pair_candles parameter (parallel method).
    """
    exchange = get_patched_exchange(mocker, default_conf)

    # Create test data for existing cached data
    existing_data = DataFrame(
        {
            "date": [dt_utc(2018, 1, 10, 10, 0), dt_utc(2018, 1, 10, 10, 5)],
            "open": [1.0, 1.15],
            "high": [1.1, 1.2],
            "low": [0.9, 1.1],
            "close": [1.05, 1.15],
            "volume": [100, 150],
        }
    )
    # Create pair_candles data that will be used instead of an exchange download.
    # It must start at or before since_ms so the pre-loaded candles branch is taken.
    pair_candles_data = DataFrame(
        {
            "date": [
                dt_utc(2018, 1, 10, 10, 5),
                dt_utc(2018, 1, 10, 10, 10),
                dt_utc(2018, 1, 10, 10, 15),
            ],
            "open": [1.15, 1.2, 1.25],
            "high": [1.25, 1.3, 1.35],
            "low": [1.1, 1.15, 1.2],
            "close": [1.2, 1.25, 1.3],
            "volume": [200, 250, 300],
        }
    )
    # Mock the data handler to return existing cached data
    data_handler_mock = MagicMock()
    data_handler_mock.ohlcv_load.return_value = existing_data
    data_handler_mock.ohlcv_store = MagicMock()
    mocker.patch(
        "freqtrade.data.history.history_utils.get_datahandler", return_value=data_handler_mock
    )
    # Mock _load_cached_data_for_updating to return existing data and since_ms
    since_ms = dt_ts(dt_utc(2018, 1, 10, 10, 5))  # Time of last existing candle
    mocker.patch(
        "freqtrade.data.history.history_utils._load_cached_data_for_updating",
        return_value=(existing_data, since_ms, None),
    )
    # Expected result after merging the cached data with the pre-loaded candles
    expected_result = DataFrame(
        {
            "date": [
                dt_utc(2018, 1, 10, 10, 0),
                dt_utc(2018, 1, 10, 10, 5),
                dt_utc(2018, 1, 10, 10, 10),
                dt_utc(2018, 1, 10, 10, 15),
            ],
            "open": [1.0, 1.15, 1.2, 1.25],
            "high": [1.1, 1.25, 1.3, 1.35],
            "low": [0.9, 1.1, 1.15, 1.2],
            "close": [1.05, 1.2, 1.25, 1.3],
            "volume": [100, 200, 250, 300],
        }
    )
    get_historic_ohlcv_mock = MagicMock()
    mocker.patch.object(exchange, "get_historic_ohlcv", get_historic_ohlcv_mock)

    # Call _download_pair_history with pre-loaded pair_candles
    result = _download_pair_history(
        datadir=tmp_path,
        exchange=exchange,
        pair="TEST/BTC",
        timeframe="5m",
        candle_type=CandleType.SPOT,
        pair_candles=pair_candles_data,
    )
    # Verify the function succeeded
    assert result is True
    # Verify that exchange.get_historic_ohlcv was NOT called (parallel method was used)
    assert get_historic_ohlcv_mock.call_count == 0
    # Verify the log message indicating the parallel method was used
    assert log_has("Downloaded data for TEST/BTC with length 3. Parallel Method.", caplog)
    # Verify data was stored
    assert data_handler_mock.ohlcv_store.call_count == 1
    stored_data = data_handler_mock.ohlcv_store.call_args_list[0][1]["data"]
    assert stored_data.equals(expected_result)
    assert len(stored_data) == 4

def test_download_pair_history_with_pair_candles_no_overlap(
    mocker, default_conf, tmp_path, caplog
) -> None:
    exchange = get_patched_exchange(mocker, default_conf)

    # Create test data for existing cached data
    existing_data = DataFrame(
        {
            "date": [dt_utc(2018, 1, 10, 10, 0), dt_utc(2018, 1, 10, 10, 5)],
            "open": [1.0, 1.1],
            "high": [1.1, 1.2],
            "low": [0.9, 1.0],
            "close": [1.05, 1.15],
            "volume": [100, 150],
        }
    )
    # Create pair_candles data that starts after since_ms.
    # Because it does not overlap the cached data, it cannot be used and the
    # regular exchange download path is taken instead.
    pair_candles_data = DataFrame(
        {
            "date": [
                dt_utc(2018, 1, 10, 10, 10),
                dt_utc(2018, 1, 10, 10, 15),
                dt_utc(2018, 1, 10, 10, 20),
            ],
            "open": [1.15, 1.2, 1.25],
            "high": [1.25, 1.3, 1.35],
            "low": [1.1, 1.15, 1.2],
            "close": [1.2, 1.25, 1.3],
            "volume": [200, 250, 300],
        }
    )
    # Mock the data handler to return existing cached data
    data_handler_mock = MagicMock()
    data_handler_mock.ohlcv_load.return_value = existing_data
    data_handler_mock.ohlcv_store = MagicMock()
    mocker.patch(
        "freqtrade.data.history.history_utils.get_datahandler", return_value=data_handler_mock
    )
    # Mock _load_cached_data_for_updating to return existing data and since_ms
    since_ms = dt_ts(dt_utc(2018, 1, 10, 10, 5))  # Time of last existing candle
    mocker.patch(
        "freqtrade.data.history.history_utils._load_cached_data_for_updating",
        return_value=(existing_data, since_ms, None),
    )
    get_historic_ohlcv_mock = MagicMock(return_value=DataFrame())
    mocker.patch.object(exchange, "get_historic_ohlcv", get_historic_ohlcv_mock)

    # Call _download_pair_history with pre-loaded pair_candles
    result = _download_pair_history(
        datadir=tmp_path,
        exchange=exchange,
        pair="TEST/BTC",
        timeframe="5m",
        candle_type=CandleType.SPOT,
        pair_candles=pair_candles_data,
    )
    # Verify the function succeeded
    assert result is True
    # Verify that exchange.get_historic_ohlcv WAS called (fallback to the regular download)
    assert get_historic_ohlcv_mock.call_count == 1
    # The "Parallel Method" log message must not appear, as the pre-loaded candles were not used
    assert not log_has_re(r"Downloaded .* Parallel Method.", caplog)
    # Verify data was stored
    assert data_handler_mock.ohlcv_store.call_count == 1
    stored_data = data_handler_mock.ohlcv_store.call_args_list[0][1]["data"]
    assert stored_data.equals(existing_data)
    assert len(stored_data) == 2
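
Taken together, the two pair_candles tests pin down the branch inside _download_pair_history: pre-fetched candles are only used when they reach back to the resume point; otherwise the regular exchange download runs. A hedged sketch of that selection logic (the helper name and variables are illustrative, not the PR's code):

import logging

logger = logging.getLogger(__name__)


def _new_candles_for(exchange, pair, timeframe, candle_type, since_dt, since_ms, pair_candles):
    if (
        pair_candles is not None
        and not pair_candles.empty
        and pair_candles["date"].iloc[0] <= since_dt
    ):
        # The pre-fetched candles cover the resume point: reuse them and skip the exchange call.
        new_data = pair_candles.loc[pair_candles["date"] >= since_dt]
        logger.info(f"Downloaded data for {pair} with length {len(new_data)}. Parallel Method.")
        return new_data
    # No usable pre-fetched candles (e.g. they start after since_dt): regular per-pair download.
    return exchange.get_historic_ohlcv(
        pair=pair, timeframe=timeframe, since_ms=since_ms, candle_type=candle_type
    )

Either way, the returned frame is then merged with the cached data, cleaned, and stored, which is what the equals checks against expected_result and existing_data verify.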