refactor: move load-analysis-data logic to btanalysis file

This commit is contained in:
Matthias
2024-12-23 15:22:57 +01:00
parent 258b85a860
commit 9f2142135e
2 changed files with 78 additions and 67 deletions

View File

@@ -405,6 +405,78 @@ def load_backtest_data(filename: Path | str, strategy: str | None = None) -> pd.
return df return df
def load_backtest_analysis_data(backtest_dir: Path, name: str):
"""
Load backtest analysis data either from a pickle file or from within a zip file
:param backtest_dir: Directory containing backtest results
:param name: Name of the analysis data to load (signals, rejected, exited)
:return: Analysis data
"""
import joblib
if backtest_dir.is_dir():
lbf = Path(get_latest_backtest_filename(backtest_dir))
zip_path = backtest_dir / lbf
else:
zip_path = backtest_dir
if zip_path.suffix == ".zip":
# Load from zip file
try:
with zipfile.ZipFile(zip_path) as zipf:
# Files in zip are stored with just their base names
analysis_name = f"{zip_path.stem}_{name}.pkl"
try:
with zipf.open(analysis_name) as analysis_file:
loaded_data = joblib.load(analysis_file)
logger.info(
f"Loaded {name} candles from zip: {str(zip_path)}:{analysis_name}"
)
return loaded_data
except KeyError:
logger.exception(f"Cannot find {analysis_name} in {zip_path}")
return None
except zipfile.BadZipFile:
logger.exception(f"Bad zip file: {zip_path}")
return None
else:
# Load from separate pickle file
if backtest_dir.is_dir():
scpf = Path(backtest_dir, f"{zip_path.stem}_{name}.pkl")
else:
scpf = Path(backtest_dir.parent / f"{backtest_dir.stem}_{name}.pkl")
try:
with scpf.open("rb") as scp:
loaded_data = joblib.load(scp)
logger.info(f"Loaded {name} candles: {str(scpf)}")
return loaded_data
except Exception:
logger.exception(f"Cannot load {name} data from pickled results.")
return None
def load_rejected_signals(backtest_dir: Path):
"""
Load rejected signals from backtest directory
"""
return load_backtest_analysis_data(backtest_dir, "rejected")
def load_signal_candles(backtest_dir: Path):
"""
Load signal candles from backtest directory
"""
return load_backtest_analysis_data(backtest_dir, "signals")
def load_exit_signal_candles(backtest_dir: Path) -> dict[str, dict[str, pd.DataFrame]]:
"""
Load exit signal candles from backtest directory
"""
return load_backtest_analysis_data(backtest_dir, "exited")
def analyze_trade_parallelism(results: pd.DataFrame, timeframe: str) -> pd.DataFrame: def analyze_trade_parallelism(results: pd.DataFrame, timeframe: str) -> pd.DataFrame:
""" """
Find overlapping trades by expanding each trade once per period it was open Find overlapping trades by expanding each trade once per period it was open

View File

@@ -1,17 +1,17 @@
import logging import logging
import zipfile
from pathlib import Path from pathlib import Path
import joblib
import pandas as pd import pandas as pd
from freqtrade.configuration import TimeRange from freqtrade.configuration import TimeRange
from freqtrade.constants import Config from freqtrade.constants import Config
from freqtrade.data.btanalysis import ( from freqtrade.data.btanalysis import (
BT_DATA_COLUMNS, BT_DATA_COLUMNS,
get_latest_backtest_filename,
load_backtest_data, load_backtest_data,
load_backtest_stats, load_backtest_stats,
load_exit_signal_candles,
load_rejected_signals,
load_signal_candles,
) )
from freqtrade.exceptions import OperationalException from freqtrade.exceptions import OperationalException
from freqtrade.util import print_df_rich_table from freqtrade.util import print_df_rich_table
@@ -20,67 +20,6 @@ from freqtrade.util import print_df_rich_table
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _load_backtest_analysis_data(backtest_dir: Path, name: str):
"""
Load backtest analysis data either from a pickle file or from within a zip file
:param backtest_dir: Directory containing backtest results
:param name: Name of the analysis data to load (signals, rejected, exited)
:return: Analysis data
"""
if backtest_dir.is_dir():
lbf = Path(get_latest_backtest_filename(backtest_dir))
zip_path = backtest_dir / lbf
else:
zip_path = backtest_dir
if zip_path.suffix == ".zip":
# Load from zip file
try:
with zipfile.ZipFile(zip_path) as zipf:
# Files in zip are stored with just their base names
analysis_name = f"{zip_path.stem}_{name}.pkl"
try:
with zipf.open(analysis_name) as analysis_file:
loaded_data = joblib.load(analysis_file)
logger.info(
f"Loaded {name} candles from zip: {str(zip_path)}:{analysis_name}"
)
return loaded_data
except KeyError:
logger.exception(f"Cannot find {analysis_name} in {zip_path}")
return None
except zipfile.BadZipFile:
logger.exception(f"Bad zip file: {zip_path}")
return None
else:
# Load from separate pickle file
if backtest_dir.is_dir():
scpf = Path(backtest_dir, f"{zip_path.stem}_{name}.pkl")
else:
scpf = Path(backtest_dir.parent / f"{backtest_dir.stem}_{name}.pkl")
try:
with scpf.open("rb") as scp:
loaded_data = joblib.load(scp)
logger.info(f"Loaded {name} candles: {str(scpf)}")
return loaded_data
except Exception:
logger.exception(f"Cannot load {name} data from pickled results.")
return None
def _load_rejected_signals(backtest_dir: Path):
return _load_backtest_analysis_data(backtest_dir, "rejected")
def _load_signal_candles(backtest_dir: Path):
return _load_backtest_analysis_data(backtest_dir, "signals")
def _load_exit_signal_candles(backtest_dir: Path) -> dict[str, dict[str, pd.DataFrame]]:
return _load_backtest_analysis_data(backtest_dir, "exited")
def _process_candles_and_indicators( def _process_candles_and_indicators(
pairlist, strategy_name, trades, signal_candles, date_col: str = "open_date" pairlist, strategy_name, trades, signal_candles, date_col: str = "open_date"
): ):
@@ -411,12 +350,12 @@ def process_entry_exit_reasons(config: Config):
trades = load_backtest_data(config["exportfilename"], strategy_name) trades = load_backtest_data(config["exportfilename"], strategy_name)
if trades is not None and not trades.empty: if trades is not None and not trades.empty:
signal_candles = _load_signal_candles(config["exportfilename"]) signal_candles = load_signal_candles(config["exportfilename"])
exit_signals = _load_exit_signal_candles(config["exportfilename"]) exit_signals = load_exit_signal_candles(config["exportfilename"])
rej_df = None rej_df = None
if do_rejected: if do_rejected:
rejected_signals_dict = _load_rejected_signals(config["exportfilename"]) rejected_signals_dict = load_rejected_signals(config["exportfilename"])
rej_df = prepare_results( rej_df = prepare_results(
rejected_signals_dict, rejected_signals_dict,
strategy_name, strategy_name,