Mirror of https://github.com/freqtrade/freqtrade.git (synced 2026-01-20 05:50:36 +00:00)

Commit: ruff format: optimize analysis
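This commit is a pure formatting pass over the lookahead- and recursive-analysis modules: `ruff format` (Black-compatible) normalizes string literals to double quotes, wraps long calls with explicit parentheses and one fragment per line, adds magic trailing commas, and collapses hand-wrapped expressions that fit on a single line. A minimal before/after sketch of the pattern (the variable is hypothetical, for illustration only):

n = 5  # hypothetical value

# Style before this commit: single quotes, hand-wrapped continuation lines.
message = ('found ' + str(n) + ' trades, '
           'calculating all available trades.')

# Style after ruff format: double quotes, a parenthesized block with one
# fragment per line and the closing parenthesis on its own line.
message = (
    "found " + str(n) + " trades, "
    "calculating all available trades."
)
print(message)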
@@ -30,38 +30,33 @@ class Analysis:
 class LookaheadAnalysis(BaseAnalysis):
-
     def __init__(self, config: Dict[str, Any], strategy_obj: Dict):
-
         super().__init__(config, strategy_obj)
 
         self.entry_varHolders: List[VarHolder] = []
         self.exit_varHolders: List[VarHolder] = []
 
         self.current_analysis = Analysis()
-        self.minimum_trade_amount = config['minimum_trade_amount']
-        self.targeted_trade_amount = config['targeted_trade_amount']
+        self.minimum_trade_amount = config["minimum_trade_amount"]
+        self.targeted_trade_amount = config["targeted_trade_amount"]
 
     @staticmethod
     def get_result(backtesting: Backtesting, processed: DataFrame):
         min_date, max_date = get_timerange(processed)
 
         result = backtesting.backtest(
-            processed=deepcopy(processed),
-            start_date=min_date,
-            end_date=max_date
+            processed=deepcopy(processed), start_date=min_date, end_date=max_date
         )
         return result
 
     @staticmethod
     def report_signal(result: dict, column_name: str, checked_timestamp: datetime):
-        df = result['results']
+        df = result["results"]
         row_count = df[column_name].shape[0]
 
         if row_count == 0:
             return False
         else:
             df_cut = df[(df[column_name] == checked_timestamp)]
             if df_cut[column_name].shape[0] == 0:
                 return False
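`report_signal` answers a single question: does the shortened backtest still produce a trade at exactly the timestamp the full backtest produced one? A self-contained sketch of that membership check (hypothetical data, pandas only):

from datetime import datetime

import pandas as pd

# Hypothetical backtest result: one trade opened at 03:00.
result = {"results": pd.DataFrame({"open_date": [datetime(2024, 1, 1, 3, 0)]})}
checked_timestamp = datetime(2024, 1, 1, 3, 0)

df = result["results"]
# The same check report_signal performs: keep rows whose timestamp matches exactly.
signal_reproduced = df[df["open_date"] == checked_timestamp].shape[0] > 0
print(signal_reproduced)  # True -> the signal survived the cut, no bias recorded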
@@ -76,16 +71,11 @@ class LookaheadAnalysis(BaseAnalysis):
         full_df: DataFrame = full_vars.indicators[current_pair]
 
         # cut longer dataframe to length of the shorter
-        full_df_cut = full_df[
-            (full_df.date == cut_vars.compared_dt)
-        ].reset_index(drop=True)
-        cut_df_cut = cut_df[
-            (cut_df.date == cut_vars.compared_dt)
-        ].reset_index(drop=True)
+        full_df_cut = full_df[(full_df.date == cut_vars.compared_dt)].reset_index(drop=True)
+        cut_df_cut = cut_df[(cut_df.date == cut_vars.compared_dt)].reset_index(drop=True)
 
         # check if dataframes are not empty
         if full_df_cut.shape[0] != 0 and cut_df_cut.shape[0] != 0:
-
             # compare dataframes
             compare_df = full_df_cut.compare(cut_df_cut)
 
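pandas `DataFrame.compare` returns only the cells that differ, under a two-level column index whose second level is "self" or "other"; the loop in the next hunk skips the "other" half and reads both values by position. A small standalone demonstration:

import pandas as pd

# Hypothetical indicator frames: the cut-off run disagrees on "ema".
full_df_cut = pd.DataFrame({"rsi": [30.0], "ema": [101.0]})
cut_df_cut = pd.DataFrame({"rsi": [30.0], "ema": [99.0]})

compare_df = full_df_cut.compare(cut_df_cut)
# Columns become a MultiIndex such as ("ema", "self") and ("ema", "other").
for col_name in compare_df.columns:
    if "other" in col_name[1]:
        continue  # the paired "other" value is read via col_idx + 1 instead
    print("differing indicator:", col_name[0])  # differing indicator: ema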
@@ -94,40 +84,44 @@ class LookaheadAnalysis(BaseAnalysis):
             col_idx = compare_df.columns.get_loc(col_name)
             compare_df_row = compare_df.iloc[0]
             # compare_df now comprises tuples with [1] having either 'self' or 'other'
-            if 'other' in col_name[1]:
+            if "other" in col_name[1]:
                 continue
             self_value = compare_df_row.iloc[col_idx]
             other_value = compare_df_row.iloc[col_idx + 1]
 
             # output differences
             if self_value != other_value:
-
                 if not self.current_analysis.false_indicators.__contains__(col_name[0]):
                     self.current_analysis.false_indicators.append(col_name[0])
-                    logger.info(f"=> found look ahead bias in indicator "
-                                f"{col_name[0]}. "
-                                f"{str(self_value)} != {str(other_value)}")
+                    logger.info(
+                        f"=> found look ahead bias in indicator "
+                        f"{col_name[0]}. "
+                        f"{str(self_value)} != {str(other_value)}"
+                    )
 
     def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]):
-
-        if 'freqai' in self.local_config and 'identifier' in self.local_config['freqai']:
+        if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]:
             # purge previous data if the freqai model is defined
             # (to be sure nothing is carried over from older backtests)
-            path_to_current_identifier = (
-                Path(f"{self.local_config['user_data_dir']}/models/"
-                     f"{self.local_config['freqai']['identifier']}").resolve())
+            path_to_current_identifier = Path(
+                f"{self.local_config['user_data_dir']}/models/"
+                f"{self.local_config['freqai']['identifier']}"
+            ).resolve()
             # remove folder and its contents
             if Path.exists(path_to_current_identifier):
                 shutil.rmtree(path_to_current_identifier)
 
         prepare_data_config = deepcopy(self.local_config)
-        prepare_data_config['timerange'] = (str(self.dt_to_timestamp(varholder.from_dt)) + "-" +
-                                            str(self.dt_to_timestamp(varholder.to_dt)))
-        prepare_data_config['exchange']['pair_whitelist'] = pairs_to_load
+        prepare_data_config["timerange"] = (
+            str(self.dt_to_timestamp(varholder.from_dt))
+            + "-"
+            + str(self.dt_to_timestamp(varholder.to_dt))
+        )
+        prepare_data_config["exchange"]["pair_whitelist"] = pairs_to_load
 
         if self._fee is not None:
             # Don't re-calculate fee per pair, as fee might differ per pair.
-            prepare_data_config['fee'] = self._fee
+            prepare_data_config["fee"] = self._fee
 
         backtesting = Backtesting(prepare_data_config, self.exchange)
         self.exchange = backtesting.exchange
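`prepare_data` clones the configuration, narrows the timerange to the trade under inspection, and restricts the pair whitelist before re-running the backtest. The timerange is just "<from>-<to>" built from two timestamps; a sketch assuming `dt_to_timestamp` yields epoch seconds (the helper itself is not shown in this diff):

from copy import deepcopy
from datetime import datetime, timezone

def dt_to_timestamp(dt: datetime) -> int:
    # Assumed behaviour of the helper used above: UTC epoch seconds.
    return int(dt.replace(tzinfo=timezone.utc).timestamp())

local_config = {"timerange": "", "exchange": {"pair_whitelist": []}}  # hypothetical
prepare_data_config = deepcopy(local_config)
prepare_data_config["timerange"] = (
    str(dt_to_timestamp(datetime(2024, 1, 1))) + "-" + str(dt_to_timestamp(datetime(2024, 2, 1)))
)
prepare_data_config["exchange"]["pair_whitelist"] = ["BTC/USDT"]
print(prepare_data_config["timerange"])  # 1704067200-1706745600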
@@ -146,23 +140,23 @@ class LookaheadAnalysis(BaseAnalysis):
         entry_varHolder = VarHolder()
         self.entry_varHolders.append(entry_varHolder)
         entry_varHolder.from_dt = self.full_varHolder.from_dt
-        entry_varHolder.compared_dt = result_row['open_date']
+        entry_varHolder.compared_dt = result_row["open_date"]
         # to_dt needs +1 candle since it won't buy on the last candle
-        entry_varHolder.to_dt = (
-            result_row['open_date'] +
-            timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe)))
-        self.prepare_data(entry_varHolder, [result_row['pair']])
+        entry_varHolder.to_dt = result_row["open_date"] + timedelta(
+            minutes=timeframe_to_minutes(self.full_varHolder.timeframe)
+        )
+        self.prepare_data(entry_varHolder, [result_row["pair"]])
 
         # exit_varHolder
         exit_varHolder = VarHolder()
         self.exit_varHolders.append(exit_varHolder)
         # to_dt needs +1 candle since it will always exit/force-exit trades on the last candle
         exit_varHolder.from_dt = self.full_varHolder.from_dt
-        exit_varHolder.to_dt = (
-            result_row['close_date'] +
-            timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe)))
-        exit_varHolder.compared_dt = result_row['close_date']
-        self.prepare_data(exit_varHolder, [result_row['pair']])
+        exit_varHolder.to_dt = result_row["close_date"] + timedelta(
+            minutes=timeframe_to_minutes(self.full_varHolder.timeframe)
+        )
+        exit_varHolder.compared_dt = result_row["close_date"]
+        self.prepare_data(exit_varHolder, [result_row["pair"]])
 
     # now we analyze a full trade of full_varholder and look for analyze its bias
     def analyze_row(self, idx: int, result_row):
@@ -181,65 +175,72 @@ class LookaheadAnalysis(BaseAnalysis):
 
         # register if buy signal is broken
         if not self.report_signal(
-                self.entry_varHolders[idx].result,
-                "open_date",
-                self.entry_varHolders[idx].compared_dt):
+            self.entry_varHolders[idx].result, "open_date", self.entry_varHolders[idx].compared_dt
+        ):
             self.current_analysis.false_entry_signals += 1
             buy_or_sell_biased = True
 
         # register if buy or sell signal is broken
         if not self.report_signal(
-                self.exit_varHolders[idx].result,
-                "close_date",
-                self.exit_varHolders[idx].compared_dt):
+            self.exit_varHolders[idx].result, "close_date", self.exit_varHolders[idx].compared_dt
+        ):
             self.current_analysis.false_exit_signals += 1
             buy_or_sell_biased = True
 
         if buy_or_sell_biased:
-            logger.info(f"found lookahead-bias in trade "
-                        f"pair: {result_row['pair']}, "
-                        f"timerange:{result_row['open_date']} - {result_row['close_date']}, "
-                        f"idx: {idx}")
+            logger.info(
+                f"found lookahead-bias in trade "
+                f"pair: {result_row['pair']}, "
+                f"timerange:{result_row['open_date']} - {result_row['close_date']}, "
+                f"idx: {idx}"
+            )
 
         # check if the indicators themselves contain biased data
-        self.analyze_indicators(self.full_varHolder, self.entry_varHolders[idx], result_row['pair'])
-        self.analyze_indicators(self.full_varHolder, self.exit_varHolders[idx], result_row['pair'])
+        self.analyze_indicators(self.full_varHolder, self.entry_varHolders[idx], result_row["pair"])
+        self.analyze_indicators(self.full_varHolder, self.exit_varHolders[idx], result_row["pair"])
 
     def start(self) -> None:
-
         super().start()
 
         reduce_verbosity_for_bias_tester()
 
         # check if requirements have been met of full_varholder
-        found_signals: int = self.full_varHolder.result['results'].shape[0] + 1
+        found_signals: int = self.full_varHolder.result["results"].shape[0] + 1
         if found_signals >= self.targeted_trade_amount:
-            logger.info(f"Found {found_signals} trades, "
-                        f"calculating {self.targeted_trade_amount} trades.")
+            logger.info(
+                f"Found {found_signals} trades, "
+                f"calculating {self.targeted_trade_amount} trades."
+            )
         elif self.targeted_trade_amount >= found_signals >= self.minimum_trade_amount:
             logger.info(f"Only found {found_signals} trades. Calculating all available trades.")
         else:
-            logger.info(f"found {found_signals} trades "
-                        f"which is less than minimum_trade_amount {self.minimum_trade_amount}. "
-                        f"Cancelling this backtest lookahead bias test.")
+            logger.info(
+                f"found {found_signals} trades "
+                f"which is less than minimum_trade_amount {self.minimum_trade_amount}. "
+                f"Cancelling this backtest lookahead bias test."
+            )
             return
 
         # now we loop through all signals
         # starting from the same datetime to avoid miss-reports of bias
-        for idx, result_row in self.full_varHolder.result['results'].iterrows():
+        for idx, result_row in self.full_varHolder.result["results"].iterrows():
             if self.current_analysis.total_signals == self.targeted_trade_amount:
                 logger.info(f"Found targeted trade amount = {self.targeted_trade_amount} signals.")
                 break
             if found_signals < self.minimum_trade_amount:
-                logger.info(f"only found {found_signals} "
-                            f"which is smaller than "
-                            f"minimum trade amount = {self.minimum_trade_amount}. "
-                            f"Exiting this lookahead-analysis")
+                logger.info(
+                    f"only found {found_signals} "
+                    f"which is smaller than "
+                    f"minimum trade amount = {self.minimum_trade_amount}. "
+                    f"Exiting this lookahead-analysis"
+                )
                 return None
-            if "force_exit" in result_row['exit_reason']:
-                logger.info("found force-exit in pair: {result_row['pair']}, "
-                            f"timerange:{result_row['open_date']}-{result_row['close_date']}, "
-                            f"idx: {idx}, skipping this one to avoid a false-positive.")
+            if "force_exit" in result_row["exit_reason"]:
+                logger.info(
+                    "found force-exit in pair: {result_row['pair']}, "
+                    f"timerange:{result_row['open_date']}-{result_row['close_date']}, "
+                    f"idx: {idx}, skipping this one to avoid a false-positive."
+                )
 
             # just to keep the IDs of both full, entry and exit varholders the same
             # to achieve a better debugging experience
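Both holders extend `to_dt` by exactly one candle, converting the strategy timeframe to minutes first. A self-contained sketch of that shift (with a minimal stand-in for freqtrade's `timeframe_to_minutes`):

from datetime import datetime, timedelta

def timeframe_to_minutes(timeframe: str) -> int:
    # Simplified stand-in for freqtrade's helper, for illustration only.
    units = {"m": 1, "h": 60, "d": 1440}
    return int(timeframe[:-1]) * units[timeframe[-1]]

open_date = datetime(2024, 1, 1, 3, 0)
# "+1 candle": on a 5m timeframe the window must include the following candle.
to_dt = open_date + timedelta(minutes=timeframe_to_minutes("5m"))
print(to_dt)  # 2024-01-01 03:05:00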
@@ -250,27 +251,33 @@ class LookaheadAnalysis(BaseAnalysis):
             self.analyze_row(idx, result_row)
 
         if len(self.entry_varHolders) < self.minimum_trade_amount:
-            logger.info(f"only found {found_signals} after skipping forced exits "
-                        f"which is smaller than "
-                        f"minimum trade amount = {self.minimum_trade_amount}. "
-                        f"Exiting this lookahead-analysis")
+            logger.info(
+                f"only found {found_signals} after skipping forced exits "
+                f"which is smaller than "
+                f"minimum trade amount = {self.minimum_trade_amount}. "
+                f"Exiting this lookahead-analysis"
+            )
 
         # Restore verbosity, so it's not too quiet for the next strategy
         restore_verbosity_for_bias_tester()
         # check and report signals
-        if self.current_analysis.total_signals < self.local_config['minimum_trade_amount']:
-            logger.info(f" -> {self.local_config['strategy']} : too few trades. "
-                        f"We only found {self.current_analysis.total_signals} trades. "
-                        f"Hint: Extend the timerange "
-                        f"to get at least {self.local_config['minimum_trade_amount']} "
-                        f"or lower the value of minimum_trade_amount.")
+        if self.current_analysis.total_signals < self.local_config["minimum_trade_amount"]:
+            logger.info(
+                f" -> {self.local_config['strategy']} : too few trades. "
+                f"We only found {self.current_analysis.total_signals} trades. "
+                f"Hint: Extend the timerange "
+                f"to get at least {self.local_config['minimum_trade_amount']} "
+                f"or lower the value of minimum_trade_amount."
+            )
             self.failed_bias_check = True
-        elif (self.current_analysis.false_entry_signals > 0 or
-              self.current_analysis.false_exit_signals > 0 or
-              len(self.current_analysis.false_indicators) > 0):
+        elif (
+            self.current_analysis.false_entry_signals > 0
+            or self.current_analysis.false_exit_signals > 0
+            or len(self.current_analysis.false_indicators) > 0
+        ):
            logger.info(f" => {self.local_config['strategy']} : bias detected!")
            self.current_analysis.has_bias = True
            self.failed_bias_check = False
        else:
-            logger.info(self.local_config['strategy'] + ": no bias detected")
+            logger.info(self.local_config["strategy"] + ": no bias detected")
            self.failed_bias_check = False
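The verdict logic above reduces to: any false entry signal, false exit signal, or biased indicator flags the strategy. Condensed into a standalone snippet (the counter values are illustrative):

# Condensed form of the elif branch above, with hypothetical counters.
false_entry_signals, false_exit_signals = 0, 1
false_indicators = ["ema_biased"]

has_bias = (
    false_entry_signals > 0
    or false_exit_signals > 0
    or len(false_indicators) > 0
)
print(has_bias)  # True -> " => <strategy> : bias detected!"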
@@ -15,46 +15,53 @@ logger = logging.getLogger(__name__)
 
 
 class LookaheadAnalysisSubFunctions:
-
     @staticmethod
     def text_table_lookahead_analysis_instances(
-            config: Dict[str, Any],
-            lookahead_instances: List[LookaheadAnalysis]):
-        headers = ['filename', 'strategy', 'has_bias', 'total_signals',
-                   'biased_entry_signals', 'biased_exit_signals', 'biased_indicators']
+        config: Dict[str, Any], lookahead_instances: List[LookaheadAnalysis]
+    ):
+        headers = [
+            "filename",
+            "strategy",
+            "has_bias",
+            "total_signals",
+            "biased_entry_signals",
+            "biased_exit_signals",
+            "biased_indicators",
+        ]
         data = []
         for inst in lookahead_instances:
-            if config['minimum_trade_amount'] > inst.current_analysis.total_signals:
+            if config["minimum_trade_amount"] > inst.current_analysis.total_signals:
                 data.append(
                     [
-                        inst.strategy_obj['location'].parts[-1],
-                        inst.strategy_obj['name'],
+                        inst.strategy_obj["location"].parts[-1],
+                        inst.strategy_obj["name"],
                         "too few trades caught "
                         f"({inst.current_analysis.total_signals}/{config['minimum_trade_amount']})."
-                        f"Test failed."
+                        f"Test failed.",
                     ]
                 )
             elif inst.failed_bias_check:
                 data.append(
                     [
-                        inst.strategy_obj['location'].parts[-1],
-                        inst.strategy_obj['name'],
-                        'error while checking'
+                        inst.strategy_obj["location"].parts[-1],
+                        inst.strategy_obj["name"],
+                        "error while checking",
                     ]
                 )
             else:
                 data.append(
                     [
-                        inst.strategy_obj['location'].parts[-1],
-                        inst.strategy_obj['name'],
+                        inst.strategy_obj["location"].parts[-1],
+                        inst.strategy_obj["name"],
                         inst.current_analysis.has_bias,
                         inst.current_analysis.total_signals,
                         inst.current_analysis.false_entry_signals,
                         inst.current_analysis.false_exit_signals,
-                        ", ".join(inst.current_analysis.false_indicators)
+                        ", ".join(inst.current_analysis.false_indicators),
                     ]
                 )
         from tabulate import tabulate
+
         table = tabulate(data, headers=headers, tablefmt="orgtbl")
         print(table)
         return table, headers, data
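The report table is rendered with `tabulate` in org-mode format. A runnable miniature with the same headers (the data row is hypothetical):

from tabulate import tabulate

headers = ["filename", "strategy", "has_bias", "total_signals",
           "biased_entry_signals", "biased_exit_signals", "biased_indicators"]
data = [["MyStrategy.py", "MyStrategy", False, 20, 0, 0, ""]]
print(tabulate(data, headers=headers, tablefmt="orgtbl"))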
@@ -63,89 +70,101 @@ class LookaheadAnalysisSubFunctions:
     def export_to_csv(config: Dict[str, Any], lookahead_analysis: List[LookaheadAnalysis]):
         def add_or_update_row(df, row_data):
             if (
-                (df['filename'] == row_data['filename']) &
-                (df['strategy'] == row_data['strategy'])
+                (df["filename"] == row_data["filename"]) & (df["strategy"] == row_data["strategy"])
             ).any():
                 # Update existing row
                 pd_series = pd.DataFrame([row_data])
                 df.loc[
-                    (df['filename'] == row_data['filename']) &
-                    (df['strategy'] == row_data['strategy'])
-                ] = pd_series
+                    (df["filename"] == row_data["filename"])
+                    & (df["strategy"] == row_data["strategy"])
+                ] = pd_series
             else:
                 # Add new row
                 df = pd.concat([df, pd.DataFrame([row_data], columns=df.columns)])
 
             return df
 
-        if Path(config['lookahead_analysis_exportfilename']).exists():
+        if Path(config["lookahead_analysis_exportfilename"]).exists():
             # Read CSV file into a pandas dataframe
-            csv_df = pd.read_csv(config['lookahead_analysis_exportfilename'])
+            csv_df = pd.read_csv(config["lookahead_analysis_exportfilename"])
         else:
             # Create a new empty DataFrame with the desired column names and set the index
-            csv_df = pd.DataFrame(columns=[
-                'filename', 'strategy', 'has_bias', 'total_signals',
-                'biased_entry_signals', 'biased_exit_signals', 'biased_indicators'
-            ],
-                index=None)
+            csv_df = pd.DataFrame(
+                columns=[
+                    "filename",
+                    "strategy",
+                    "has_bias",
+                    "total_signals",
+                    "biased_entry_signals",
+                    "biased_exit_signals",
+                    "biased_indicators",
+                ],
+                index=None,
+            )
 
         for inst in lookahead_analysis:
             # only update if
-            if (inst.current_analysis.total_signals > config['minimum_trade_amount']
-                    and inst.failed_bias_check is not True):
-                new_row_data = {'filename': inst.strategy_obj['location'].parts[-1],
-                                'strategy': inst.strategy_obj['name'],
-                                'has_bias': inst.current_analysis.has_bias,
-                                'total_signals':
-                                    int(inst.current_analysis.total_signals),
-                                'biased_entry_signals':
-                                    int(inst.current_analysis.false_entry_signals),
-                                'biased_exit_signals':
-                                    int(inst.current_analysis.false_exit_signals),
-                                'biased_indicators':
-                                    ",".join(inst.current_analysis.false_indicators)}
+            if (
+                inst.current_analysis.total_signals > config["minimum_trade_amount"]
+                and inst.failed_bias_check is not True
+            ):
+                new_row_data = {
+                    "filename": inst.strategy_obj["location"].parts[-1],
+                    "strategy": inst.strategy_obj["name"],
+                    "has_bias": inst.current_analysis.has_bias,
+                    "total_signals": int(inst.current_analysis.total_signals),
+                    "biased_entry_signals": int(inst.current_analysis.false_entry_signals),
+                    "biased_exit_signals": int(inst.current_analysis.false_exit_signals),
+                    "biased_indicators": ",".join(inst.current_analysis.false_indicators),
+                }
                 csv_df = add_or_update_row(csv_df, new_row_data)
 
         # Fill NaN values with a default value (e.g., 0)
-        csv_df['total_signals'] = csv_df['total_signals'].astype(int).fillna(0)
-        csv_df['biased_entry_signals'] = csv_df['biased_entry_signals'].astype(int).fillna(0)
-        csv_df['biased_exit_signals'] = csv_df['biased_exit_signals'].astype(int).fillna(0)
+        csv_df["total_signals"] = csv_df["total_signals"].astype(int).fillna(0)
+        csv_df["biased_entry_signals"] = csv_df["biased_entry_signals"].astype(int).fillna(0)
+        csv_df["biased_exit_signals"] = csv_df["biased_exit_signals"].astype(int).fillna(0)
 
         # Convert columns to integers
-        csv_df['total_signals'] = csv_df['total_signals'].astype(int)
-        csv_df['biased_entry_signals'] = csv_df['biased_entry_signals'].astype(int)
-        csv_df['biased_exit_signals'] = csv_df['biased_exit_signals'].astype(int)
+        csv_df["total_signals"] = csv_df["total_signals"].astype(int)
+        csv_df["biased_entry_signals"] = csv_df["biased_entry_signals"].astype(int)
+        csv_df["biased_exit_signals"] = csv_df["biased_exit_signals"].astype(int)
 
         logger.info(f"saving {config['lookahead_analysis_exportfilename']}")
-        csv_df.to_csv(config['lookahead_analysis_exportfilename'], index=False)
+        csv_df.to_csv(config["lookahead_analysis_exportfilename"], index=False)
 
     @staticmethod
     def calculate_config_overrides(config: Config):
-        if config.get('enable_protections', False):
+        if config.get("enable_protections", False):
             # if protections are used globally, they can produce false positives.
-            config['enable_protections'] = False
-            logger.info('Protections were enabled. '
-                        'Disabling protections now '
-                        'since they could otherwise produce false positives.')
-        if config['targeted_trade_amount'] < config['minimum_trade_amount']:
+            config["enable_protections"] = False
+            logger.info(
+                "Protections were enabled. "
+                "Disabling protections now "
+                "since they could otherwise produce false positives."
+            )
+        if config["targeted_trade_amount"] < config["minimum_trade_amount"]:
             # this combo doesn't make any sense.
             raise OperationalException(
                 "Targeted trade amount can't be smaller than minimum trade amount."
             )
-        if len(config['pairs']) > config.get('max_open_trades', 0):
-            logger.info('Max_open_trades were less than amount of pairs '
-                        'or defined in the strategy. '
-                        'Set max_open_trades to amount of pairs '
-                        'just to avoid false positives.')
-            config['max_open_trades'] = len(config['pairs'])
+        if len(config["pairs"]) > config.get("max_open_trades", 0):
+            logger.info(
+                "Max_open_trades were less than amount of pairs "
+                "or defined in the strategy. "
+                "Set max_open_trades to amount of pairs "
+                "just to avoid false positives."
+            )
+            config["max_open_trades"] = len(config["pairs"])
 
         min_dry_run_wallet = 1000000000
-        if config['dry_run_wallet'] < min_dry_run_wallet:
-            logger.info('Dry run wallet was not set to 1 billion, pushing it up there '
-                        'just to avoid false positives')
-            config['dry_run_wallet'] = min_dry_run_wallet
+        if config["dry_run_wallet"] < min_dry_run_wallet:
+            logger.info(
+                "Dry run wallet was not set to 1 billion, pushing it up there "
+                "just to avoid false positives"
+            )
+            config["dry_run_wallet"] = min_dry_run_wallet
 
-        if 'timerange' not in config:
+        if "timerange" not in config:
             # setting a timerange is enforced here
             raise OperationalException(
                 "Please set a timerange. "
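`add_or_update_row` is a keyed upsert on the (filename, strategy) pair. A standalone pandas sketch of the same mechanic, simplified to assign values per column instead of via a one-row DataFrame:

import pandas as pd

def add_or_update_row(df: pd.DataFrame, row_data: dict) -> pd.DataFrame:
    mask = (df["filename"] == row_data["filename"]) & (df["strategy"] == row_data["strategy"])
    if mask.any():
        df.loc[mask, list(row_data)] = list(row_data.values())  # update the matching row
    else:
        df = pd.concat([df, pd.DataFrame([row_data], columns=df.columns)])  # append a new row
    return df

csv_df = pd.DataFrame(columns=["filename", "strategy", "has_bias"])
csv_df = add_or_update_row(csv_df, {"filename": "S.py", "strategy": "S", "has_bias": False})
csv_df = add_or_update_row(csv_df, {"filename": "S.py", "strategy": "S", "has_bias": True})
print(csv_df)  # a single row, with has_bias now True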
@@ -155,32 +174,35 @@ class LookaheadAnalysisSubFunctions:
         # in a combination with a wallet size of 1 billion it should always be able to trade
         # no matter if they use custom_stake_amount as a small percentage of wallet size
         # or fixate custom_stake_amount to a certain value.
-        logger.info('fixing stake_amount to 10k')
-        config['stake_amount'] = 10000
+        logger.info("fixing stake_amount to 10k")
+        config["stake_amount"] = 10000
 
         # enforce cache to be 'none', shift it to 'none' if not already
         # (since the default value is 'day')
-        if config.get('backtest_cache') is None:
-            config['backtest_cache'] = 'none'
-        elif config['backtest_cache'] != 'none':
-            logger.info(f"backtest_cache = "
-                        f"{config['backtest_cache']} detected. "
-                        f"Inside lookahead-analysis it is enforced to be 'none'. "
-                        f"Changed it to 'none'")
-            config['backtest_cache'] = 'none'
+        if config.get("backtest_cache") is None:
+            config["backtest_cache"] = "none"
+        elif config["backtest_cache"] != "none":
+            logger.info(
+                f"backtest_cache = "
+                f"{config['backtest_cache']} detected. "
+                f"Inside lookahead-analysis it is enforced to be 'none'. "
+                f"Changed it to 'none'"
+            )
+            config["backtest_cache"] = "none"
         return config
 
     @staticmethod
     def initialize_single_lookahead_analysis(config: Config, strategy_obj: Dict[str, Any]):
-
         logger.info(f"Bias test of {Path(strategy_obj['location']).name} started.")
         start = time.perf_counter()
         current_instance = LookaheadAnalysis(config, strategy_obj)
         current_instance.start()
         elapsed = time.perf_counter() - start
-        logger.info(f"Checking look ahead bias via backtests "
-                    f"of {Path(strategy_obj['location']).name} "
-                    f"took {elapsed:.0f} seconds.")
+        logger.info(
+            f"Checking look ahead bias via backtests "
+            f"of {Path(strategy_obj['location']).name} "
+            f"took {elapsed:.0f} seconds."
+        )
         return current_instance
 
     @staticmethod
@@ -188,36 +210,42 @@ class LookaheadAnalysisSubFunctions:
         config = LookaheadAnalysisSubFunctions.calculate_config_overrides(config)
 
         strategy_objs = StrategyResolver.search_all_objects(
-            config, enum_failed=False, recursive=config.get('recursive_strategy_search', False))
+            config, enum_failed=False, recursive=config.get("recursive_strategy_search", False)
+        )
 
         lookaheadAnalysis_instances = []
 
         # unify --strategy and --strategy-list to one list
-        if not (strategy_list := config.get('strategy_list', [])):
-            if config.get('strategy') is None:
+        if not (strategy_list := config.get("strategy_list", [])):
+            if config.get("strategy") is None:
                 raise OperationalException(
                     "No Strategy specified. Please specify a strategy via --strategy or "
                     "--strategy-list"
                 )
-            strategy_list = [config['strategy']]
+            strategy_list = [config["strategy"]]
 
         # check if strategies can be properly loaded, only check them if they can be.
         for strat in strategy_list:
             for strategy_obj in strategy_objs:
-                if strategy_obj['name'] == strat and strategy_obj not in strategy_list:
+                if strategy_obj["name"] == strat and strategy_obj not in strategy_list:
                     lookaheadAnalysis_instances.append(
                         LookaheadAnalysisSubFunctions.initialize_single_lookahead_analysis(
-                            config, strategy_obj))
+                            config, strategy_obj
+                        )
+                    )
                     break
 
         # report the results
         if lookaheadAnalysis_instances:
             LookaheadAnalysisSubFunctions.text_table_lookahead_analysis_instances(
-                config, lookaheadAnalysis_instances)
-            if config.get('lookahead_analysis_exportfilename') is not None:
+                config, lookaheadAnalysis_instances
+            )
+            if config.get("lookahead_analysis_exportfilename") is not None:
                 LookaheadAnalysisSubFunctions.export_to_csv(config, lookaheadAnalysis_instances)
         else:
-            logger.error("There were no strategies specified neither through "
-                         "--strategy nor through "
-                         "--strategy-list "
-                         "or timeframe was not specified.")
+            logger.error(
+                "There were no strategies specified neither through "
+                "--strategy nor through "
+                "--strategy-list "
+                "or timeframe was not specified."
+            )
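The walrus pattern above unifies --strategy and --strategy-list into a single list; a compact standalone illustration (a plain ValueError stands in for OperationalException):

config = {"strategy": "MyStrategy"}  # hypothetical CLI-derived config

if not (strategy_list := config.get("strategy_list", [])):
    if config.get("strategy") is None:
        raise ValueError("No strategy specified")
    strategy_list = [config["strategy"]]

print(strategy_list)  # ['MyStrategy']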
@@ -20,10 +20,8 @@ logger = logging.getLogger(__name__)
 
 
 class RecursiveAnalysis(BaseAnalysis):
-
     def __init__(self, config: Dict[str, Any], strategy_obj: Dict):
-
-        self._startup_candle = config.get('startup_candle', [199, 399, 499, 999, 1999])
+        self._startup_candle = config.get("startup_candle", [199, 399, 499, 999, 1999])
 
         super().__init__(config, strategy_obj)
@@ -35,8 +33,7 @@ class RecursiveAnalysis(BaseAnalysis):
     # For recursive bias check
     # analyzes two data frames with processed indicators and shows differences between them.
     def analyze_indicators(self):
-
-        pair_to_check = self.local_config['pairs'][0]
+        pair_to_check = self.local_config["pairs"][0]
         logger.info("Start checking for recursive bias")
 
         # check and report signals
@@ -50,17 +47,17 @@ class RecursiveAnalysis(BaseAnalysis):
         # print(compare_df)
         for col_name, values in compare_df.items():
             # print(col_name)
-            if 'other' == col_name:
+            if "other" == col_name:
                 continue
             indicators = values.index
 
             for indicator in indicators:
-                if (indicator not in self.dict_recursive):
+                if indicator not in self.dict_recursive:
                     self.dict_recursive[indicator] = {}
 
                 values_diff = compare_df.loc[indicator]
-                values_diff_self = values_diff.loc['self']
-                values_diff_other = values_diff.loc['other']
+                values_diff_self = values_diff.loc["self"]
+                values_diff_other = values_diff.loc["other"]
                 diff = (values_diff_other - values_diff_self) / values_diff_self * 100
 
                 self.dict_recursive[indicator][part.startup_candle] = f"{diff:.3f}%"
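The recursive check quantifies how much an indicator's value at one fixed candle moves when the startup window shrinks, as a signed percentage of the full-window value:

# The percentage formula above, applied to hypothetical indicator values.
values_diff_self = 101.5   # indicator computed with the full startup window
values_diff_other = 100.0  # same candle, computed with a shorter startup window

diff = (values_diff_other - values_diff_self) / values_diff_self * 100
print(f"{diff:.3f}%")  # -1.478% -> the indicator depends on its history length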
@@ -72,17 +69,16 @@ class RecursiveAnalysis(BaseAnalysis):
     # For lookahead bias check
     # analyzes two data frames with processed indicators and shows differences between them.
     def analyze_indicators_lookahead(self):
-
-        pair_to_check = self.local_config['pairs'][0]
+        pair_to_check = self.local_config["pairs"][0]
         logger.info("Start checking for lookahead bias on indicators only")
 
         part = self.partial_varHolder_lookahead_array[0]
         part_last_row = part.indicators[pair_to_check].iloc[-1]
-        date_to_check = part_last_row['date']
-        index_to_get = (self.full_varHolder.indicators[pair_to_check]['date'] == date_to_check)
+        date_to_check = part_last_row["date"]
+        index_to_get = self.full_varHolder.indicators[pair_to_check]["date"] == date_to_check
         base_row_check = self.full_varHolder.indicators[pair_to_check].loc[index_to_get].iloc[-1]
 
-        check_time = part.to_dt.strftime('%Y-%m-%dT%H:%M:%S')
+        check_time = part.to_dt.strftime("%Y-%m-%dT%H:%M:%S")
 
         logger.info(f"Check indicators at {check_time}")
         # logger.info(f"vs {part_timerange} with {part.startup_candle} startup candle")
@@ -92,7 +88,7 @@ class RecursiveAnalysis(BaseAnalysis):
         # print(compare_df)
         for col_name, values in compare_df.items():
             # print(col_name)
-            if 'other' == col_name:
+            if "other" == col_name:
                 continue
             indicators = values.index
 
@@ -105,21 +101,24 @@ class RecursiveAnalysis(BaseAnalysis):
             logger.info("No lookahead bias on indicators found.")
 
     def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]):
-
-        if 'freqai' in self.local_config and 'identifier' in self.local_config['freqai']:
+        if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]:
             # purge previous data if the freqai model is defined
             # (to be sure nothing is carried over from older backtests)
-            path_to_current_identifier = (
-                Path(f"{self.local_config['user_data_dir']}/models/"
-                     f"{self.local_config['freqai']['identifier']}").resolve())
+            path_to_current_identifier = Path(
+                f"{self.local_config['user_data_dir']}/models/"
+                f"{self.local_config['freqai']['identifier']}"
+            ).resolve()
             # remove folder and its contents
             if Path.exists(path_to_current_identifier):
                 shutil.rmtree(path_to_current_identifier)
 
         prepare_data_config = deepcopy(self.local_config)
-        prepare_data_config['timerange'] = (str(self.dt_to_timestamp(varholder.from_dt)) + "-" +
-                                            str(self.dt_to_timestamp(varholder.to_dt)))
-        prepare_data_config['exchange']['pair_whitelist'] = pairs_to_load
+        prepare_data_config["timerange"] = (
+            str(self.dt_to_timestamp(varholder.from_dt))
+            + "-"
+            + str(self.dt_to_timestamp(varholder.to_dt))
+        )
+        prepare_data_config["exchange"]["pair_whitelist"] = pairs_to_load
 
         backtesting = Backtesting(prepare_data_config, self.exchange)
         self.exchange = backtesting.exchange
@@ -139,9 +138,9 @@ class RecursiveAnalysis(BaseAnalysis):
         partial_varHolder.to_dt = self.full_varHolder.to_dt
         partial_varHolder.startup_candle = startup_candle
 
-        self.local_config['startup_candle_count'] = startup_candle
+        self.local_config["startup_candle_count"] = startup_candle
 
-        self.prepare_data(partial_varHolder, self.local_config['pairs'])
+        self.prepare_data(partial_varHolder, self.local_config["pairs"])
 
         self.partial_varHolder_array.append(partial_varHolder)
@@ -153,12 +152,11 @@ class RecursiveAnalysis(BaseAnalysis):
         partial_varHolder.from_dt = self.full_varHolder.from_dt
         partial_varHolder.to_dt = end_date
 
-        self.prepare_data(partial_varHolder, self.local_config['pairs'])
+        self.prepare_data(partial_varHolder, self.local_config["pairs"])
 
         self.partial_varHolder_lookahead_array.append(partial_varHolder)
 
     def start(self) -> None:
-
         super().start()
 
         reduce_verbosity_for_bias_tester()
@@ -13,12 +13,10 @@ logger = logging.getLogger(__name__)
 
 
 class RecursiveAnalysisSubFunctions:
-
     @staticmethod
-    def text_table_recursive_analysis_instances(
-            recursive_instances: List[RecursiveAnalysis]):
+    def text_table_recursive_analysis_instances(recursive_instances: List[RecursiveAnalysis]):
         startups = recursive_instances[0]._startup_candle
-        headers = ['indicators']
+        headers = ["indicators"]
         for candle in startups:
             headers.append(candle)
 
@@ -28,11 +26,12 @@ class RecursiveAnalysisSubFunctions:
             for indicator, values in inst.dict_recursive.items():
                 temp_data = [indicator]
                 for candle in startups:
-                    temp_data.append(values.get(int(candle), '-'))
+                    temp_data.append(values.get(int(candle), "-"))
                 data.append(temp_data)
 
         if len(data) > 0:
             from tabulate import tabulate
+
             table = tabulate(data, headers=headers, tablefmt="orgtbl")
             print(table)
             return table, headers, data
@@ -41,34 +40,37 @@ class RecursiveAnalysisSubFunctions:
 
     @staticmethod
     def calculate_config_overrides(config: Config):
-        if 'timerange' not in config:
+        if "timerange" not in config:
             # setting a timerange is enforced here
             raise OperationalException(
                 "Please set a timerange. "
                 "A timerange of 5000 candles are enough for recursive analysis."
             )
 
-        if config.get('backtest_cache') is None:
-            config['backtest_cache'] = 'none'
-        elif config['backtest_cache'] != 'none':
-            logger.info(f"backtest_cache = "
-                        f"{config['backtest_cache']} detected. "
-                        f"Inside recursive-analysis it is enforced to be 'none'. "
-                        f"Changed it to 'none'")
-            config['backtest_cache'] = 'none'
+        if config.get("backtest_cache") is None:
+            config["backtest_cache"] = "none"
+        elif config["backtest_cache"] != "none":
+            logger.info(
+                f"backtest_cache = "
+                f"{config['backtest_cache']} detected. "
+                f"Inside recursive-analysis it is enforced to be 'none'. "
+                f"Changed it to 'none'"
+            )
+            config["backtest_cache"] = "none"
         return config
 
     @staticmethod
     def initialize_single_recursive_analysis(config: Config, strategy_obj: Dict[str, Any]):
-
         logger.info(f"Recursive test of {Path(strategy_obj['location']).name} started.")
         start = time.perf_counter()
         current_instance = RecursiveAnalysis(config, strategy_obj)
         current_instance.start()
         elapsed = time.perf_counter() - start
-        logger.info(f"Checking recursive and indicator-only lookahead bias of indicators "
-                    f"of {Path(strategy_obj['location']).name} "
-                    f"took {elapsed:.0f} seconds.")
+        logger.info(
+            f"Checking recursive and indicator-only lookahead bias of indicators "
+            f"of {Path(strategy_obj['location']).name} "
+            f"took {elapsed:.0f} seconds."
+        )
         return current_instance
 
     @staticmethod
@@ -76,31 +78,37 @@ class RecursiveAnalysisSubFunctions:
         config = RecursiveAnalysisSubFunctions.calculate_config_overrides(config)
 
         strategy_objs = StrategyResolver.search_all_objects(
-            config, enum_failed=False, recursive=config.get('recursive_strategy_search', False))
+            config, enum_failed=False, recursive=config.get("recursive_strategy_search", False)
+        )
 
         RecursiveAnalysis_instances = []
 
         # unify --strategy and --strategy-list to one list
-        if not (strategy_list := config.get('strategy_list', [])):
-            if config.get('strategy') is None:
+        if not (strategy_list := config.get("strategy_list", [])):
+            if config.get("strategy") is None:
                 raise OperationalException(
                     "No Strategy specified. Please specify a strategy via --strategy"
                 )
-            strategy_list = [config['strategy']]
+            strategy_list = [config["strategy"]]
 
         # check if strategies can be properly loaded, only check them if they can be.
         for strat in strategy_list:
             for strategy_obj in strategy_objs:
-                if strategy_obj['name'] == strat and strategy_obj not in strategy_list:
+                if strategy_obj["name"] == strat and strategy_obj not in strategy_list:
                     RecursiveAnalysis_instances.append(
                         RecursiveAnalysisSubFunctions.initialize_single_recursive_analysis(
-                            config, strategy_obj))
+                            config, strategy_obj
+                        )
+                    )
                     break
 
         # report the results
         if RecursiveAnalysis_instances:
             RecursiveAnalysisSubFunctions.text_table_recursive_analysis_instances(
-                RecursiveAnalysis_instances)
+                RecursiveAnalysis_instances
+            )
         else:
-            logger.error("There was no strategy specified through --strategy "
-                         "or timeframe was not specified.")
+            logger.error(
+                "There was no strategy specified through --strategy "
+                "or timeframe was not specified."
+            )
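For context: the code touched here backs the `freqtrade lookahead-analysis` and `freqtrade recursive-analysis` subcommands, and the configuration keys seen in the diff (`minimum_trade_amount`, `targeted_trade_amount`, `lookahead_analysis_exportfilename`, `startup_candle`) correspond to command-line options; check each subcommand's --help for the exact flags available in your version.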