Merge pull request #12126 from stash86/main-stash

Improve lookahead analysis to use full dataframe comparison instead of just the last row
This commit is contained in:
Matthias
2025-08-20 06:45:13 +02:00
committed by GitHub
2 changed files with 32 additions and 32 deletions

View File

@@ -50,8 +50,7 @@ After this initial backtest runs, it will look if the `minimum-trade-amount` is
If this happens, use a wider timerange to get more trades for the analysis, or use a timerange where more trades occur. If this happens, use a wider timerange to get more trades for the analysis, or use a timerange where more trades occur.
After setting the baseline it will then do additional backtest runs for every entry and exit separately. After setting the baseline it will then do additional backtest runs for every entry and exit separately.
When these verification backtests complete, it will compare the indicators at the signal candles (both entry or exit) When these verification backtests complete, it will compare both dataframes (baseline and sliced) for differences in any column's values and report the bias.
and report the bias.
After all signals have been verified or falsified a result table will be generated for the user to see. After all signals have been verified or falsified a result table will be generated for the user to see.
### How to find and remove bias? How can I salvage a biased strategy? ### How to find and remove bias? How can I salvage a biased strategy?

View File

@@ -70,34 +70,29 @@ class LookaheadAnalysis(BaseAnalysis):
cut_df: DataFrame = cut_vars.indicators[current_pair] cut_df: DataFrame = cut_vars.indicators[current_pair]
full_df: DataFrame = full_vars.indicators[current_pair] full_df: DataFrame = full_vars.indicators[current_pair]
# cut longer dataframe to length of the shorter # trim full_df to the same index and length as cut_df
full_df_cut = full_df[(full_df.date == cut_vars.compared_dt)].reset_index(drop=True) cut_full_df = full_df.loc[cut_df.index]
cut_df_cut = cut_df[(cut_df.date == cut_vars.compared_dt)].reset_index(drop=True) compare_df = cut_full_df.compare(cut_df)
# check if dataframes are not empty if compare_df.shape[0] > 0:
if full_df_cut.shape[0] != 0 and cut_df_cut.shape[0] != 0: for col_name in compare_df:
# compare dataframes col_idx = compare_df.columns.get_loc(col_name)
compare_df = full_df_cut.compare(cut_df_cut) compare_df_row = compare_df.iloc[0]
# compare_df now comprises tuples with [1] having either 'self' or 'other'
if "other" in col_name[1]:
continue
self_value = compare_df_row.iloc[col_idx]
other_value = compare_df_row.iloc[col_idx + 1]
if compare_df.shape[0] > 0: # output differences
for col_name, values in compare_df.items(): if self_value != other_value:
col_idx = compare_df.columns.get_loc(col_name) if not self.current_analysis.false_indicators.__contains__(col_name[0]):
compare_df_row = compare_df.iloc[0] self.current_analysis.false_indicators.append(col_name[0])
# compare_df now comprises tuples with [1] having either 'self' or 'other' logger.info(
if "other" in col_name[1]: f"=> found look ahead bias in column "
continue f"{col_name[0]}. "
self_value = compare_df_row.iloc[col_idx] f"{str(self_value)} != {str(other_value)}"
other_value = compare_df_row.iloc[col_idx + 1] )
# output differences
if self_value != other_value:
if not self.current_analysis.false_indicators.__contains__(col_name[0]):
self.current_analysis.false_indicators.append(col_name[0])
logger.info(
f"=> found look ahead bias in indicator "
f"{col_name[0]}. "
f"{str(self_value)} != {str(other_value)}"
)
def prepare_data(self, varholder: VarHolder, pairs_to_load: list[DataFrame]): def prepare_data(self, varholder: VarHolder, pairs_to_load: list[DataFrame]):
if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]: if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]:
@@ -132,7 +127,13 @@ class LookaheadAnalysis(BaseAnalysis):
varholder.data, varholder.timerange = backtesting.load_bt_data() varholder.data, varholder.timerange = backtesting.load_bt_data()
varholder.timeframe = backtesting.timeframe varholder.timeframe = backtesting.timeframe
varholder.indicators = backtesting.strategy.advise_all_indicators(varholder.data) temp_indicators = backtesting.strategy.advise_all_indicators(varholder.data)
filled_indicators = dict()
for pair, dataframe in temp_indicators.items():
filled_indicators[pair] = backtesting.strategy.ft_advise_signals(
dataframe, {"pair": pair}
)
varholder.indicators = filled_indicators
varholder.result = self.get_result(backtesting, varholder.indicators) varholder.result = self.get_result(backtesting, varholder.indicators)
def fill_entry_and_exit_varHolders(self, result_row): def fill_entry_and_exit_varHolders(self, result_row):