Parallelisation of iterative data downloads

This commit is contained in:
Ali Salama
2025-04-11 16:16:07 +01:00
parent 30eb32862c
commit 559f6e2791

View File

@@ -6,7 +6,14 @@ from pathlib import Path
from pandas import DataFrame, concat
from freqtrade.configuration import TimeRange
from freqtrade.constants import DATETIME_PRINT_FORMAT, DL_DATA_TIMEFRAMES, DOCS_LINK, Config
from freqtrade.constants import (
DATETIME_PRINT_FORMAT,
DL_DATA_TIMEFRAMES,
DOCS_LINK,
Config,
ListPairsWithTimeframes,
PairWithTimeframe,
)
from freqtrade.data.converter import (
clean_ohlcv_dataframe,
convert_trades_to_ohlcv,
@@ -17,6 +24,7 @@ from freqtrade.data.history.datahandlers import IDataHandler, get_datahandler
from freqtrade.enums import CandleType, TradingMode
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import Exchange
from freqtrade.exchange.exchange_utils import date_minus_candles
from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist
from freqtrade.util import dt_now, dt_ts, format_ms_time, format_ms_time_det
from freqtrade.util.migrations import migrate_data
@@ -226,6 +234,7 @@ def _download_pair_history(
candle_type: CandleType,
erase: bool = False,
prepend: bool = False,
pair_candles: DataFrame | None = None,
) -> bool:
"""
Download latest candles from the exchange for the pair and timeframe passed in parameters
@@ -271,21 +280,40 @@ def _download_pair_history(
"Current End: %s",
f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None",
)
# Default since_ms to 30 days if nothing is given
new_dataframe = exchange.get_historic_ohlcv(
pair=pair,
timeframe=timeframe,
since_ms=(
since_ms
if since_ms
else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
),
is_new_pair=data.empty,
candle_type=candle_type,
until_ms=until_ms if until_ms else None,
# Timestamp (ms) of the first candle in the passed-in pair_candles (0 if none were passed).
# If it is later than since_ms, the passed-in candles do not reach back far enough,
# so the data has to be downloaded again using the regular method below.
pair_candles_since_ms = (
dt_ts(pair_candles.iloc[0]["date"])
if pair_candles is not None and len(pair_candles.index) > 0
else 0
)
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
if (
pair_candles is None
or len(pair_candles.index) == 0
or prepend is True
or erase is True
or pair_candles_since_ms > (since_ms if since_ms else 0)
):
# Default since_ms to 30 days if nothing is given
new_dataframe = exchange.get_historic_ohlcv(
pair=pair,
timeframe=timeframe,
since_ms=(
since_ms
if since_ms
else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
),
is_new_pair=data.empty,
candle_type=candle_type,
until_ms=until_ms if until_ms else None,
)
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
else:
new_dataframe = pair_candles # following clean_ohlcv_dataframe can do the clean up
logger.info(
f"Downloaded data for {pair} with length {len(new_dataframe)}. Parallel Method."
)
if data.empty:
data = new_dataframe
else:
@@ -339,6 +367,7 @@ def refresh_backtest_ohlcv_data(
progress_tracker = retrieve_progress_tracker(progress_tracker)
pairs_not_available = []
fast_candles: dict[PairWithTimeframe, DataFrame] = {}
data_handler = get_datahandler(datadir, data_format)
candle_type = CandleType.get_default(trading_mode)
with progress_tracker as progress:
@@ -355,6 +384,30 @@ def refresh_backtest_ohlcv_data(
logger.info(f"Skipping pair {pair}...")
continue
for timeframe in timeframes:
# Get fast candles via parallel method on first loop through per timeframe
# and candle type. Downloads all the pairs in the list and stores them.
if (
((pair, timeframe, candle_type) not in fast_candles)
and (erase is False)
and (prepend is False)
):
fast_candles.update(
_download_all_pairs_history_parallel(
exchange=exchange,
pairs=pairs,
timeframe=timeframe,
trading_mode=trading_mode,
timerange=timerange,
)
)
# get the already downloaded pair candles if they exist
pair_candles = (
fast_candles[(pair, timeframe, candle_type)]
if (pair, timeframe, candle_type) in fast_candles
else None
)
progress.update(timeframe_task, description=f"Timeframe {timeframe}")
logger.debug(f"Downloading pair {pair}, {candle_type}, interval {timeframe}.")
_download_pair_history(
@@ -368,6 +421,7 @@ def refresh_backtest_ohlcv_data(
candle_type=candle_type,
erase=erase,
prepend=prepend,
pair_candles=pair_candles, # optional pass of dataframe of parallel candles
)
progress.update(timeframe_task, advance=1)
if trading_mode == "futures":
@@ -404,6 +458,38 @@ def refresh_backtest_ohlcv_data(
return pairs_not_available
def _download_all_pairs_history_parallel(
    exchange: Exchange,
    pairs: list[str],
    timeframe: str,
    trading_mode: str,
    timerange: TimeRange | None = None,
) -> dict[PairWithTimeframe, DataFrame]:
    """
    Allows us to use the faster parallel async download method for many coins
    but only if the data is short enough to be retrieved in one call.
    Used by freqtrade download-data subcommand.

    The parallel download is only attempted when ``timerange`` has a "date" start
    that is later than the single-call threshold derived from the exchange's
    candle limit for this timeframe. In every other case (no timerange, no start
    date, or a start date at/before the threshold) nothing is downloaded and an
    empty dict is returned.

    :param exchange: Exchange instance used to query the candle limit and download
    :param pairs: List of pairs to download
    :param timeframe: Timeframe to download (e.g. "5m")
    :param trading_mode: Trading mode, used to select the default candle type
    :param timerange: Optional timerange; only its start date is used here
    :return: Candle pairs with timeframes
    """
    candles: dict[PairWithTimeframe, DataFrame] = {}

    # Without an explicit start date, since stays 0, which can never exceed the
    # threshold below - so the parallel path is skipped.
    since = 0
    if timerange and timerange.starttype == "date":
        since = timerange.startts * 1000  # startts * 1000: same ms scale as dt_ts() below

    # Resolve the candle type once instead of once per pair.
    candle_type = CandleType.get_default(trading_mode)
    candle_limit = exchange.ohlcv_candle_limit(timeframe, candle_type)
    one_call_min_time_dt = dt_ts(date_minus_candles(timeframe, candle_limit))

    # check if we can get them all in one go, if so then we can download them in parallel
    if since > one_call_min_time_dt:
        logger.info(f"Downloading Parallel Candles for {timeframe} since {format_ms_time(since)}")
        needed_pairs: ListPairsWithTimeframes = [(p, timeframe, candle_type) for p in pairs]
        candles = exchange.refresh_ohlcv_with_cache(needed_pairs, since)

    return candles
def _download_trades_history(
exchange: Exchange,
pair: str,