mirror of
https://github.com/freqtrade/freqtrade.git
synced 2025-12-01 17:43:06 +00:00
Merge pull request #11625 from alisalama/develop
Parallelisation of iterative data downloads for speed improvement
This commit is contained in:
@@ -4,6 +4,7 @@ usage: freqtrade download-data [-h] [-v] [--no-color] [--logfile FILE] [-V]
|
||||
[-p PAIRS [PAIRS ...]] [--pairs-file FILE]
|
||||
[--days INT] [--new-pairs-days INT]
|
||||
[--include-inactive-pairs]
|
||||
[--no-parallel-download]
|
||||
[--timerange TIMERANGE] [--dl-trades]
|
||||
[--convert] [--exchange EXCHANGE]
|
||||
[-t TIMEFRAMES [TIMEFRAMES ...]] [--erase]
|
||||
@@ -24,6 +25,9 @@ options:
|
||||
Default: `None`.
|
||||
--include-inactive-pairs
|
||||
Also download data from inactive pairs.
|
||||
--no-parallel-download
|
||||
Disable parallel startup download. Only use this if
|
||||
you experience issues.
|
||||
--timerange TIMERANGE
|
||||
Specify what timerange of data to use.
|
||||
--dl-trades Download trades instead of OHLCV data.
|
||||
|
||||
@@ -164,6 +164,7 @@ ARGS_DOWNLOAD_DATA = [
|
||||
"days",
|
||||
"new_pairs_days",
|
||||
"include_inactive",
|
||||
"no_parallel_download",
|
||||
"timerange",
|
||||
"download_trades",
|
||||
"convert_trades",
|
||||
|
||||
@@ -454,6 +454,11 @@ AVAILABLE_CLI_OPTIONS = {
|
||||
help="Also download data from inactive pairs.",
|
||||
action="store_true",
|
||||
),
|
||||
"no_parallel_download": Arg(
|
||||
"--no-parallel-download",
|
||||
help="Disable parallel startup download. Only use this if you experience issues.",
|
||||
action="store_true",
|
||||
),
|
||||
"new_pairs_days": Arg(
|
||||
"--new-pairs-days",
|
||||
help="Download data of new pairs for given number of days. Default: `%(default)s`.",
|
||||
|
||||
@@ -391,6 +391,7 @@ class Configuration:
|
||||
("timeframes", "timeframes --timeframes: {}"),
|
||||
("days", "Detected --days: {}"),
|
||||
("include_inactive", "Detected --include-inactive-pairs: {}"),
|
||||
("no_parallel_download", "Detected --no-parallel-download: {}"),
|
||||
("download_trades", "Detected --dl-trades: {}"),
|
||||
("convert_trades", "Detected --convert: {} - Converting Trade data to OHCV {}"),
|
||||
("dataformat_ohlcv", 'Using "{}" to store OHLCV data.'),
|
||||
|
||||
@@ -6,7 +6,14 @@ from pathlib import Path
|
||||
from pandas import DataFrame, concat
|
||||
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.constants import DATETIME_PRINT_FORMAT, DL_DATA_TIMEFRAMES, DOCS_LINK, Config
|
||||
from freqtrade.constants import (
|
||||
DATETIME_PRINT_FORMAT,
|
||||
DL_DATA_TIMEFRAMES,
|
||||
DOCS_LINK,
|
||||
Config,
|
||||
ListPairsWithTimeframes,
|
||||
PairWithTimeframe,
|
||||
)
|
||||
from freqtrade.data.converter import (
|
||||
clean_ohlcv_dataframe,
|
||||
convert_trades_to_ohlcv,
|
||||
@@ -17,6 +24,7 @@ from freqtrade.data.history.datahandlers import IDataHandler, get_datahandler
|
||||
from freqtrade.enums import CandleType, TradingMode
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.exchange import Exchange
|
||||
from freqtrade.exchange.exchange_utils import date_minus_candles
|
||||
from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist
|
||||
from freqtrade.util import dt_now, dt_ts, format_ms_time, format_ms_time_det
|
||||
from freqtrade.util.migrations import migrate_data
|
||||
@@ -226,6 +234,7 @@ def _download_pair_history(
|
||||
candle_type: CandleType,
|
||||
erase: bool = False,
|
||||
prepend: bool = False,
|
||||
pair_candles: DataFrame | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Download latest candles from the exchange for the pair and timeframe passed in parameters
|
||||
@@ -238,6 +247,7 @@ def _download_pair_history(
|
||||
:param timerange: range of time to download
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
:param erase: Erase existing data
|
||||
:param pair_candles: Optional with "1 call" pair candles.
|
||||
:return: bool with success state
|
||||
"""
|
||||
data_handler = get_datahandler(datadir, data_handler=data_handler)
|
||||
@@ -271,21 +281,40 @@ def _download_pair_history(
|
||||
"Current End: %s",
|
||||
f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}" if not data.empty else "None",
|
||||
)
|
||||
|
||||
# Default since_ms to 30 days if nothing is given
|
||||
new_dataframe = exchange.get_historic_ohlcv(
|
||||
pair=pair,
|
||||
timeframe=timeframe,
|
||||
since_ms=(
|
||||
since_ms
|
||||
if since_ms
|
||||
else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
|
||||
),
|
||||
is_new_pair=data.empty,
|
||||
candle_type=candle_type,
|
||||
until_ms=until_ms if until_ms else None,
|
||||
# used to check if the passed in pair_candles (parallel downloaded) covers since_ms.
|
||||
# If we need more data, we have to fall back to the standard method.
|
||||
pair_candles_since_ms = (
|
||||
dt_ts(pair_candles.iloc[0]["date"])
|
||||
if pair_candles is not None and len(pair_candles.index) > 0
|
||||
else 0
|
||||
)
|
||||
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
|
||||
if (
|
||||
pair_candles is None
|
||||
or len(pair_candles.index) == 0
|
||||
or data.empty
|
||||
or prepend is True
|
||||
or erase is True
|
||||
or pair_candles_since_ms > (since_ms if since_ms else 0)
|
||||
):
|
||||
new_dataframe = exchange.get_historic_ohlcv(
|
||||
pair=pair,
|
||||
timeframe=timeframe,
|
||||
since_ms=(
|
||||
since_ms
|
||||
if since_ms
|
||||
else int((datetime.now() - timedelta(days=new_pairs_days)).timestamp()) * 1000
|
||||
),
|
||||
is_new_pair=data.empty,
|
||||
candle_type=candle_type,
|
||||
until_ms=until_ms if until_ms else None,
|
||||
)
|
||||
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
|
||||
else:
|
||||
new_dataframe = pair_candles
|
||||
logger.info(
|
||||
f"Downloaded data for {pair} with length {len(new_dataframe)}. Parallel Method."
|
||||
)
|
||||
|
||||
if data.empty:
|
||||
data = new_dataframe
|
||||
else:
|
||||
@@ -330,6 +359,7 @@ def refresh_backtest_ohlcv_data(
|
||||
data_format: str | None = None,
|
||||
prepend: bool = False,
|
||||
progress_tracker: CustomProgress | None = None,
|
||||
no_parallel_download: bool = False,
|
||||
) -> list[str]:
|
||||
"""
|
||||
Refresh stored ohlcv data for backtesting and hyperopt operations.
|
||||
@@ -339,6 +369,7 @@ def refresh_backtest_ohlcv_data(
|
||||
progress_tracker = retrieve_progress_tracker(progress_tracker)
|
||||
|
||||
pairs_not_available = []
|
||||
fast_candles: dict[PairWithTimeframe, DataFrame] = {}
|
||||
data_handler = get_datahandler(datadir, data_format)
|
||||
candle_type = CandleType.get_default(trading_mode)
|
||||
with progress_tracker as progress:
|
||||
@@ -355,6 +386,30 @@ def refresh_backtest_ohlcv_data(
|
||||
logger.info(f"Skipping pair {pair}...")
|
||||
continue
|
||||
for timeframe in timeframes:
|
||||
# Get fast candles via parallel method on first loop through per timeframe
|
||||
# and candle type. Downloads all the pairs in the list and stores them.
|
||||
if (
|
||||
not no_parallel_download
|
||||
and exchange.get_option("download_data_parallel_quick", True)
|
||||
and (
|
||||
((pair, timeframe, candle_type) not in fast_candles)
|
||||
and (erase is False)
|
||||
and (prepend is False)
|
||||
)
|
||||
):
|
||||
fast_candles.update(
|
||||
_download_all_pairs_history_parallel(
|
||||
exchange=exchange,
|
||||
pairs=pairs,
|
||||
timeframe=timeframe,
|
||||
candle_type=candle_type,
|
||||
timerange=timerange,
|
||||
)
|
||||
)
|
||||
|
||||
# get the already downloaded pair candles if they exist
|
||||
pair_candles = fast_candles.pop((pair, timeframe, candle_type), None)
|
||||
|
||||
progress.update(timeframe_task, description=f"Timeframe {timeframe}")
|
||||
logger.debug(f"Downloading pair {pair}, {candle_type}, interval {timeframe}.")
|
||||
_download_pair_history(
|
||||
@@ -368,6 +423,7 @@ def refresh_backtest_ohlcv_data(
|
||||
candle_type=candle_type,
|
||||
erase=erase,
|
||||
prepend=prepend,
|
||||
pair_candles=pair_candles, # optional pass of dataframe of parallel candles
|
||||
)
|
||||
progress.update(timeframe_task, advance=1)
|
||||
if trading_mode == "futures":
|
||||
@@ -404,6 +460,41 @@ def refresh_backtest_ohlcv_data(
|
||||
return pairs_not_available
|
||||
|
||||
|
||||
def _download_all_pairs_history_parallel(
|
||||
exchange: Exchange,
|
||||
pairs: list[str],
|
||||
timeframe: str,
|
||||
candle_type: CandleType,
|
||||
timerange: TimeRange | None = None,
|
||||
) -> dict[PairWithTimeframe, DataFrame]:
|
||||
"""
|
||||
Allows to use the faster parallel async download method for many coins
|
||||
but only if the data is short enough to be retrieved in one call.
|
||||
Used by freqtrade download-data subcommand.
|
||||
:return: Candle pairs with timeframes
|
||||
"""
|
||||
candles: dict[PairWithTimeframe, DataFrame] = {}
|
||||
since = 0
|
||||
if timerange:
|
||||
if timerange.starttype == "date":
|
||||
since = timerange.startts * 1000
|
||||
|
||||
candle_limit = exchange.ohlcv_candle_limit(timeframe, candle_type)
|
||||
one_call_min_time_dt = dt_ts(date_minus_candles(timeframe, candle_limit))
|
||||
# check if we can get all candles in one go, if so then we can download them in parallel
|
||||
if since > one_call_min_time_dt:
|
||||
logger.info(
|
||||
f"Downloading parallel candles for {timeframe} for all pairs "
|
||||
f"since {format_ms_time(since)}"
|
||||
)
|
||||
needed_pairs: ListPairsWithTimeframes = [
|
||||
(p, timeframe, candle_type) for p in [p for p in pairs]
|
||||
]
|
||||
candles = exchange.refresh_latest_ohlcv(needed_pairs, since_ms=since, cache=False)
|
||||
|
||||
return candles
|
||||
|
||||
|
||||
def _download_trades_history(
|
||||
exchange: Exchange,
|
||||
pair: str,
|
||||
@@ -702,6 +793,7 @@ def download_data(
|
||||
trading_mode=config.get("trading_mode", "spot"),
|
||||
prepend=config.get("prepend_data", False),
|
||||
progress_tracker=progress_tracker,
|
||||
no_parallel_download=config.get("no_parallel_download", False),
|
||||
)
|
||||
finally:
|
||||
if pairs_not_available:
|
||||
|
||||
@@ -138,6 +138,7 @@ class Exchange:
|
||||
"ohlcv_has_history": True, # Some exchanges (Kraken) don't provide history via ohlcv
|
||||
"ohlcv_partial_candle": True,
|
||||
"ohlcv_require_since": False,
|
||||
"download_data_parallel_quick": True,
|
||||
"always_require_api_keys": False, # purge API keys for Dry-run. Must default to false.
|
||||
# Check https://github.com/ccxt/ccxt/issues/10767 for removal of ohlcv_volume_currency
|
||||
"ohlcv_volume_currency": "base", # "base" or "quote"
|
||||
|
||||
@@ -28,6 +28,8 @@ class FtHas(TypedDict, total=False):
|
||||
ohlcv_volume_currency: str
|
||||
ohlcv_candle_limit_per_timeframe: dict[str, int]
|
||||
always_require_api_keys: bool
|
||||
# allow disabling of parallel download-data for specific exchanges
|
||||
download_data_parallel_quick: bool
|
||||
# Tickers
|
||||
tickers_have_quoteVolume: bool
|
||||
tickers_have_percentage: bool
|
||||
|
||||
@@ -28,6 +28,7 @@ class Hyperliquid(Exchange):
|
||||
"stoploss_on_exchange": False,
|
||||
"exchange_has_overrides": {"fetchTrades": False},
|
||||
"marketOrderRequiresPrice": True,
|
||||
"download_data_parallel_quick": False,
|
||||
"ws_enabled": True,
|
||||
}
|
||||
_ft_has_futures: FtHas = {
|
||||
|
||||
@@ -18,6 +18,7 @@ from freqtrade.data.converter import ohlcv_to_dataframe
|
||||
from freqtrade.data.history import get_datahandler
|
||||
from freqtrade.data.history.datahandlers.jsondatahandler import JsonDataHandler, JsonGzDataHandler
|
||||
from freqtrade.data.history.history_utils import (
|
||||
_download_all_pairs_history_parallel,
|
||||
_download_pair_history,
|
||||
_download_trades_history,
|
||||
_load_cached_data_for_updating,
|
||||
@@ -545,6 +546,14 @@ def test_refresh_backtest_ohlcv_data(
|
||||
):
|
||||
caplog.set_level(logging.DEBUG)
|
||||
dl_mock = mocker.patch("freqtrade.data.history.history_utils._download_pair_history")
|
||||
|
||||
def parallel_mock(pairs, timeframe, candle_type, **kwargs):
|
||||
return {(pair, timeframe, candle_type): DataFrame() for pair in pairs}
|
||||
|
||||
parallel_mock = mocker.patch(
|
||||
"freqtrade.data.history.history_utils._download_all_pairs_history_parallel",
|
||||
side_effect=parallel_mock,
|
||||
)
|
||||
mocker.patch(f"{EXMS}.markets", PropertyMock(return_value=markets))
|
||||
|
||||
mocker.patch.object(Path, "exists", MagicMock(return_value=True))
|
||||
@@ -559,10 +568,12 @@ def test_refresh_backtest_ohlcv_data(
|
||||
timeframes=["1m", "5m"],
|
||||
datadir=testdatadir,
|
||||
timerange=timerange,
|
||||
erase=True,
|
||||
erase=False,
|
||||
trading_mode=trademode,
|
||||
)
|
||||
|
||||
# Called once per timeframe (as we return an empty dataframe)
|
||||
assert parallel_mock.call_count == 2
|
||||
assert dl_mock.call_count == callcount
|
||||
assert dl_mock.call_args[1]["timerange"].starttype == "date"
|
||||
|
||||
@@ -699,3 +710,256 @@ def test_download_trades_history(
|
||||
assert ght_mock.call_count == 0
|
||||
|
||||
_clean_test_file(file2)
|
||||
|
||||
|
||||
def test_download_all_pairs_history_parallel(mocker, default_conf_usdt):
|
||||
pairs = ["PAIR1/BTC", "PAIR2/USDT"]
|
||||
timeframe = "5m"
|
||||
candle_type = CandleType.SPOT
|
||||
|
||||
df1 = DataFrame(
|
||||
{
|
||||
"date": [1, 2],
|
||||
"open": [1, 2],
|
||||
"close": [1, 2],
|
||||
"high": [1, 2],
|
||||
"low": [1, 2],
|
||||
"volume": [1, 2],
|
||||
}
|
||||
)
|
||||
df2 = DataFrame(
|
||||
{
|
||||
"date": [3, 4],
|
||||
"open": [3, 4],
|
||||
"close": [3, 4],
|
||||
"high": [3, 4],
|
||||
"low": [3, 4],
|
||||
"volume": [3, 4],
|
||||
}
|
||||
)
|
||||
expected = {
|
||||
("PAIR1/BTC", timeframe, candle_type): df1,
|
||||
("PAIR2/USDT", timeframe, candle_type): df2,
|
||||
}
|
||||
# Mock exchange
|
||||
mocker.patch.multiple(
|
||||
EXMS,
|
||||
exchange_has=MagicMock(return_value=True),
|
||||
ohlcv_candle_limit=MagicMock(return_value=1000),
|
||||
refresh_latest_ohlcv=MagicMock(return_value=expected),
|
||||
)
|
||||
exchange = get_patched_exchange(mocker, default_conf_usdt)
|
||||
# timerange with starttype 'date' and startts far in the future to trigger parallel download
|
||||
|
||||
timerange = TimeRange("date", None, 9999999999, 0)
|
||||
result = _download_all_pairs_history_parallel(
|
||||
exchange=exchange,
|
||||
pairs=pairs,
|
||||
timeframe=timeframe,
|
||||
candle_type=candle_type,
|
||||
timerange=timerange,
|
||||
)
|
||||
assert result == expected
|
||||
|
||||
assert exchange.ohlcv_candle_limit.call_args[0] == (timeframe, candle_type)
|
||||
assert exchange.refresh_latest_ohlcv.call_count == 1
|
||||
|
||||
# If since is not after one_call_min_time_dt, should not call refresh_latest_ohlcv
|
||||
exchange.refresh_latest_ohlcv.reset_mock()
|
||||
timerange2 = TimeRange("date", None, 0, 0)
|
||||
result2 = _download_all_pairs_history_parallel(
|
||||
exchange=exchange,
|
||||
pairs=pairs,
|
||||
timeframe=timeframe,
|
||||
candle_type=candle_type,
|
||||
timerange=timerange2,
|
||||
)
|
||||
assert result2 == {}
|
||||
assert exchange.refresh_latest_ohlcv.call_count == 0
|
||||
|
||||
exchange.refresh_latest_ohlcv.reset_mock()
|
||||
|
||||
# Test without timerange
|
||||
result3 = _download_all_pairs_history_parallel(
|
||||
exchange=exchange,
|
||||
pairs=pairs,
|
||||
timeframe=timeframe,
|
||||
candle_type=candle_type,
|
||||
timerange=None,
|
||||
)
|
||||
assert result3 == {}
|
||||
assert exchange.refresh_latest_ohlcv.call_count == 0
|
||||
|
||||
|
||||
def test_download_pair_history_with_pair_candles(mocker, default_conf, tmp_path, caplog) -> None:
|
||||
"""
|
||||
Test _download_pair_history with pair_candles parameter (parallel method).
|
||||
"""
|
||||
exchange = get_patched_exchange(mocker, default_conf)
|
||||
|
||||
# Create test data for existing cached data
|
||||
existing_data = DataFrame(
|
||||
{
|
||||
"date": [dt_utc(2018, 1, 10, 10, 0), dt_utc(2018, 1, 10, 10, 5)],
|
||||
"open": [1.0, 1.15],
|
||||
"high": [1.1, 1.2],
|
||||
"low": [0.9, 1.1],
|
||||
"close": [1.05, 1.15],
|
||||
"volume": [100, 150],
|
||||
}
|
||||
)
|
||||
|
||||
# Create pair_candles data that will be used instead of exchange download
|
||||
# This data should start before or at the same time as since_ms to trigger the else branch
|
||||
pair_candles_data = DataFrame(
|
||||
{
|
||||
"date": [
|
||||
dt_utc(2018, 1, 10, 10, 5),
|
||||
dt_utc(2018, 1, 10, 10, 10),
|
||||
dt_utc(2018, 1, 10, 10, 15),
|
||||
],
|
||||
"open": [1.15, 1.2, 1.25],
|
||||
"high": [1.25, 1.3, 1.35],
|
||||
"low": [1.1, 1.15, 1.2],
|
||||
"close": [1.2, 1.25, 1.3],
|
||||
"volume": [200, 250, 300],
|
||||
}
|
||||
)
|
||||
|
||||
# Mock the data handler to return existing cached data
|
||||
data_handler_mock = MagicMock()
|
||||
data_handler_mock.ohlcv_load.return_value = existing_data
|
||||
data_handler_mock.ohlcv_store = MagicMock()
|
||||
mocker.patch(
|
||||
"freqtrade.data.history.history_utils.get_datahandler", return_value=data_handler_mock
|
||||
)
|
||||
|
||||
# Mock _load_cached_data_for_updating to return existing data and since_ms
|
||||
since_ms = dt_ts(dt_utc(2018, 1, 10, 10, 5)) # Time of last existing candle
|
||||
mocker.patch(
|
||||
"freqtrade.data.history.history_utils._load_cached_data_for_updating",
|
||||
return_value=(existing_data, since_ms, None),
|
||||
)
|
||||
|
||||
# Mock clean_ohlcv_dataframe to return concatenated data
|
||||
expected_result = DataFrame(
|
||||
{
|
||||
"date": [
|
||||
dt_utc(2018, 1, 10, 10, 0),
|
||||
dt_utc(2018, 1, 10, 10, 5),
|
||||
dt_utc(2018, 1, 10, 10, 10),
|
||||
dt_utc(2018, 1, 10, 10, 15),
|
||||
],
|
||||
"open": [1.0, 1.15, 1.2, 1.25],
|
||||
"high": [1.1, 1.25, 1.3, 1.35],
|
||||
"low": [0.9, 1.1, 1.15, 1.2],
|
||||
"close": [1.05, 1.2, 1.25, 1.3],
|
||||
"volume": [100, 200, 250, 300],
|
||||
}
|
||||
)
|
||||
|
||||
get_historic_ohlcv_mock = MagicMock()
|
||||
mocker.patch.object(exchange, "get_historic_ohlcv", get_historic_ohlcv_mock)
|
||||
|
||||
# Call _download_pair_history with pre-loaded pair_candles
|
||||
result = _download_pair_history(
|
||||
datadir=tmp_path,
|
||||
exchange=exchange,
|
||||
pair="TEST/BTC",
|
||||
timeframe="5m",
|
||||
candle_type=CandleType.SPOT,
|
||||
pair_candles=pair_candles_data,
|
||||
)
|
||||
|
||||
# Verify the function succeeded
|
||||
assert result is True
|
||||
|
||||
# Verify that exchange.get_historic_ohlcv was NOT called (parallel method was used)
|
||||
assert get_historic_ohlcv_mock.call_count == 0
|
||||
|
||||
# Verify the log message indicating parallel method was used (line 315-316)
|
||||
assert log_has("Downloaded data for TEST/BTC with length 3. Parallel Method.", caplog)
|
||||
|
||||
# Verify data was stored
|
||||
assert data_handler_mock.ohlcv_store.call_count == 1
|
||||
stored_data = data_handler_mock.ohlcv_store.call_args_list[0][1]["data"]
|
||||
assert stored_data.equals(expected_result)
|
||||
assert len(stored_data) == 4
|
||||
|
||||
|
||||
def test_download_pair_history_with_pair_candles_no_overlap(
|
||||
mocker, default_conf, tmp_path, caplog
|
||||
) -> None:
|
||||
exchange = get_patched_exchange(mocker, default_conf)
|
||||
|
||||
# Create test data for existing cached data
|
||||
existing_data = DataFrame(
|
||||
{
|
||||
"date": [dt_utc(2018, 1, 10, 10, 0), dt_utc(2018, 1, 10, 10, 5)],
|
||||
"open": [1.0, 1.1],
|
||||
"high": [1.1, 1.2],
|
||||
"low": [0.9, 1.0],
|
||||
"close": [1.05, 1.15],
|
||||
"volume": [100, 150],
|
||||
}
|
||||
)
|
||||
|
||||
# Create pair_candles data that will be used instead of exchange download
|
||||
# This data should start before or at the same time as since_ms to trigger the else branch
|
||||
pair_candles_data = DataFrame(
|
||||
{
|
||||
"date": [
|
||||
dt_utc(2018, 1, 10, 10, 10),
|
||||
dt_utc(2018, 1, 10, 10, 15),
|
||||
dt_utc(2018, 1, 10, 10, 20),
|
||||
],
|
||||
"open": [1.15, 1.2, 1.25],
|
||||
"high": [1.25, 1.3, 1.35],
|
||||
"low": [1.1, 1.15, 1.2],
|
||||
"close": [1.2, 1.25, 1.3],
|
||||
"volume": [200, 250, 300],
|
||||
}
|
||||
)
|
||||
|
||||
# Mock the data handler to return existing cached data
|
||||
data_handler_mock = MagicMock()
|
||||
data_handler_mock.ohlcv_load.return_value = existing_data
|
||||
data_handler_mock.ohlcv_store = MagicMock()
|
||||
mocker.patch(
|
||||
"freqtrade.data.history.history_utils.get_datahandler", return_value=data_handler_mock
|
||||
)
|
||||
|
||||
# Mock _load_cached_data_for_updating to return existing data and since_ms
|
||||
since_ms = dt_ts(dt_utc(2018, 1, 10, 10, 5)) # Time of last existing candle
|
||||
mocker.patch(
|
||||
"freqtrade.data.history.history_utils._load_cached_data_for_updating",
|
||||
return_value=(existing_data, since_ms, None),
|
||||
)
|
||||
|
||||
get_historic_ohlcv_mock = MagicMock(return_value=DataFrame())
|
||||
mocker.patch.object(exchange, "get_historic_ohlcv", get_historic_ohlcv_mock)
|
||||
|
||||
# Call _download_pair_history with pre-loaded pair_candles
|
||||
result = _download_pair_history(
|
||||
datadir=tmp_path,
|
||||
exchange=exchange,
|
||||
pair="TEST/BTC",
|
||||
timeframe="5m",
|
||||
candle_type=CandleType.SPOT,
|
||||
pair_candles=pair_candles_data,
|
||||
)
|
||||
|
||||
# Verify the function succeeded
|
||||
assert result is True
|
||||
|
||||
# Verify that exchange.get_historic_ohlcv was NOT called (parallel method was used)
|
||||
assert get_historic_ohlcv_mock.call_count == 1
|
||||
|
||||
# Verify the log message indicating parallel method was used (line 315-316)
|
||||
assert not log_has_re(r"Downloaded .* Parallel Method.", caplog)
|
||||
|
||||
# Verify data was stored
|
||||
assert data_handler_mock.ohlcv_store.call_count == 1
|
||||
stored_data = data_handler_mock.ohlcv_store.call_args_list[0][1]["data"]
|
||||
assert stored_data.equals(existing_data)
|
||||
assert len(stored_data) == 2
|
||||
|
||||
Reference in New Issue
Block a user