mirror of
https://github.com/freqtrade/freqtrade.git
synced 2026-02-05 22:00:25 +00:00
adjusted code to matthias' specifications
did not change the code so that it only loads data once yet.
This commit is contained in:
@@ -999,9 +999,9 @@ Common arguments:
|
||||
Path to userdata directory.
|
||||
|
||||
```
|
||||
### Backtest lookahead bias checker
|
||||
### Lookahead - analysis
|
||||
#### Summary
|
||||
Checks a given strategy for look ahead bias
|
||||
Checks a given strategy for look ahead bias via backtest-analysis
|
||||
Look ahead bias means that the backtest uses data from future candles thereby not making it viable beyond backtesting
|
||||
and producing false hopes for the one backtesting.
|
||||
|
||||
|
||||
@@ -19,10 +19,10 @@ from freqtrade.commands.list_commands import (start_list_exchanges, start_list_f
|
||||
start_list_markets, start_list_strategies,
|
||||
start_list_timeframes, start_show_trades)
|
||||
from freqtrade.commands.optimize_commands import (start_backtesting, start_backtesting_show,
|
||||
start_edge, start_hyperopt)
|
||||
start_edge, start_hyperopt,
|
||||
start_lookahead_analysis)
|
||||
from freqtrade.commands.pairlist_commands import start_test_pairlist
|
||||
from freqtrade.commands.plot_commands import start_plot_dataframe, start_plot_profit
|
||||
from freqtrade.commands.strategy_utils_commands import (start_backtest_lookahead_bias_checker,
|
||||
start_strategy_update)
|
||||
from freqtrade.commands.strategy_utils_commands import start_strategy_update
|
||||
from freqtrade.commands.trade_commands import start_trading
|
||||
from freqtrade.commands.webserver_commands import start_webserver
|
||||
|
||||
@@ -118,9 +118,9 @@ NO_CONF_ALLOWED = ["create-userdir", "list-exchanges", "new-strategy"]
|
||||
|
||||
ARGS_STRATEGY_UPDATER = ["strategy_list", "strategy_path", "recursive_strategy_search"]
|
||||
|
||||
ARGS_BACKTEST_LOOKAHEAD_BIAS_CHECKER = ARGS_BACKTEST + ["minimum_trade_amount",
|
||||
"targeted_trade_amount",
|
||||
"overwrite_existing_exportfilename_content"]
|
||||
ARGS_LOOKAHEAD_ANALYSIS = ARGS_BACKTEST + ["minimum_trade_amount",
|
||||
"targeted_trade_amount",
|
||||
"overwrite_existing_exportfilename_content"]
|
||||
|
||||
|
||||
# + ["target_trades", "minimum_trades",
|
||||
@@ -200,8 +200,7 @@ class Arguments:
|
||||
self.parser = argparse.ArgumentParser(description='Free, open source crypto trading bot')
|
||||
self._build_args(optionlist=['version'], parser=self.parser)
|
||||
|
||||
from freqtrade.commands import (start_analysis_entries_exits,
|
||||
start_backtest_lookahead_bias_checker, start_backtesting,
|
||||
from freqtrade.commands import (start_analysis_entries_exits, start_backtesting,
|
||||
start_backtesting_show, start_convert_data,
|
||||
start_convert_db, start_convert_trades,
|
||||
start_create_userdir, start_download_data, start_edge,
|
||||
@@ -209,8 +208,9 @@ class Arguments:
|
||||
start_install_ui, start_list_data, start_list_exchanges,
|
||||
start_list_freqAI_models, start_list_markets,
|
||||
start_list_strategies, start_list_timeframes,
|
||||
start_new_config, start_new_strategy, start_plot_dataframe,
|
||||
start_plot_profit, start_show_trades, start_strategy_update,
|
||||
start_lookahead_analysis, start_new_config,
|
||||
start_new_strategy, start_plot_dataframe, start_plot_profit,
|
||||
start_show_trades, start_strategy_update,
|
||||
start_test_pairlist, start_trading, start_webserver)
|
||||
|
||||
subparsers = self.parser.add_subparsers(dest='command',
|
||||
@@ -462,12 +462,12 @@ class Arguments:
|
||||
self._build_args(optionlist=ARGS_STRATEGY_UPDATER,
|
||||
parser=strategy_updater_cmd)
|
||||
|
||||
# Add backtest lookahead bias checker subcommand
|
||||
backtest_lookahead_bias_checker_cmd = \
|
||||
subparsers.add_parser('backtest-lookahead-bias-checker',
|
||||
# Add lookahead_analysis subcommand
|
||||
lookahead_analayis_cmd = \
|
||||
subparsers.add_parser('lookahead-analysis',
|
||||
help="checks for potential look ahead bias",
|
||||
parents=[_common_parser, _strategy_parser])
|
||||
backtest_lookahead_bias_checker_cmd.set_defaults(func=start_backtest_lookahead_bias_checker)
|
||||
lookahead_analayis_cmd.set_defaults(func=start_lookahead_analysis)
|
||||
|
||||
self._build_args(optionlist=ARGS_BACKTEST_LOOKAHEAD_BIAS_CHECKER,
|
||||
parser=backtest_lookahead_bias_checker_cmd)
|
||||
self._build_args(optionlist=ARGS_LOOKAHEAD_ANALYSIS,
|
||||
parser=lookahead_analayis_cmd)
|
||||
|
||||
@@ -6,6 +6,8 @@ from freqtrade.configuration import setup_utils_configuration
|
||||
from freqtrade.enums import RunMode
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.misc import round_coin_value
|
||||
from freqtrade.optimize.lookahead_analysis import LookaheadAnalysisSubFunctions
|
||||
from freqtrade.resolvers import StrategyResolver
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -132,3 +134,51 @@ def start_edge(args: Dict[str, Any]) -> None:
|
||||
# Initialize Edge object
|
||||
edge_cli = EdgeCli(config)
|
||||
edge_cli.start()
|
||||
|
||||
|
||||
def start_lookahead_analysis(args: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Start the backtest bias tester script
|
||||
:param args: Cli args from Arguments()
|
||||
:return: None
|
||||
"""
|
||||
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
|
||||
|
||||
if args['targeted_trade_amount'] < args['minimum_trade_amount']:
|
||||
# add logic that tells the user to check the configuration
|
||||
# since this combo doesn't make any sense.
|
||||
pass
|
||||
|
||||
strategy_objs = StrategyResolver.search_all_objects(
|
||||
config, enum_failed=False, recursive=config.get('recursive_strategy_search', False))
|
||||
|
||||
lookaheadAnalysis_instances = []
|
||||
strategy_list = []
|
||||
|
||||
# unify --strategy and --strategy_list to one list
|
||||
if 'strategy' in args and args['strategy'] is not None:
|
||||
strategy_list = [args['strategy']]
|
||||
else:
|
||||
strategy_list = args['strategy_list']
|
||||
|
||||
# check if strategies can be properly loaded, only check them if they can be.
|
||||
if strategy_list is not None:
|
||||
for strat in strategy_list:
|
||||
for strategy_obj in strategy_objs:
|
||||
if strategy_obj['name'] == strat and strategy_obj not in strategy_list:
|
||||
lookaheadAnalysis_instances.append(
|
||||
LookaheadAnalysisSubFunctions.initialize_single_lookahead_analysis(
|
||||
strategy_obj, config, args))
|
||||
break
|
||||
|
||||
# report the results
|
||||
if lookaheadAnalysis_instances:
|
||||
LookaheadAnalysisSubFunctions.text_table_lookahead_analysis_instances(
|
||||
lookaheadAnalysis_instances)
|
||||
if args['exportfilename'] is not None:
|
||||
LookaheadAnalysisSubFunctions.export_to_csv(args, lookaheadAnalysis_instances)
|
||||
else:
|
||||
logger.error("There were no strategies specified neither through "
|
||||
"--strategy nor through "
|
||||
"--strategy_list "
|
||||
"or timeframe was not specified.")
|
||||
|
||||
@@ -4,13 +4,9 @@ import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
import pandas as pd
|
||||
from tabulate import tabulate
|
||||
|
||||
from freqtrade.configuration import setup_utils_configuration
|
||||
from freqtrade.enums import RunMode
|
||||
from freqtrade.resolvers import StrategyResolver
|
||||
from freqtrade.strategy.backtest_lookahead_bias_checker import BacktestLookaheadBiasChecker
|
||||
from freqtrade.strategy.strategyupdater import StrategyUpdater
|
||||
|
||||
|
||||
@@ -57,135 +53,3 @@ def start_conversion(strategy_obj, config):
|
||||
instance_strategy_updater.start(config, strategy_obj)
|
||||
elapsed = time.perf_counter() - start
|
||||
print(f"Conversion of {Path(strategy_obj['location']).name} took {elapsed:.1f} seconds.")
|
||||
|
||||
# except:
|
||||
# pass
|
||||
|
||||
|
||||
def start_backtest_lookahead_bias_checker(args: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Start the backtest bias tester script
|
||||
:param args: Cli args from Arguments()
|
||||
:return: None
|
||||
"""
|
||||
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
|
||||
|
||||
if args['targeted_trade_amount'] < args['minimum_trade_amount']:
|
||||
# add logic that tells the user to check the configuration
|
||||
# since this combo doesn't make any sense.
|
||||
pass
|
||||
|
||||
strategy_objs = StrategyResolver.search_all_objects(
|
||||
config, enum_failed=False, recursive=config.get('recursive_strategy_search', False))
|
||||
|
||||
bias_checker_instances = []
|
||||
filtered_strategy_objs = []
|
||||
if 'strategy_list' in args and args['strategy_list'] is not None:
|
||||
for args_strategy in args['strategy_list']:
|
||||
for strategy_obj in strategy_objs:
|
||||
if (strategy_obj['name'] == args_strategy
|
||||
and strategy_obj not in filtered_strategy_objs):
|
||||
filtered_strategy_objs.append(strategy_obj)
|
||||
break
|
||||
|
||||
for filtered_strategy_obj in filtered_strategy_objs:
|
||||
bias_checker_instances.append(
|
||||
initialize_single_lookahead_bias_checker(filtered_strategy_obj, config, args))
|
||||
elif 'strategy' in args and args['strategy'] is not None:
|
||||
for strategy_obj in strategy_objs:
|
||||
if strategy_obj['name'] == args['strategy']:
|
||||
bias_checker_instances.append(
|
||||
initialize_single_lookahead_bias_checker(strategy_obj, config, args))
|
||||
break
|
||||
else:
|
||||
processed_locations = set()
|
||||
for strategy_obj in strategy_objs:
|
||||
if strategy_obj['location'] not in processed_locations:
|
||||
processed_locations.add(strategy_obj['location'])
|
||||
bias_checker_instances.append(
|
||||
initialize_single_lookahead_bias_checker(strategy_obj, config, args))
|
||||
text_table_bias_checker_instances(bias_checker_instances)
|
||||
export_to_csv(args, bias_checker_instances)
|
||||
|
||||
|
||||
def text_table_bias_checker_instances(bias_checker_instances):
|
||||
headers = ['filename', 'strategy', 'has_bias',
|
||||
'total_signals', 'biased_entry_signals', 'biased_exit_signals', 'biased_indicators']
|
||||
data = []
|
||||
for current_instance in bias_checker_instances:
|
||||
if current_instance.failed_bias_check:
|
||||
data.append(
|
||||
[
|
||||
current_instance.strategy_obj['location'].parts[-1],
|
||||
current_instance.strategy_obj['name'],
|
||||
'error while checking'
|
||||
]
|
||||
)
|
||||
else:
|
||||
data.append(
|
||||
[
|
||||
current_instance.strategy_obj['location'].parts[-1],
|
||||
current_instance.strategy_obj['name'],
|
||||
current_instance.current_analysis.has_bias,
|
||||
current_instance.current_analysis.total_signals,
|
||||
current_instance.current_analysis.false_entry_signals,
|
||||
current_instance.current_analysis.false_exit_signals,
|
||||
", ".join(current_instance.current_analysis.false_indicators)
|
||||
]
|
||||
)
|
||||
table = tabulate(data, headers=headers, tablefmt="orgtbl")
|
||||
print(table)
|
||||
|
||||
|
||||
def export_to_csv(args, bias_checker_instances):
|
||||
def add_or_update_row(df, row_data):
|
||||
if (
|
||||
(df['filename'] == row_data['filename']) &
|
||||
(df['strategy'] == row_data['strategy'])
|
||||
).any():
|
||||
# Update existing row
|
||||
pd_series = pd.DataFrame([row_data])
|
||||
df.loc[
|
||||
(df['filename'] == row_data['filename']) &
|
||||
(df['strategy'] == row_data['strategy'])
|
||||
] = pd_series
|
||||
else:
|
||||
# Add new row
|
||||
df = pd.concat([df, pd.DataFrame([row_data], columns=df.columns)])
|
||||
|
||||
return df
|
||||
|
||||
if Path(args['exportfilename']).exists():
|
||||
# Read CSV file into a pandas dataframe
|
||||
csv_df = pd.read_csv(args['exportfilename'])
|
||||
else:
|
||||
# Create a new empty DataFrame with the desired column names and set the index
|
||||
csv_df = pd.DataFrame(columns=[
|
||||
'filename', 'strategy', 'has_bias', 'total_signals',
|
||||
'biased_entry_signals', 'biased_exit_signals', 'biased_indicators'
|
||||
],
|
||||
index=None)
|
||||
|
||||
for inst in bias_checker_instances:
|
||||
new_row_data = {'filename': inst.strategy_obj['location'].parts[-1],
|
||||
'strategy': inst.strategy_obj['name'],
|
||||
'has_bias': inst.current_analysis.has_bias,
|
||||
'total_signals': inst.current_analysis.total_signals,
|
||||
'biased_entry_signals': inst.current_analysis.false_entry_signals,
|
||||
'biased_exit_signals': inst.current_analysis.false_exit_signals,
|
||||
'biased_indicators': ",".join(inst.current_analysis.false_indicators)}
|
||||
csv_df = add_or_update_row(csv_df, new_row_data)
|
||||
|
||||
print(f"saving {args['exportfilename']}")
|
||||
csv_df.to_csv(args['exportfilename'], index=False)
|
||||
|
||||
|
||||
def initialize_single_lookahead_bias_checker(strategy_obj, config, args):
|
||||
print(f"Bias test of {Path(strategy_obj['location']).name} started.")
|
||||
start = time.perf_counter()
|
||||
current_instance = BacktestLookaheadBiasChecker()
|
||||
current_instance.start(config, strategy_obj, args)
|
||||
elapsed = time.perf_counter() - start
|
||||
print(f"checking look ahead bias via backtests of {Path(strategy_obj['location']).name} "
|
||||
f"took {elapsed:.1f} seconds.")
|
||||
return current_instance
|
||||
|
||||
347
freqtrade/optimize/lookahead_analysis.py
Executable file
347
freqtrade/optimize/lookahead_analysis.py
Executable file
@@ -0,0 +1,347 @@
|
||||
import copy
|
||||
import logging
|
||||
import pathlib
|
||||
import shutil
|
||||
import time
|
||||
from copy import deepcopy
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.data.history import get_timerange
|
||||
from freqtrade.exchange import timeframe_to_minutes
|
||||
from freqtrade.optimize.backtesting import Backtesting
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VarHolder:
|
||||
timerange: TimeRange
|
||||
data: pd.DataFrame
|
||||
indicators: pd.DataFrame
|
||||
result: pd.DataFrame
|
||||
compared: pd.DataFrame
|
||||
from_dt: datetime
|
||||
to_dt: datetime
|
||||
compared_dt: datetime
|
||||
timeframe: str
|
||||
|
||||
|
||||
class Analysis:
|
||||
def __init__(self) -> None:
|
||||
self.total_signals = 0
|
||||
self.false_entry_signals = 0
|
||||
self.false_exit_signals = 0
|
||||
self.false_indicators: List[str] = []
|
||||
self.has_bias = False
|
||||
|
||||
|
||||
class LookaheadAnalysis:
|
||||
|
||||
def __init__(self, config: Dict[str, Any], strategy_obj: dict, args: Dict[str, Any]):
|
||||
self.failed_bias_check = True
|
||||
self.full_varHolder = VarHolder
|
||||
|
||||
self.entry_varHolders: List[VarHolder] = []
|
||||
self.exit_varHolders: List[VarHolder] = []
|
||||
|
||||
# pull variables the scope of the lookahead_analysis-instance
|
||||
self.local_config = deepcopy(config)
|
||||
self.local_config['strategy'] = strategy_obj['name']
|
||||
self.current_analysis = Analysis()
|
||||
self.minimum_trade_amount = args['minimum_trade_amount']
|
||||
self.targeted_trade_amount = args['targeted_trade_amount']
|
||||
self.exportfilename = args['exportfilename']
|
||||
self.strategy_obj = strategy_obj
|
||||
|
||||
@staticmethod
|
||||
def dt_to_timestamp(dt: datetime):
|
||||
timestamp = int(dt.replace(tzinfo=timezone.utc).timestamp())
|
||||
return timestamp
|
||||
|
||||
@staticmethod
|
||||
def get_result(backtesting, processed: pd.DataFrame):
|
||||
min_date, max_date = get_timerange(processed)
|
||||
|
||||
result = backtesting.backtest(
|
||||
processed=deepcopy(processed),
|
||||
start_date=min_date,
|
||||
end_date=max_date
|
||||
)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def report_signal(result: dict, column_name: str, checked_timestamp: datetime):
|
||||
df = result['results']
|
||||
row_count = df[column_name].shape[0]
|
||||
|
||||
if row_count == 0:
|
||||
return False
|
||||
else:
|
||||
|
||||
df_cut = df[(df[column_name] == checked_timestamp)]
|
||||
if df_cut[column_name].shape[0] == 0:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
return False
|
||||
|
||||
# analyzes two data frames with processed indicators and shows differences between them.
|
||||
def analyze_indicators(self, full_vars: VarHolder, cut_vars: VarHolder, current_pair):
|
||||
# extract dataframes
|
||||
cut_df = cut_vars.indicators[current_pair]
|
||||
full_df = full_vars.indicators[current_pair]
|
||||
|
||||
# cut longer dataframe to length of the shorter
|
||||
full_df_cut = full_df[
|
||||
(full_df.date == cut_vars.compared_dt)
|
||||
].reset_index(drop=True)
|
||||
cut_df_cut = cut_df[
|
||||
(cut_df.date == cut_vars.compared_dt)
|
||||
].reset_index(drop=True)
|
||||
|
||||
# compare dataframes
|
||||
if full_df_cut.shape[0] != 0:
|
||||
if cut_df_cut.shape[0] != 0:
|
||||
compare_df = full_df_cut.compare(cut_df_cut)
|
||||
|
||||
if compare_df.shape[0] > 0:
|
||||
for col_name, values in compare_df.items():
|
||||
col_idx = compare_df.columns.get_loc(col_name)
|
||||
compare_df_row = compare_df.iloc[0]
|
||||
# compare_df now comprises tuples with [1] having either 'self' or 'other'
|
||||
if 'other' in col_name[1]:
|
||||
continue
|
||||
self_value = compare_df_row[col_idx]
|
||||
other_value = compare_df_row[col_idx + 1]
|
||||
|
||||
# output differences
|
||||
if self_value != other_value:
|
||||
|
||||
if not self.current_analysis.false_indicators.__contains__(col_name[0]):
|
||||
self.current_analysis.false_indicators.append(col_name[0])
|
||||
logging.info(f"=> found look ahead bias in indicator "
|
||||
f"{col_name[0]}. "
|
||||
f"{str(self_value)} != {str(other_value)}")
|
||||
|
||||
def prepare_data(self, varholder: VarHolder, pairs_to_load: List[pd.DataFrame]):
|
||||
|
||||
# purge previous data
|
||||
abs_folder_path = pathlib.Path("user_data/models/uniqe-id").resolve()
|
||||
# remove folder and its contents
|
||||
if pathlib.Path.exists(abs_folder_path):
|
||||
shutil.rmtree(abs_folder_path)
|
||||
|
||||
prepare_data_config = copy.deepcopy(self.local_config)
|
||||
prepare_data_config['timerange'] = (str(self.dt_to_timestamp(varholder.from_dt)) + "-" +
|
||||
str(self.dt_to_timestamp(varholder.to_dt)))
|
||||
prepare_data_config['exchange']['pair_whitelist'] = pairs_to_load
|
||||
|
||||
self.backtesting = Backtesting(prepare_data_config)
|
||||
self.backtesting._set_strategy(self.backtesting.strategylist[0])
|
||||
varholder.data, varholder.timerange = self.backtesting.load_bt_data()
|
||||
self.backtesting.load_bt_data_detail()
|
||||
varholder.timeframe = self.backtesting.timeframe
|
||||
|
||||
varholder.indicators = self.backtesting.strategy.advise_all_indicators(varholder.data)
|
||||
varholder.result = self.get_result(self.backtesting, varholder.indicators)
|
||||
|
||||
def fill_full_varholder(self):
|
||||
self.full_varHolder = VarHolder()
|
||||
|
||||
# define datetime in human-readable format
|
||||
parsed_timerange = TimeRange.parse_timerange(self.local_config['timerange'])
|
||||
|
||||
if parsed_timerange.startdt is None:
|
||||
self.full_varHolder.from_dt = datetime.fromtimestamp(0, tz=timezone.utc)
|
||||
else:
|
||||
self.full_varHolder.from_dt = parsed_timerange.startdt
|
||||
|
||||
if parsed_timerange.stopdt is None:
|
||||
self.full_varHolder.to_dt = datetime.utcnow()
|
||||
else:
|
||||
self.full_varHolder.to_dt = parsed_timerange.stopdt
|
||||
|
||||
self.prepare_data(self.full_varHolder, self.local_config['pairs'])
|
||||
|
||||
def fill_entry_and_exit_varHolders(self, idx, result_row):
|
||||
# entry_varHolder
|
||||
entry_varHolder = VarHolder()
|
||||
self.entry_varHolders.append(entry_varHolder)
|
||||
entry_varHolder.from_dt = self.full_varHolder.from_dt
|
||||
entry_varHolder.compared_dt = result_row['open_date']
|
||||
# to_dt needs +1 candle since it won't buy on the last candle
|
||||
entry_varHolder.to_dt = (
|
||||
result_row['open_date'] +
|
||||
timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe)))
|
||||
self.prepare_data(entry_varHolder, [result_row['pair']])
|
||||
|
||||
# exit_varHolder
|
||||
exit_varHolder = VarHolder()
|
||||
self.exit_varHolders.append(exit_varHolder)
|
||||
# to_dt needs +1 candle since it will always exit/force-exit trades on the last candle
|
||||
exit_varHolder.from_dt = self.full_varHolder.from_dt
|
||||
exit_varHolder.to_dt = (
|
||||
result_row['close_date'] +
|
||||
timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe)))
|
||||
exit_varHolder.compared_dt = result_row['close_date']
|
||||
self.prepare_data(exit_varHolder, [result_row['pair']])
|
||||
|
||||
# now we analyze a full trade of full_varholder and look for analyze its bias
|
||||
def analyze_row(self, idx, result_row):
|
||||
# if force-sold, ignore this signal since here it will unconditionally exit.
|
||||
if result_row.close_date == self.dt_to_timestamp(self.full_varHolder.to_dt):
|
||||
return
|
||||
|
||||
# keep track of how many signals are processed at total
|
||||
self.current_analysis.total_signals += 1
|
||||
|
||||
# fill entry_varHolder and exit_varHolder
|
||||
self.fill_entry_and_exit_varHolders(idx, result_row)
|
||||
|
||||
# register if buy signal is broken
|
||||
if not self.report_signal(
|
||||
self.entry_varHolders[idx].result,
|
||||
"open_date",
|
||||
self.entry_varHolders[idx].compared_dt):
|
||||
self.current_analysis.false_entry_signals += 1
|
||||
|
||||
# register if buy or sell signal is broken
|
||||
if not self.report_signal(
|
||||
self.exit_varHolders[idx].result,
|
||||
"close_date",
|
||||
self.exit_varHolders[idx].compared_dt):
|
||||
self.current_analysis.false_exit_signals += 1
|
||||
|
||||
# check if the indicators themselves contain biased data
|
||||
self.analyze_indicators(self.full_varHolder, self.entry_varHolders[idx], result_row['pair'])
|
||||
self.analyze_indicators(self.full_varHolder, self.exit_varHolders[idx], result_row['pair'])
|
||||
|
||||
def start(self) -> None:
|
||||
|
||||
# first make a single backtest
|
||||
self.fill_full_varholder()
|
||||
|
||||
# check if requirements have been met of full_varholder
|
||||
found_signals: int = self.full_varHolder.result['results'].shape[0] + 1
|
||||
if found_signals >= self.targeted_trade_amount:
|
||||
logging.info(f"Found {found_signals} trades, "
|
||||
f"calculating {self.targeted_trade_amount} trades.")
|
||||
elif self.targeted_trade_amount >= found_signals >= self.minimum_trade_amount:
|
||||
logging.info(f"Only found {found_signals} trades. Calculating all available trades.")
|
||||
else:
|
||||
logging.info(f"found {found_signals} trades "
|
||||
f"which is less than minimum_trade_amount {self.minimum_trade_amount}. "
|
||||
f"Cancelling this backtest lookahead bias test.")
|
||||
return
|
||||
|
||||
# now we loop through all signals
|
||||
# starting from the same datetime to avoid miss-reports of bias
|
||||
for idx, result_row in self.full_varHolder.result['results'].iterrows():
|
||||
if self.current_analysis.total_signals == self.targeted_trade_amount:
|
||||
break
|
||||
self.analyze_row(idx, result_row)
|
||||
|
||||
# check and report signals
|
||||
if (self.current_analysis.false_entry_signals > 0 or
|
||||
self.current_analysis.false_exit_signals > 0 or
|
||||
len(self.current_analysis.false_indicators) > 0):
|
||||
logging.info(f" => {self.local_config['strategy']} + : bias detected!")
|
||||
self.current_analysis.has_bias = True
|
||||
else:
|
||||
logging.info(self.local_config['strategy'] + ": no bias detected")
|
||||
|
||||
self.failed_bias_check = False
|
||||
|
||||
|
||||
class LookaheadAnalysisSubFunctions:
|
||||
@staticmethod
|
||||
def text_table_lookahead_analysis_instances(lookahead_instances: List[LookaheadAnalysis]):
|
||||
headers = ['filename', 'strategy', 'has_bias', 'total_signals',
|
||||
'biased_entry_signals', 'biased_exit_signals', 'biased_indicators']
|
||||
data = []
|
||||
for inst in lookahead_instances:
|
||||
if inst.failed_bias_check:
|
||||
data.append(
|
||||
[
|
||||
inst.strategy_obj['location'].parts[-1],
|
||||
inst.strategy_obj['name'],
|
||||
'error while checking'
|
||||
]
|
||||
)
|
||||
else:
|
||||
data.append(
|
||||
[
|
||||
inst.strategy_obj['location'].parts[-1],
|
||||
inst.strategy_obj['name'],
|
||||
inst.current_analysis.has_bias,
|
||||
inst.current_analysis.total_signals,
|
||||
inst.current_analysis.false_entry_signals,
|
||||
inst.current_analysis.false_exit_signals,
|
||||
", ".join(inst.current_analysis.false_indicators)
|
||||
]
|
||||
)
|
||||
from tabulate import tabulate
|
||||
table = tabulate(data, headers=headers, tablefmt="orgtbl")
|
||||
print(table)
|
||||
|
||||
@staticmethod
|
||||
def export_to_csv(args: Dict[str, Any], lookahead_analysis: List[LookaheadAnalysis]):
|
||||
def add_or_update_row(df, row_data):
|
||||
if (
|
||||
(df['filename'] == row_data['filename']) &
|
||||
(df['strategy'] == row_data['strategy'])
|
||||
).any():
|
||||
# Update existing row
|
||||
pd_series = pd.DataFrame([row_data])
|
||||
df.loc[
|
||||
(df['filename'] == row_data['filename']) &
|
||||
(df['strategy'] == row_data['strategy'])
|
||||
] = pd_series
|
||||
else:
|
||||
# Add new row
|
||||
df = pd.concat([df, pd.DataFrame([row_data], columns=df.columns)])
|
||||
|
||||
return df
|
||||
|
||||
if Path(args['exportfilename']).exists():
|
||||
# Read CSV file into a pandas dataframe
|
||||
csv_df = pd.read_csv(args['exportfilename'])
|
||||
else:
|
||||
# Create a new empty DataFrame with the desired column names and set the index
|
||||
csv_df = pd.DataFrame(columns=[
|
||||
'filename', 'strategy', 'has_bias', 'total_signals',
|
||||
'biased_entry_signals', 'biased_exit_signals', 'biased_indicators'
|
||||
],
|
||||
index=None)
|
||||
|
||||
for inst in lookahead_analysis:
|
||||
new_row_data = {'filename': inst.strategy_obj['location'].parts[-1],
|
||||
'strategy': inst.strategy_obj['name'],
|
||||
'has_bias': inst.current_analysis.has_bias,
|
||||
'total_signals': inst.current_analysis.total_signals,
|
||||
'biased_entry_signals': inst.current_analysis.false_entry_signals,
|
||||
'biased_exit_signals': inst.current_analysis.false_exit_signals,
|
||||
'biased_indicators': ",".join(inst.current_analysis.false_indicators)}
|
||||
csv_df = add_or_update_row(csv_df, new_row_data)
|
||||
|
||||
logger.info(f"saving {args['exportfilename']}")
|
||||
csv_df.to_csv(args['exportfilename'], index=False)
|
||||
|
||||
@staticmethod
|
||||
def initialize_single_lookahead_analysis(strategy_obj: Dict[str, Any], config: Dict[str, Any],
|
||||
args: Dict[str, Any]):
|
||||
|
||||
logger.info(f"Bias test of {Path(strategy_obj['location']).name} started.")
|
||||
start = time.perf_counter()
|
||||
current_instance = LookaheadAnalysis(config, strategy_obj, args)
|
||||
current_instance.start()
|
||||
elapsed = time.perf_counter() - start
|
||||
logger.info(f"checking look ahead bias via backtests "
|
||||
f"of {Path(strategy_obj['location']).name} "
|
||||
f"took {elapsed:.0f} seconds.")
|
||||
return current_instance
|
||||
Reference in New Issue
Block a user