From 2c60985e2d3a9de37b1acf382c88f4cab7450665 Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 12 May 2024 17:16:55 +0200 Subject: [PATCH] ruff format: optimize analysis --- freqtrade/optimize/analysis/lookahead.py | 171 +++++++------- .../optimize/analysis/lookahead_helpers.py | 208 ++++++++++-------- freqtrade/optimize/analysis/recursive.py | 52 +++-- .../optimize/analysis/recursive_helpers.py | 62 +++--- 4 files changed, 267 insertions(+), 226 deletions(-) diff --git a/freqtrade/optimize/analysis/lookahead.py b/freqtrade/optimize/analysis/lookahead.py index bb2ee0899..a8eb0258e 100755 --- a/freqtrade/optimize/analysis/lookahead.py +++ b/freqtrade/optimize/analysis/lookahead.py @@ -30,38 +30,33 @@ class Analysis: class LookaheadAnalysis(BaseAnalysis): - def __init__(self, config: Dict[str, Any], strategy_obj: Dict): - super().__init__(config, strategy_obj) self.entry_varHolders: List[VarHolder] = [] self.exit_varHolders: List[VarHolder] = [] self.current_analysis = Analysis() - self.minimum_trade_amount = config['minimum_trade_amount'] - self.targeted_trade_amount = config['targeted_trade_amount'] + self.minimum_trade_amount = config["minimum_trade_amount"] + self.targeted_trade_amount = config["targeted_trade_amount"] @staticmethod def get_result(backtesting: Backtesting, processed: DataFrame): min_date, max_date = get_timerange(processed) result = backtesting.backtest( - processed=deepcopy(processed), - start_date=min_date, - end_date=max_date + processed=deepcopy(processed), start_date=min_date, end_date=max_date ) return result @staticmethod def report_signal(result: dict, column_name: str, checked_timestamp: datetime): - df = result['results'] + df = result["results"] row_count = df[column_name].shape[0] if row_count == 0: return False else: - df_cut = df[(df[column_name] == checked_timestamp)] if df_cut[column_name].shape[0] == 0: return False @@ -76,16 +71,11 @@ class LookaheadAnalysis(BaseAnalysis): full_df: DataFrame = full_vars.indicators[current_pair] # cut longer dataframe to length of the shorter - full_df_cut = full_df[ - (full_df.date == cut_vars.compared_dt) - ].reset_index(drop=True) - cut_df_cut = cut_df[ - (cut_df.date == cut_vars.compared_dt) - ].reset_index(drop=True) + full_df_cut = full_df[(full_df.date == cut_vars.compared_dt)].reset_index(drop=True) + cut_df_cut = cut_df[(cut_df.date == cut_vars.compared_dt)].reset_index(drop=True) # check if dataframes are not empty if full_df_cut.shape[0] != 0 and cut_df_cut.shape[0] != 0: - # compare dataframes compare_df = full_df_cut.compare(cut_df_cut) @@ -94,40 +84,44 @@ class LookaheadAnalysis(BaseAnalysis): col_idx = compare_df.columns.get_loc(col_name) compare_df_row = compare_df.iloc[0] # compare_df now comprises tuples with [1] having either 'self' or 'other' - if 'other' in col_name[1]: + if "other" in col_name[1]: continue self_value = compare_df_row.iloc[col_idx] other_value = compare_df_row.iloc[col_idx + 1] # output differences if self_value != other_value: - if not self.current_analysis.false_indicators.__contains__(col_name[0]): self.current_analysis.false_indicators.append(col_name[0]) - logger.info(f"=> found look ahead bias in indicator " - f"{col_name[0]}. " - f"{str(self_value)} != {str(other_value)}") + logger.info( + f"=> found look ahead bias in indicator " + f"{col_name[0]}. " + f"{str(self_value)} != {str(other_value)}" + ) def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]): - - if 'freqai' in self.local_config and 'identifier' in self.local_config['freqai']: + if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]: # purge previous data if the freqai model is defined # (to be sure nothing is carried over from older backtests) - path_to_current_identifier = ( - Path(f"{self.local_config['user_data_dir']}/models/" - f"{self.local_config['freqai']['identifier']}").resolve()) + path_to_current_identifier = Path( + f"{self.local_config['user_data_dir']}/models/" + f"{self.local_config['freqai']['identifier']}" + ).resolve() # remove folder and its contents if Path.exists(path_to_current_identifier): shutil.rmtree(path_to_current_identifier) prepare_data_config = deepcopy(self.local_config) - prepare_data_config['timerange'] = (str(self.dt_to_timestamp(varholder.from_dt)) + "-" + - str(self.dt_to_timestamp(varholder.to_dt))) - prepare_data_config['exchange']['pair_whitelist'] = pairs_to_load + prepare_data_config["timerange"] = ( + str(self.dt_to_timestamp(varholder.from_dt)) + + "-" + + str(self.dt_to_timestamp(varholder.to_dt)) + ) + prepare_data_config["exchange"]["pair_whitelist"] = pairs_to_load if self._fee is not None: # Don't re-calculate fee per pair, as fee might differ per pair. - prepare_data_config['fee'] = self._fee + prepare_data_config["fee"] = self._fee backtesting = Backtesting(prepare_data_config, self.exchange) self.exchange = backtesting.exchange @@ -146,23 +140,23 @@ class LookaheadAnalysis(BaseAnalysis): entry_varHolder = VarHolder() self.entry_varHolders.append(entry_varHolder) entry_varHolder.from_dt = self.full_varHolder.from_dt - entry_varHolder.compared_dt = result_row['open_date'] + entry_varHolder.compared_dt = result_row["open_date"] # to_dt needs +1 candle since it won't buy on the last candle - entry_varHolder.to_dt = ( - result_row['open_date'] + - timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe))) - self.prepare_data(entry_varHolder, [result_row['pair']]) + entry_varHolder.to_dt = result_row["open_date"] + timedelta( + minutes=timeframe_to_minutes(self.full_varHolder.timeframe) + ) + self.prepare_data(entry_varHolder, [result_row["pair"]]) # exit_varHolder exit_varHolder = VarHolder() self.exit_varHolders.append(exit_varHolder) # to_dt needs +1 candle since it will always exit/force-exit trades on the last candle exit_varHolder.from_dt = self.full_varHolder.from_dt - exit_varHolder.to_dt = ( - result_row['close_date'] + - timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe))) - exit_varHolder.compared_dt = result_row['close_date'] - self.prepare_data(exit_varHolder, [result_row['pair']]) + exit_varHolder.to_dt = result_row["close_date"] + timedelta( + minutes=timeframe_to_minutes(self.full_varHolder.timeframe) + ) + exit_varHolder.compared_dt = result_row["close_date"] + self.prepare_data(exit_varHolder, [result_row["pair"]]) # now we analyze a full trade of full_varholder and look for analyze its bias def analyze_row(self, idx: int, result_row): @@ -181,65 +175,72 @@ class LookaheadAnalysis(BaseAnalysis): # register if buy signal is broken if not self.report_signal( - self.entry_varHolders[idx].result, - "open_date", - self.entry_varHolders[idx].compared_dt): + self.entry_varHolders[idx].result, "open_date", self.entry_varHolders[idx].compared_dt + ): self.current_analysis.false_entry_signals += 1 buy_or_sell_biased = True # register if buy or sell signal is broken if not self.report_signal( - self.exit_varHolders[idx].result, - "close_date", - self.exit_varHolders[idx].compared_dt): + self.exit_varHolders[idx].result, "close_date", self.exit_varHolders[idx].compared_dt + ): self.current_analysis.false_exit_signals += 1 buy_or_sell_biased = True if buy_or_sell_biased: - logger.info(f"found lookahead-bias in trade " - f"pair: {result_row['pair']}, " - f"timerange:{result_row['open_date']} - {result_row['close_date']}, " - f"idx: {idx}") + logger.info( + f"found lookahead-bias in trade " + f"pair: {result_row['pair']}, " + f"timerange:{result_row['open_date']} - {result_row['close_date']}, " + f"idx: {idx}" + ) # check if the indicators themselves contain biased data - self.analyze_indicators(self.full_varHolder, self.entry_varHolders[idx], result_row['pair']) - self.analyze_indicators(self.full_varHolder, self.exit_varHolders[idx], result_row['pair']) + self.analyze_indicators(self.full_varHolder, self.entry_varHolders[idx], result_row["pair"]) + self.analyze_indicators(self.full_varHolder, self.exit_varHolders[idx], result_row["pair"]) def start(self) -> None: - super().start() reduce_verbosity_for_bias_tester() # check if requirements have been met of full_varholder - found_signals: int = self.full_varHolder.result['results'].shape[0] + 1 + found_signals: int = self.full_varHolder.result["results"].shape[0] + 1 if found_signals >= self.targeted_trade_amount: - logger.info(f"Found {found_signals} trades, " - f"calculating {self.targeted_trade_amount} trades.") + logger.info( + f"Found {found_signals} trades, " + f"calculating {self.targeted_trade_amount} trades." + ) elif self.targeted_trade_amount >= found_signals >= self.minimum_trade_amount: logger.info(f"Only found {found_signals} trades. Calculating all available trades.") else: - logger.info(f"found {found_signals} trades " - f"which is less than minimum_trade_amount {self.minimum_trade_amount}. " - f"Cancelling this backtest lookahead bias test.") + logger.info( + f"found {found_signals} trades " + f"which is less than minimum_trade_amount {self.minimum_trade_amount}. " + f"Cancelling this backtest lookahead bias test." + ) return # now we loop through all signals # starting from the same datetime to avoid miss-reports of bias - for idx, result_row in self.full_varHolder.result['results'].iterrows(): + for idx, result_row in self.full_varHolder.result["results"].iterrows(): if self.current_analysis.total_signals == self.targeted_trade_amount: logger.info(f"Found targeted trade amount = {self.targeted_trade_amount} signals.") break if found_signals < self.minimum_trade_amount: - logger.info(f"only found {found_signals} " - f"which is smaller than " - f"minimum trade amount = {self.minimum_trade_amount}. " - f"Exiting this lookahead-analysis") + logger.info( + f"only found {found_signals} " + f"which is smaller than " + f"minimum trade amount = {self.minimum_trade_amount}. " + f"Exiting this lookahead-analysis" + ) return None - if "force_exit" in result_row['exit_reason']: - logger.info("found force-exit in pair: {result_row['pair']}, " - f"timerange:{result_row['open_date']}-{result_row['close_date']}, " - f"idx: {idx}, skipping this one to avoid a false-positive.") + if "force_exit" in result_row["exit_reason"]: + logger.info( + "found force-exit in pair: {result_row['pair']}, " + f"timerange:{result_row['open_date']}-{result_row['close_date']}, " + f"idx: {idx}, skipping this one to avoid a false-positive." + ) # just to keep the IDs of both full, entry and exit varholders the same # to achieve a better debugging experience @@ -250,27 +251,33 @@ class LookaheadAnalysis(BaseAnalysis): self.analyze_row(idx, result_row) if len(self.entry_varHolders) < self.minimum_trade_amount: - logger.info(f"only found {found_signals} after skipping forced exits " - f"which is smaller than " - f"minimum trade amount = {self.minimum_trade_amount}. " - f"Exiting this lookahead-analysis") + logger.info( + f"only found {found_signals} after skipping forced exits " + f"which is smaller than " + f"minimum trade amount = {self.minimum_trade_amount}. " + f"Exiting this lookahead-analysis" + ) # Restore verbosity, so it's not too quiet for the next strategy restore_verbosity_for_bias_tester() # check and report signals - if self.current_analysis.total_signals < self.local_config['minimum_trade_amount']: - logger.info(f" -> {self.local_config['strategy']} : too few trades. " - f"We only found {self.current_analysis.total_signals} trades. " - f"Hint: Extend the timerange " - f"to get at least {self.local_config['minimum_trade_amount']} " - f"or lower the value of minimum_trade_amount.") + if self.current_analysis.total_signals < self.local_config["minimum_trade_amount"]: + logger.info( + f" -> {self.local_config['strategy']} : too few trades. " + f"We only found {self.current_analysis.total_signals} trades. " + f"Hint: Extend the timerange " + f"to get at least {self.local_config['minimum_trade_amount']} " + f"or lower the value of minimum_trade_amount." + ) self.failed_bias_check = True - elif (self.current_analysis.false_entry_signals > 0 or - self.current_analysis.false_exit_signals > 0 or - len(self.current_analysis.false_indicators) > 0): + elif ( + self.current_analysis.false_entry_signals > 0 + or self.current_analysis.false_exit_signals > 0 + or len(self.current_analysis.false_indicators) > 0 + ): logger.info(f" => {self.local_config['strategy']} : bias detected!") self.current_analysis.has_bias = True self.failed_bias_check = False else: - logger.info(self.local_config['strategy'] + ": no bias detected") + logger.info(self.local_config["strategy"] + ": no bias detected") self.failed_bias_check = False diff --git a/freqtrade/optimize/analysis/lookahead_helpers.py b/freqtrade/optimize/analysis/lookahead_helpers.py index d2cc541f2..c0e6fa1ba 100644 --- a/freqtrade/optimize/analysis/lookahead_helpers.py +++ b/freqtrade/optimize/analysis/lookahead_helpers.py @@ -15,46 +15,53 @@ logger = logging.getLogger(__name__) class LookaheadAnalysisSubFunctions: - @staticmethod def text_table_lookahead_analysis_instances( - config: Dict[str, Any], - lookahead_instances: List[LookaheadAnalysis]): - headers = ['filename', 'strategy', 'has_bias', 'total_signals', - 'biased_entry_signals', 'biased_exit_signals', 'biased_indicators'] + config: Dict[str, Any], lookahead_instances: List[LookaheadAnalysis] + ): + headers = [ + "filename", + "strategy", + "has_bias", + "total_signals", + "biased_entry_signals", + "biased_exit_signals", + "biased_indicators", + ] data = [] for inst in lookahead_instances: - if config['minimum_trade_amount'] > inst.current_analysis.total_signals: + if config["minimum_trade_amount"] > inst.current_analysis.total_signals: data.append( [ - inst.strategy_obj['location'].parts[-1], - inst.strategy_obj['name'], + inst.strategy_obj["location"].parts[-1], + inst.strategy_obj["name"], "too few trades caught " f"({inst.current_analysis.total_signals}/{config['minimum_trade_amount']})." - f"Test failed." + f"Test failed.", ] ) elif inst.failed_bias_check: data.append( [ - inst.strategy_obj['location'].parts[-1], - inst.strategy_obj['name'], - 'error while checking' + inst.strategy_obj["location"].parts[-1], + inst.strategy_obj["name"], + "error while checking", ] ) else: data.append( [ - inst.strategy_obj['location'].parts[-1], - inst.strategy_obj['name'], + inst.strategy_obj["location"].parts[-1], + inst.strategy_obj["name"], inst.current_analysis.has_bias, inst.current_analysis.total_signals, inst.current_analysis.false_entry_signals, inst.current_analysis.false_exit_signals, - ", ".join(inst.current_analysis.false_indicators) + ", ".join(inst.current_analysis.false_indicators), ] ) from tabulate import tabulate + table = tabulate(data, headers=headers, tablefmt="orgtbl") print(table) return table, headers, data @@ -63,89 +70,101 @@ class LookaheadAnalysisSubFunctions: def export_to_csv(config: Dict[str, Any], lookahead_analysis: List[LookaheadAnalysis]): def add_or_update_row(df, row_data): if ( - (df['filename'] == row_data['filename']) & - (df['strategy'] == row_data['strategy']) + (df["filename"] == row_data["filename"]) & (df["strategy"] == row_data["strategy"]) ).any(): # Update existing row pd_series = pd.DataFrame([row_data]) df.loc[ - (df['filename'] == row_data['filename']) & - (df['strategy'] == row_data['strategy']) - ] = pd_series + (df["filename"] == row_data["filename"]) + & (df["strategy"] == row_data["strategy"]) + ] = pd_series else: # Add new row df = pd.concat([df, pd.DataFrame([row_data], columns=df.columns)]) return df - if Path(config['lookahead_analysis_exportfilename']).exists(): + if Path(config["lookahead_analysis_exportfilename"]).exists(): # Read CSV file into a pandas dataframe - csv_df = pd.read_csv(config['lookahead_analysis_exportfilename']) + csv_df = pd.read_csv(config["lookahead_analysis_exportfilename"]) else: # Create a new empty DataFrame with the desired column names and set the index - csv_df = pd.DataFrame(columns=[ - 'filename', 'strategy', 'has_bias', 'total_signals', - 'biased_entry_signals', 'biased_exit_signals', 'biased_indicators' - ], - index=None) + csv_df = pd.DataFrame( + columns=[ + "filename", + "strategy", + "has_bias", + "total_signals", + "biased_entry_signals", + "biased_exit_signals", + "biased_indicators", + ], + index=None, + ) for inst in lookahead_analysis: # only update if - if (inst.current_analysis.total_signals > config['minimum_trade_amount'] - and inst.failed_bias_check is not True): - new_row_data = {'filename': inst.strategy_obj['location'].parts[-1], - 'strategy': inst.strategy_obj['name'], - 'has_bias': inst.current_analysis.has_bias, - 'total_signals': - int(inst.current_analysis.total_signals), - 'biased_entry_signals': - int(inst.current_analysis.false_entry_signals), - 'biased_exit_signals': - int(inst.current_analysis.false_exit_signals), - 'biased_indicators': - ",".join(inst.current_analysis.false_indicators)} + if ( + inst.current_analysis.total_signals > config["minimum_trade_amount"] + and inst.failed_bias_check is not True + ): + new_row_data = { + "filename": inst.strategy_obj["location"].parts[-1], + "strategy": inst.strategy_obj["name"], + "has_bias": inst.current_analysis.has_bias, + "total_signals": int(inst.current_analysis.total_signals), + "biased_entry_signals": int(inst.current_analysis.false_entry_signals), + "biased_exit_signals": int(inst.current_analysis.false_exit_signals), + "biased_indicators": ",".join(inst.current_analysis.false_indicators), + } csv_df = add_or_update_row(csv_df, new_row_data) # Fill NaN values with a default value (e.g., 0) - csv_df['total_signals'] = csv_df['total_signals'].astype(int).fillna(0) - csv_df['biased_entry_signals'] = csv_df['biased_entry_signals'].astype(int).fillna(0) - csv_df['biased_exit_signals'] = csv_df['biased_exit_signals'].astype(int).fillna(0) + csv_df["total_signals"] = csv_df["total_signals"].astype(int).fillna(0) + csv_df["biased_entry_signals"] = csv_df["biased_entry_signals"].astype(int).fillna(0) + csv_df["biased_exit_signals"] = csv_df["biased_exit_signals"].astype(int).fillna(0) # Convert columns to integers - csv_df['total_signals'] = csv_df['total_signals'].astype(int) - csv_df['biased_entry_signals'] = csv_df['biased_entry_signals'].astype(int) - csv_df['biased_exit_signals'] = csv_df['biased_exit_signals'].astype(int) + csv_df["total_signals"] = csv_df["total_signals"].astype(int) + csv_df["biased_entry_signals"] = csv_df["biased_entry_signals"].astype(int) + csv_df["biased_exit_signals"] = csv_df["biased_exit_signals"].astype(int) logger.info(f"saving {config['lookahead_analysis_exportfilename']}") - csv_df.to_csv(config['lookahead_analysis_exportfilename'], index=False) + csv_df.to_csv(config["lookahead_analysis_exportfilename"], index=False) @staticmethod def calculate_config_overrides(config: Config): - if config.get('enable_protections', False): + if config.get("enable_protections", False): # if protections are used globally, they can produce false positives. - config['enable_protections'] = False - logger.info('Protections were enabled. ' - 'Disabling protections now ' - 'since they could otherwise produce false positives.') - if config['targeted_trade_amount'] < config['minimum_trade_amount']: + config["enable_protections"] = False + logger.info( + "Protections were enabled. " + "Disabling protections now " + "since they could otherwise produce false positives." + ) + if config["targeted_trade_amount"] < config["minimum_trade_amount"]: # this combo doesn't make any sense. raise OperationalException( "Targeted trade amount can't be smaller than minimum trade amount." ) - if len(config['pairs']) > config.get('max_open_trades', 0): - logger.info('Max_open_trades were less than amount of pairs ' - 'or defined in the strategy. ' - 'Set max_open_trades to amount of pairs ' - 'just to avoid false positives.') - config['max_open_trades'] = len(config['pairs']) + if len(config["pairs"]) > config.get("max_open_trades", 0): + logger.info( + "Max_open_trades were less than amount of pairs " + "or defined in the strategy. " + "Set max_open_trades to amount of pairs " + "just to avoid false positives." + ) + config["max_open_trades"] = len(config["pairs"]) min_dry_run_wallet = 1000000000 - if config['dry_run_wallet'] < min_dry_run_wallet: - logger.info('Dry run wallet was not set to 1 billion, pushing it up there ' - 'just to avoid false positives') - config['dry_run_wallet'] = min_dry_run_wallet + if config["dry_run_wallet"] < min_dry_run_wallet: + logger.info( + "Dry run wallet was not set to 1 billion, pushing it up there " + "just to avoid false positives" + ) + config["dry_run_wallet"] = min_dry_run_wallet - if 'timerange' not in config: + if "timerange" not in config: # setting a timerange is enforced here raise OperationalException( "Please set a timerange. " @@ -155,32 +174,35 @@ class LookaheadAnalysisSubFunctions: # in a combination with a wallet size of 1 billion it should always be able to trade # no matter if they use custom_stake_amount as a small percentage of wallet size # or fixate custom_stake_amount to a certain value. - logger.info('fixing stake_amount to 10k') - config['stake_amount'] = 10000 + logger.info("fixing stake_amount to 10k") + config["stake_amount"] = 10000 # enforce cache to be 'none', shift it to 'none' if not already # (since the default value is 'day') - if config.get('backtest_cache') is None: - config['backtest_cache'] = 'none' - elif config['backtest_cache'] != 'none': - logger.info(f"backtest_cache = " - f"{config['backtest_cache']} detected. " - f"Inside lookahead-analysis it is enforced to be 'none'. " - f"Changed it to 'none'") - config['backtest_cache'] = 'none' + if config.get("backtest_cache") is None: + config["backtest_cache"] = "none" + elif config["backtest_cache"] != "none": + logger.info( + f"backtest_cache = " + f"{config['backtest_cache']} detected. " + f"Inside lookahead-analysis it is enforced to be 'none'. " + f"Changed it to 'none'" + ) + config["backtest_cache"] = "none" return config @staticmethod def initialize_single_lookahead_analysis(config: Config, strategy_obj: Dict[str, Any]): - logger.info(f"Bias test of {Path(strategy_obj['location']).name} started.") start = time.perf_counter() current_instance = LookaheadAnalysis(config, strategy_obj) current_instance.start() elapsed = time.perf_counter() - start - logger.info(f"Checking look ahead bias via backtests " - f"of {Path(strategy_obj['location']).name} " - f"took {elapsed:.0f} seconds.") + logger.info( + f"Checking look ahead bias via backtests " + f"of {Path(strategy_obj['location']).name} " + f"took {elapsed:.0f} seconds." + ) return current_instance @staticmethod @@ -188,36 +210,42 @@ class LookaheadAnalysisSubFunctions: config = LookaheadAnalysisSubFunctions.calculate_config_overrides(config) strategy_objs = StrategyResolver.search_all_objects( - config, enum_failed=False, recursive=config.get('recursive_strategy_search', False)) + config, enum_failed=False, recursive=config.get("recursive_strategy_search", False) + ) lookaheadAnalysis_instances = [] # unify --strategy and --strategy-list to one list - if not (strategy_list := config.get('strategy_list', [])): - if config.get('strategy') is None: + if not (strategy_list := config.get("strategy_list", [])): + if config.get("strategy") is None: raise OperationalException( "No Strategy specified. Please specify a strategy via --strategy or " "--strategy-list" ) - strategy_list = [config['strategy']] + strategy_list = [config["strategy"]] # check if strategies can be properly loaded, only check them if they can be. for strat in strategy_list: for strategy_obj in strategy_objs: - if strategy_obj['name'] == strat and strategy_obj not in strategy_list: + if strategy_obj["name"] == strat and strategy_obj not in strategy_list: lookaheadAnalysis_instances.append( LookaheadAnalysisSubFunctions.initialize_single_lookahead_analysis( - config, strategy_obj)) + config, strategy_obj + ) + ) break # report the results if lookaheadAnalysis_instances: LookaheadAnalysisSubFunctions.text_table_lookahead_analysis_instances( - config, lookaheadAnalysis_instances) - if config.get('lookahead_analysis_exportfilename') is not None: + config, lookaheadAnalysis_instances + ) + if config.get("lookahead_analysis_exportfilename") is not None: LookaheadAnalysisSubFunctions.export_to_csv(config, lookaheadAnalysis_instances) else: - logger.error("There were no strategies specified neither through " - "--strategy nor through " - "--strategy-list " - "or timeframe was not specified.") + logger.error( + "There were no strategies specified neither through " + "--strategy nor through " + "--strategy-list " + "or timeframe was not specified." + ) diff --git a/freqtrade/optimize/analysis/recursive.py b/freqtrade/optimize/analysis/recursive.py index e359d358b..f6e4fa3a9 100644 --- a/freqtrade/optimize/analysis/recursive.py +++ b/freqtrade/optimize/analysis/recursive.py @@ -20,10 +20,8 @@ logger = logging.getLogger(__name__) class RecursiveAnalysis(BaseAnalysis): - def __init__(self, config: Dict[str, Any], strategy_obj: Dict): - - self._startup_candle = config.get('startup_candle', [199, 399, 499, 999, 1999]) + self._startup_candle = config.get("startup_candle", [199, 399, 499, 999, 1999]) super().__init__(config, strategy_obj) @@ -35,8 +33,7 @@ class RecursiveAnalysis(BaseAnalysis): # For recursive bias check # analyzes two data frames with processed indicators and shows differences between them. def analyze_indicators(self): - - pair_to_check = self.local_config['pairs'][0] + pair_to_check = self.local_config["pairs"][0] logger.info("Start checking for recursive bias") # check and report signals @@ -50,17 +47,17 @@ class RecursiveAnalysis(BaseAnalysis): # print(compare_df) for col_name, values in compare_df.items(): # print(col_name) - if 'other' == col_name: + if "other" == col_name: continue indicators = values.index for indicator in indicators: - if (indicator not in self.dict_recursive): + if indicator not in self.dict_recursive: self.dict_recursive[indicator] = {} values_diff = compare_df.loc[indicator] - values_diff_self = values_diff.loc['self'] - values_diff_other = values_diff.loc['other'] + values_diff_self = values_diff.loc["self"] + values_diff_other = values_diff.loc["other"] diff = (values_diff_other - values_diff_self) / values_diff_self * 100 self.dict_recursive[indicator][part.startup_candle] = f"{diff:.3f}%" @@ -72,17 +69,16 @@ class RecursiveAnalysis(BaseAnalysis): # For lookahead bias check # analyzes two data frames with processed indicators and shows differences between them. def analyze_indicators_lookahead(self): - - pair_to_check = self.local_config['pairs'][0] + pair_to_check = self.local_config["pairs"][0] logger.info("Start checking for lookahead bias on indicators only") part = self.partial_varHolder_lookahead_array[0] part_last_row = part.indicators[pair_to_check].iloc[-1] - date_to_check = part_last_row['date'] - index_to_get = (self.full_varHolder.indicators[pair_to_check]['date'] == date_to_check) + date_to_check = part_last_row["date"] + index_to_get = self.full_varHolder.indicators[pair_to_check]["date"] == date_to_check base_row_check = self.full_varHolder.indicators[pair_to_check].loc[index_to_get].iloc[-1] - check_time = part.to_dt.strftime('%Y-%m-%dT%H:%M:%S') + check_time = part.to_dt.strftime("%Y-%m-%dT%H:%M:%S") logger.info(f"Check indicators at {check_time}") # logger.info(f"vs {part_timerange} with {part.startup_candle} startup candle") @@ -92,7 +88,7 @@ class RecursiveAnalysis(BaseAnalysis): # print(compare_df) for col_name, values in compare_df.items(): # print(col_name) - if 'other' == col_name: + if "other" == col_name: continue indicators = values.index @@ -105,21 +101,24 @@ class RecursiveAnalysis(BaseAnalysis): logger.info("No lookahead bias on indicators found.") def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]): - - if 'freqai' in self.local_config and 'identifier' in self.local_config['freqai']: + if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]: # purge previous data if the freqai model is defined # (to be sure nothing is carried over from older backtests) - path_to_current_identifier = ( - Path(f"{self.local_config['user_data_dir']}/models/" - f"{self.local_config['freqai']['identifier']}").resolve()) + path_to_current_identifier = Path( + f"{self.local_config['user_data_dir']}/models/" + f"{self.local_config['freqai']['identifier']}" + ).resolve() # remove folder and its contents if Path.exists(path_to_current_identifier): shutil.rmtree(path_to_current_identifier) prepare_data_config = deepcopy(self.local_config) - prepare_data_config['timerange'] = (str(self.dt_to_timestamp(varholder.from_dt)) + "-" + - str(self.dt_to_timestamp(varholder.to_dt))) - prepare_data_config['exchange']['pair_whitelist'] = pairs_to_load + prepare_data_config["timerange"] = ( + str(self.dt_to_timestamp(varholder.from_dt)) + + "-" + + str(self.dt_to_timestamp(varholder.to_dt)) + ) + prepare_data_config["exchange"]["pair_whitelist"] = pairs_to_load backtesting = Backtesting(prepare_data_config, self.exchange) self.exchange = backtesting.exchange @@ -139,9 +138,9 @@ class RecursiveAnalysis(BaseAnalysis): partial_varHolder.to_dt = self.full_varHolder.to_dt partial_varHolder.startup_candle = startup_candle - self.local_config['startup_candle_count'] = startup_candle + self.local_config["startup_candle_count"] = startup_candle - self.prepare_data(partial_varHolder, self.local_config['pairs']) + self.prepare_data(partial_varHolder, self.local_config["pairs"]) self.partial_varHolder_array.append(partial_varHolder) @@ -153,12 +152,11 @@ class RecursiveAnalysis(BaseAnalysis): partial_varHolder.from_dt = self.full_varHolder.from_dt partial_varHolder.to_dt = end_date - self.prepare_data(partial_varHolder, self.local_config['pairs']) + self.prepare_data(partial_varHolder, self.local_config["pairs"]) self.partial_varHolder_lookahead_array.append(partial_varHolder) def start(self) -> None: - super().start() reduce_verbosity_for_bias_tester() diff --git a/freqtrade/optimize/analysis/recursive_helpers.py b/freqtrade/optimize/analysis/recursive_helpers.py index 32dbce149..cde1a214e 100644 --- a/freqtrade/optimize/analysis/recursive_helpers.py +++ b/freqtrade/optimize/analysis/recursive_helpers.py @@ -13,12 +13,10 @@ logger = logging.getLogger(__name__) class RecursiveAnalysisSubFunctions: - @staticmethod - def text_table_recursive_analysis_instances( - recursive_instances: List[RecursiveAnalysis]): + def text_table_recursive_analysis_instances(recursive_instances: List[RecursiveAnalysis]): startups = recursive_instances[0]._startup_candle - headers = ['indicators'] + headers = ["indicators"] for candle in startups: headers.append(candle) @@ -28,11 +26,12 @@ class RecursiveAnalysisSubFunctions: for indicator, values in inst.dict_recursive.items(): temp_data = [indicator] for candle in startups: - temp_data.append(values.get(int(candle), '-')) + temp_data.append(values.get(int(candle), "-")) data.append(temp_data) if len(data) > 0: from tabulate import tabulate + table = tabulate(data, headers=headers, tablefmt="orgtbl") print(table) return table, headers, data @@ -41,34 +40,37 @@ class RecursiveAnalysisSubFunctions: @staticmethod def calculate_config_overrides(config: Config): - if 'timerange' not in config: + if "timerange" not in config: # setting a timerange is enforced here raise OperationalException( "Please set a timerange. " "A timerange of 5000 candles are enough for recursive analysis." ) - if config.get('backtest_cache') is None: - config['backtest_cache'] = 'none' - elif config['backtest_cache'] != 'none': - logger.info(f"backtest_cache = " - f"{config['backtest_cache']} detected. " - f"Inside recursive-analysis it is enforced to be 'none'. " - f"Changed it to 'none'") - config['backtest_cache'] = 'none' + if config.get("backtest_cache") is None: + config["backtest_cache"] = "none" + elif config["backtest_cache"] != "none": + logger.info( + f"backtest_cache = " + f"{config['backtest_cache']} detected. " + f"Inside recursive-analysis it is enforced to be 'none'. " + f"Changed it to 'none'" + ) + config["backtest_cache"] = "none" return config @staticmethod def initialize_single_recursive_analysis(config: Config, strategy_obj: Dict[str, Any]): - logger.info(f"Recursive test of {Path(strategy_obj['location']).name} started.") start = time.perf_counter() current_instance = RecursiveAnalysis(config, strategy_obj) current_instance.start() elapsed = time.perf_counter() - start - logger.info(f"Checking recursive and indicator-only lookahead bias of indicators " - f"of {Path(strategy_obj['location']).name} " - f"took {elapsed:.0f} seconds.") + logger.info( + f"Checking recursive and indicator-only lookahead bias of indicators " + f"of {Path(strategy_obj['location']).name} " + f"took {elapsed:.0f} seconds." + ) return current_instance @staticmethod @@ -76,31 +78,37 @@ class RecursiveAnalysisSubFunctions: config = RecursiveAnalysisSubFunctions.calculate_config_overrides(config) strategy_objs = StrategyResolver.search_all_objects( - config, enum_failed=False, recursive=config.get('recursive_strategy_search', False)) + config, enum_failed=False, recursive=config.get("recursive_strategy_search", False) + ) RecursiveAnalysis_instances = [] # unify --strategy and --strategy-list to one list - if not (strategy_list := config.get('strategy_list', [])): - if config.get('strategy') is None: + if not (strategy_list := config.get("strategy_list", [])): + if config.get("strategy") is None: raise OperationalException( "No Strategy specified. Please specify a strategy via --strategy" ) - strategy_list = [config['strategy']] + strategy_list = [config["strategy"]] # check if strategies can be properly loaded, only check them if they can be. for strat in strategy_list: for strategy_obj in strategy_objs: - if strategy_obj['name'] == strat and strategy_obj not in strategy_list: + if strategy_obj["name"] == strat and strategy_obj not in strategy_list: RecursiveAnalysis_instances.append( RecursiveAnalysisSubFunctions.initialize_single_recursive_analysis( - config, strategy_obj)) + config, strategy_obj + ) + ) break # report the results if RecursiveAnalysis_instances: RecursiveAnalysisSubFunctions.text_table_recursive_analysis_instances( - RecursiveAnalysis_instances) + RecursiveAnalysis_instances + ) else: - logger.error("There was no strategy specified through --strategy " - "or timeframe was not specified.") + logger.error( + "There was no strategy specified through --strategy " + "or timeframe was not specified." + )