mirror of
https://github.com/freqtrade/freqtrade.git
synced 2025-11-29 00:23:07 +00:00
Merge pull request #8369 from hippocritical/develop
backtest - lookahead_analysis
This commit is contained in:
100
docs/lookahead-analysis.md
Normal file
100
docs/lookahead-analysis.md
Normal file
@@ -0,0 +1,100 @@
|
||||
# Lookahead analysis
|
||||
|
||||
This page explains how to validate your strategy in terms of look ahead bias.
|
||||
|
||||
Checking look ahead bias is the bane of any strategy since it is sometimes very easy to introduce backtest bias -
|
||||
but very hard to detect.
|
||||
|
||||
Backtesting initializes all timestamps at once and calculates all indicators in the beginning.
|
||||
This means that if your indicators or entry/exit signals could look into future candles and falsify your backtest.
|
||||
|
||||
Lookahead-analysis requires historic data to be available.
|
||||
To learn how to get data for the pairs and exchange you're interested in,
|
||||
head over to the [Data Downloading](data-download.md) section of the documentation.
|
||||
|
||||
This command is built upon backtesting since it internally chains backtests and pokes at the strategy to provoke it to show look ahead bias.
|
||||
This is done by not looking at the strategy itself - but at the results it returned.
|
||||
The results are things like changed indicator-values and moved entries/exits compared to the full backtest.
|
||||
|
||||
You can use commands of [Backtesting](backtesting.md).
|
||||
It also supports the lookahead-analysis of freqai strategies.
|
||||
|
||||
- `--cache` is forced to "none".
|
||||
- `--max-open-trades` is forced to be at least equal to the number of pairs.
|
||||
- `--dry-run-wallet` is forced to be basically infinite.
|
||||
|
||||
## Lookahead-analysis command reference
|
||||
|
||||
```
|
||||
usage: freqtrade lookahead-analysis [-h] [-v] [--logfile FILE] [-V] [-c PATH]
|
||||
[-d PATH] [--userdir PATH] [-s NAME]
|
||||
[--strategy-path PATH]
|
||||
[--recursive-strategy-search]
|
||||
[--freqaimodel NAME]
|
||||
[--freqaimodel-path PATH] [-i TIMEFRAME]
|
||||
[--timerange TIMERANGE]
|
||||
[--data-format-ohlcv {json,jsongz,hdf5,feather,parquet}]
|
||||
[--max-open-trades INT]
|
||||
[--stake-amount STAKE_AMOUNT]
|
||||
[--fee FLOAT] [-p PAIRS [PAIRS ...]]
|
||||
[--enable-protections]
|
||||
[--dry-run-wallet DRY_RUN_WALLET]
|
||||
[--timeframe-detail TIMEFRAME_DETAIL]
|
||||
[--strategy-list STRATEGY_LIST [STRATEGY_LIST ...]]
|
||||
[--export {none,trades,signals}]
|
||||
[--export-filename PATH]
|
||||
[--breakdown {day,week,month} [{day,week,month} ...]]
|
||||
[--cache {none,day,week,month}]
|
||||
[--freqai-backtest-live-models]
|
||||
[--minimum-trade-amount INT]
|
||||
[--targeted-trade-amount INT]
|
||||
[--lookahead-analysis-exportfilename LOOKAHEAD_ANALYSIS_EXPORTFILENAME]
|
||||
|
||||
options:
|
||||
--minimum-trade-amount INT
|
||||
Minimum trade amount for lookahead-analysis
|
||||
--targeted-trade-amount INT
|
||||
Targeted trade amount for lookahead analysis
|
||||
--lookahead-analysis-exportfilename LOOKAHEAD_ANALYSIS_EXPORTFILENAME
|
||||
Use this csv-filename to store lookahead-analysis-
|
||||
results
|
||||
```
|
||||
|
||||
!!! Note ""
|
||||
The above Output was reduced to options `lookahead-analysis` adds on top of regular backtesting commands.
|
||||
|
||||
### Summary
|
||||
|
||||
Checks a given strategy for look ahead bias via lookahead-analysis
|
||||
Look ahead bias means that the backtest uses data from future candles thereby not making it viable beyond backtesting
|
||||
and producing false hopes for the one backtesting.
|
||||
|
||||
### Introduction
|
||||
|
||||
Many strategies - without the programmer knowing - have fallen prey to look ahead bias.
|
||||
|
||||
Any backtest will populate the full dataframe including all time stamps at the beginning.
|
||||
If the programmer is not careful or oblivious how things work internally
|
||||
(which sometimes can be really hard to find out) then it will just look into the future making the strategy amazing
|
||||
but not realistic.
|
||||
|
||||
This command is made to try to verify the validity in the form of the aforementioned look ahead bias.
|
||||
|
||||
### How does the command work?
|
||||
|
||||
It will start with a backtest of all pairs to generate a baseline for indicators and entries/exits.
|
||||
After the backtest ran, it will look if the `minimum-trade-amount` is met
|
||||
and if not cancel the lookahead-analysis for this strategy.
|
||||
|
||||
After setting the baseline it will then do additional runs for every entry and exit separately.
|
||||
When a verification-backtest is done, it will compare the indicators as the signal (either entry or exit) and report the bias.
|
||||
After all signals have been verified or falsified a result-table will be generated for the user to see.
|
||||
|
||||
### Caveats
|
||||
|
||||
- `lookahead-analysis` can only verify / falsify the trades it calculated and verified.
|
||||
If the strategy has many different signals / signal types, it's up to you to select appropriate parameters to ensure that all signals have triggered at least once. Not triggered signals will not have been verified.
|
||||
This could lead to a false-negative (the strategy will then be reported as non-biased).
|
||||
- `lookahead-analysis` has access to everything that backtesting has too.
|
||||
Please don't provoke any configs like enabling position stacking.
|
||||
If you decide to do so, then make doubly sure that you won't ever run out of `max_open_trades` amount and neither leftover money in your wallet.
|
||||
@@ -19,7 +19,8 @@ from freqtrade.commands.list_commands import (start_list_exchanges, start_list_f
|
||||
start_list_markets, start_list_strategies,
|
||||
start_list_timeframes, start_show_trades)
|
||||
from freqtrade.commands.optimize_commands import (start_backtesting, start_backtesting_show,
|
||||
start_edge, start_hyperopt)
|
||||
start_edge, start_hyperopt,
|
||||
start_lookahead_analysis)
|
||||
from freqtrade.commands.pairlist_commands import start_test_pairlist
|
||||
from freqtrade.commands.plot_commands import start_plot_dataframe, start_plot_profit
|
||||
from freqtrade.commands.strategy_utils_commands import start_strategy_update
|
||||
|
||||
24
freqtrade/commands/arguments.py
Normal file → Executable file
24
freqtrade/commands/arguments.py
Normal file → Executable file
@@ -117,7 +117,11 @@ NO_CONF_REQURIED = ["convert-data", "convert-trade-data", "download-data", "list
|
||||
|
||||
NO_CONF_ALLOWED = ["create-userdir", "list-exchanges", "new-strategy"]
|
||||
|
||||
ARGS_STRATEGY_UTILS = ["strategy_list", "strategy_path", "recursive_strategy_search"]
|
||||
ARGS_STRATEGY_UPDATER = ["strategy_list", "strategy_path", "recursive_strategy_search"]
|
||||
|
||||
ARGS_LOOKAHEAD_ANALYSIS = [
|
||||
a for a in ARGS_BACKTEST if a not in ("position_stacking", "use_max_market_positions", 'cache')
|
||||
] + ["minimum_trade_amount", "targeted_trade_amount", "lookahead_analysis_exportfilename"]
|
||||
|
||||
|
||||
class Arguments:
|
||||
@@ -201,8 +205,9 @@ class Arguments:
|
||||
start_install_ui, start_list_data, start_list_exchanges,
|
||||
start_list_freqAI_models, start_list_markets,
|
||||
start_list_strategies, start_list_timeframes,
|
||||
start_new_config, start_new_strategy, start_plot_dataframe,
|
||||
start_plot_profit, start_show_trades, start_strategy_update,
|
||||
start_lookahead_analysis, start_new_config,
|
||||
start_new_strategy, start_plot_dataframe, start_plot_profit,
|
||||
start_show_trades, start_strategy_update,
|
||||
start_test_pairlist, start_trading, start_webserver)
|
||||
|
||||
subparsers = self.parser.add_subparsers(dest='command',
|
||||
@@ -451,4 +456,15 @@ class Arguments:
|
||||
'files to the current version',
|
||||
parents=[_common_parser])
|
||||
strategy_updater_cmd.set_defaults(func=start_strategy_update)
|
||||
self._build_args(optionlist=ARGS_STRATEGY_UTILS, parser=strategy_updater_cmd)
|
||||
self._build_args(optionlist=ARGS_STRATEGY_UPDATER, parser=strategy_updater_cmd)
|
||||
|
||||
# Add lookahead_analysis subcommand
|
||||
lookahead_analayis_cmd = subparsers.add_parser(
|
||||
'lookahead-analysis',
|
||||
help="Check for potential look ahead bias.",
|
||||
parents=[_common_parser, _strategy_parser])
|
||||
|
||||
lookahead_analayis_cmd.set_defaults(func=start_lookahead_analysis)
|
||||
|
||||
self._build_args(optionlist=ARGS_LOOKAHEAD_ANALYSIS,
|
||||
parser=lookahead_analayis_cmd)
|
||||
|
||||
17
freqtrade/commands/cli_options.py
Normal file → Executable file
17
freqtrade/commands/cli_options.py
Normal file → Executable file
@@ -690,4 +690,21 @@ AVAILABLE_CLI_OPTIONS = {
|
||||
help='Run backtest with ready models.',
|
||||
action='store_true'
|
||||
),
|
||||
"minimum_trade_amount": Arg(
|
||||
'--minimum-trade-amount',
|
||||
help='Minimum trade amount for lookahead-analysis',
|
||||
type=check_int_positive,
|
||||
metavar='INT',
|
||||
),
|
||||
"targeted_trade_amount": Arg(
|
||||
'--targeted-trade-amount',
|
||||
help='Targeted trade amount for lookahead analysis',
|
||||
type=check_int_positive,
|
||||
metavar='INT',
|
||||
),
|
||||
"lookahead_analysis_exportfilename": Arg(
|
||||
'--lookahead-analysis-exportfilename',
|
||||
help="Use this csv-filename to store lookahead-analysis-results",
|
||||
type=str
|
||||
),
|
||||
}
|
||||
|
||||
@@ -132,3 +132,15 @@ def start_edge(args: Dict[str, Any]) -> None:
|
||||
# Initialize Edge object
|
||||
edge_cli = EdgeCli(config)
|
||||
edge_cli.start()
|
||||
|
||||
|
||||
def start_lookahead_analysis(args: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Start the backtest bias tester script
|
||||
:param args: Cli args from Arguments()
|
||||
:return: None
|
||||
"""
|
||||
from freqtrade.optimize.lookahead_analysis_helpers import LookaheadAnalysisSubFunctions
|
||||
|
||||
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
|
||||
LookaheadAnalysisSubFunctions.start(config)
|
||||
|
||||
@@ -203,7 +203,7 @@ class Configuration:
|
||||
# This will override the strategy configuration
|
||||
self._args_to_config(config, argname='timeframe',
|
||||
logstring='Parameter -i/--timeframe detected ... '
|
||||
'Using timeframe: {} ...')
|
||||
'Using timeframe: {} ...')
|
||||
|
||||
self._args_to_config(config, argname='position_stacking',
|
||||
logstring='Parameter --enable-position-stacking detected ...')
|
||||
@@ -300,6 +300,9 @@ class Configuration:
|
||||
self._args_to_config(config, argname='hyperoptexportfilename',
|
||||
logstring='Using hyperopt file: {}')
|
||||
|
||||
self._args_to_config(config, argname='lookahead_analysis_exportfilename',
|
||||
logstring='Saving lookahead analysis results into {} ...')
|
||||
|
||||
self._args_to_config(config, argname='epochs',
|
||||
logstring='Parameter --epochs detected ... '
|
||||
'Will run Hyperopt with for {} epochs ...'
|
||||
@@ -474,6 +477,19 @@ class Configuration:
|
||||
self._args_to_config(config, argname='analysis_csv_path',
|
||||
logstring='Path to store analysis CSVs: {}')
|
||||
|
||||
self._args_to_config(config, argname='analysis_csv_path',
|
||||
logstring='Path to store analysis CSVs: {}')
|
||||
|
||||
# Lookahead analysis results
|
||||
self._args_to_config(config, argname='targeted_trade_amount',
|
||||
logstring='Targeted Trade amount: {}')
|
||||
|
||||
self._args_to_config(config, argname='minimum_trade_amount',
|
||||
logstring='Minimum Trade amount: {}')
|
||||
|
||||
self._args_to_config(config, argname='lookahead_analysis_exportfilename',
|
||||
logstring='Path to store lookahead-analysis-results: {}')
|
||||
|
||||
def _process_runmode(self, config: Config) -> None:
|
||||
|
||||
self._args_to_config(config, argname='dry_run',
|
||||
|
||||
@@ -163,6 +163,9 @@ CONF_SCHEMA = {
|
||||
'trading_mode': {'type': 'string', 'enum': TRADING_MODES},
|
||||
'margin_mode': {'type': 'string', 'enum': MARGIN_MODES},
|
||||
'reduce_df_footprint': {'type': 'boolean', 'default': False},
|
||||
'minimum_trade_amount': {'type': 'number', 'default': 10},
|
||||
'targeted_trade_amount': {'type': 'number', 'default': 20},
|
||||
'lookahead_analysis_exportfilename': {'type': 'string'},
|
||||
'liquidation_buffer': {'type': 'number', 'minimum': 0.0, 'maximum': 0.99},
|
||||
'backtest_breakdown': {
|
||||
'type': 'array',
|
||||
|
||||
@@ -2,6 +2,9 @@
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def set_loggers(verbosity: int = 0, api_verbosity: str = 'info') -> None:
|
||||
"""
|
||||
Set the logging level for third party libraries
|
||||
@@ -23,3 +26,30 @@ def set_loggers(verbosity: int = 0, api_verbosity: str = 'info') -> None:
|
||||
logging.getLogger('werkzeug').setLevel(
|
||||
logging.ERROR if api_verbosity == 'error' else logging.INFO
|
||||
)
|
||||
|
||||
|
||||
__BIAS_TESTER_LOGGERS = [
|
||||
'freqtrade.resolvers',
|
||||
'freqtrade.strategy.hyper',
|
||||
'freqtrade.configuration.config_validation',
|
||||
]
|
||||
|
||||
|
||||
def reduce_verbosity_for_bias_tester() -> None:
|
||||
"""
|
||||
Reduce verbosity for bias tester.
|
||||
It loads the same strategy several times, which would spam the log.
|
||||
"""
|
||||
logger.info("Reducing verbosity for bias tester.")
|
||||
for logger_name in __BIAS_TESTER_LOGGERS:
|
||||
logging.getLogger(logger_name).setLevel(logging.WARNING)
|
||||
|
||||
|
||||
def restore_verbosity_for_bias_tester() -> None:
|
||||
"""
|
||||
Restore verbosity after bias tester.
|
||||
"""
|
||||
logger.info("Restoring log verbosity.")
|
||||
log_level = logging.NOTSET
|
||||
for logger_name in __BIAS_TESTER_LOGGERS:
|
||||
logging.getLogger(logger_name).setLevel(log_level)
|
||||
|
||||
@@ -24,6 +24,7 @@ from freqtrade.enums import (BacktestState, CandleType, ExitCheckTuple, ExitType
|
||||
from freqtrade.exceptions import DependencyException, OperationalException
|
||||
from freqtrade.exchange import (amount_to_contract_precision, price_to_precision,
|
||||
timeframe_to_minutes, timeframe_to_seconds)
|
||||
from freqtrade.exchange.exchange import Exchange
|
||||
from freqtrade.mixins import LoggingMixin
|
||||
from freqtrade.optimize.backtest_caching import get_strategy_run_id
|
||||
from freqtrade.optimize.bt_progress import BTProgress
|
||||
@@ -72,7 +73,7 @@ class Backtesting:
|
||||
backtesting.start()
|
||||
"""
|
||||
|
||||
def __init__(self, config: Config) -> None:
|
||||
def __init__(self, config: Config, exchange: Optional[Exchange] = None) -> None:
|
||||
|
||||
LoggingMixin.show_output = False
|
||||
self.config = config
|
||||
@@ -89,7 +90,10 @@ class Backtesting:
|
||||
self.rejected_df: Dict[str, Dict] = {}
|
||||
|
||||
self._exchange_name = self.config['exchange']['name']
|
||||
self.exchange = ExchangeResolver.load_exchange(self.config, load_leverage_tiers=True)
|
||||
if not exchange:
|
||||
exchange = ExchangeResolver.load_exchange(self.config, load_leverage_tiers=True)
|
||||
self.exchange = exchange
|
||||
|
||||
self.dataprovider = DataProvider(self.config, self.exchange)
|
||||
|
||||
if self.config.get('strategy_list'):
|
||||
@@ -114,16 +118,7 @@ class Backtesting:
|
||||
self.timeframe_min = timeframe_to_minutes(self.timeframe)
|
||||
self.init_backtest_detail()
|
||||
self.pairlists = PairListManager(self.exchange, self.config, self.dataprovider)
|
||||
if 'VolumePairList' in self.pairlists.name_list:
|
||||
raise OperationalException("VolumePairList not allowed for backtesting. "
|
||||
"Please use StaticPairList instead.")
|
||||
if 'PerformanceFilter' in self.pairlists.name_list:
|
||||
raise OperationalException("PerformanceFilter not allowed for backtesting.")
|
||||
|
||||
if len(self.strategylist) > 1 and 'PrecisionFilter' in self.pairlists.name_list:
|
||||
raise OperationalException(
|
||||
"PrecisionFilter not allowed for backtesting multiple strategies."
|
||||
)
|
||||
self._validate_pairlists_for_backtesting()
|
||||
|
||||
self.dataprovider.add_pairlisthandler(self.pairlists)
|
||||
self.pairlists.refresh_pairlist()
|
||||
@@ -164,6 +159,18 @@ class Backtesting:
|
||||
|
||||
self.init_backtest()
|
||||
|
||||
def _validate_pairlists_for_backtesting(self):
|
||||
if 'VolumePairList' in self.pairlists.name_list:
|
||||
raise OperationalException("VolumePairList not allowed for backtesting. "
|
||||
"Please use StaticPairList instead.")
|
||||
if 'PerformanceFilter' in self.pairlists.name_list:
|
||||
raise OperationalException("PerformanceFilter not allowed for backtesting.")
|
||||
|
||||
if len(self.strategylist) > 1 and 'PrecisionFilter' in self.pairlists.name_list:
|
||||
raise OperationalException(
|
||||
"PrecisionFilter not allowed for backtesting multiple strategies."
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def cleanup():
|
||||
LoggingMixin.show_output = True
|
||||
|
||||
275
freqtrade/optimize/lookahead_analysis.py
Executable file
275
freqtrade/optimize/lookahead_analysis.py
Executable file
@@ -0,0 +1,275 @@
|
||||
import logging
|
||||
import shutil
|
||||
from copy import deepcopy
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from pandas import DataFrame
|
||||
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.data.history import get_timerange
|
||||
from freqtrade.exchange import timeframe_to_minutes
|
||||
from freqtrade.loggers.set_log_levels import (reduce_verbosity_for_bias_tester,
|
||||
restore_verbosity_for_bias_tester)
|
||||
from freqtrade.optimize.backtesting import Backtesting
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VarHolder:
|
||||
timerange: TimeRange
|
||||
data: DataFrame
|
||||
indicators: Dict[str, DataFrame]
|
||||
result: DataFrame
|
||||
compared: DataFrame
|
||||
from_dt: datetime
|
||||
to_dt: datetime
|
||||
compared_dt: datetime
|
||||
timeframe: str
|
||||
|
||||
|
||||
class Analysis:
|
||||
def __init__(self) -> None:
|
||||
self.total_signals = 0
|
||||
self.false_entry_signals = 0
|
||||
self.false_exit_signals = 0
|
||||
self.false_indicators: List[str] = []
|
||||
self.has_bias = False
|
||||
|
||||
|
||||
class LookaheadAnalysis:
|
||||
|
||||
def __init__(self, config: Dict[str, Any], strategy_obj: Dict):
|
||||
self.failed_bias_check = True
|
||||
self.full_varHolder = VarHolder()
|
||||
|
||||
self.entry_varHolders: List[VarHolder] = []
|
||||
self.exit_varHolders: List[VarHolder] = []
|
||||
self.exchange: Optional[Any] = None
|
||||
|
||||
# pull variables the scope of the lookahead_analysis-instance
|
||||
self.local_config = deepcopy(config)
|
||||
self.local_config['strategy'] = strategy_obj['name']
|
||||
self.current_analysis = Analysis()
|
||||
self.minimum_trade_amount = config['minimum_trade_amount']
|
||||
self.targeted_trade_amount = config['targeted_trade_amount']
|
||||
self.strategy_obj = strategy_obj
|
||||
|
||||
@staticmethod
|
||||
def dt_to_timestamp(dt: datetime):
|
||||
timestamp = int(dt.replace(tzinfo=timezone.utc).timestamp())
|
||||
return timestamp
|
||||
|
||||
@staticmethod
|
||||
def get_result(backtesting: Backtesting, processed: DataFrame):
|
||||
min_date, max_date = get_timerange(processed)
|
||||
|
||||
result = backtesting.backtest(
|
||||
processed=deepcopy(processed),
|
||||
start_date=min_date,
|
||||
end_date=max_date
|
||||
)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def report_signal(result: dict, column_name: str, checked_timestamp: datetime):
|
||||
df = result['results']
|
||||
row_count = df[column_name].shape[0]
|
||||
|
||||
if row_count == 0:
|
||||
return False
|
||||
else:
|
||||
|
||||
df_cut = df[(df[column_name] == checked_timestamp)]
|
||||
if df_cut[column_name].shape[0] == 0:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
return False
|
||||
|
||||
# analyzes two data frames with processed indicators and shows differences between them.
|
||||
def analyze_indicators(self, full_vars: VarHolder, cut_vars: VarHolder, current_pair: str):
|
||||
# extract dataframes
|
||||
cut_df: DataFrame = cut_vars.indicators[current_pair]
|
||||
full_df: DataFrame = full_vars.indicators[current_pair]
|
||||
|
||||
# cut longer dataframe to length of the shorter
|
||||
full_df_cut = full_df[
|
||||
(full_df.date == cut_vars.compared_dt)
|
||||
].reset_index(drop=True)
|
||||
cut_df_cut = cut_df[
|
||||
(cut_df.date == cut_vars.compared_dt)
|
||||
].reset_index(drop=True)
|
||||
|
||||
# check if dataframes are not empty
|
||||
if full_df_cut.shape[0] != 0 and cut_df_cut.shape[0] != 0:
|
||||
|
||||
# compare dataframes
|
||||
compare_df = full_df_cut.compare(cut_df_cut)
|
||||
|
||||
if compare_df.shape[0] > 0:
|
||||
for col_name, values in compare_df.items():
|
||||
col_idx = compare_df.columns.get_loc(col_name)
|
||||
compare_df_row = compare_df.iloc[0]
|
||||
# compare_df now comprises tuples with [1] having either 'self' or 'other'
|
||||
if 'other' in col_name[1]:
|
||||
continue
|
||||
self_value = compare_df_row[col_idx]
|
||||
other_value = compare_df_row[col_idx + 1]
|
||||
|
||||
# output differences
|
||||
if self_value != other_value:
|
||||
|
||||
if not self.current_analysis.false_indicators.__contains__(col_name[0]):
|
||||
self.current_analysis.false_indicators.append(col_name[0])
|
||||
logger.info(f"=> found look ahead bias in indicator "
|
||||
f"{col_name[0]}. "
|
||||
f"{str(self_value)} != {str(other_value)}")
|
||||
|
||||
def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]):
|
||||
|
||||
if 'freqai' in self.local_config and 'identifier' in self.local_config['freqai']:
|
||||
# purge previous data if the freqai model is defined
|
||||
# (to be sure nothing is carried over from older backtests)
|
||||
path_to_current_identifier = (
|
||||
Path(f"{self.local_config['user_data_dir']}/models/"
|
||||
f"{self.local_config['freqai']['identifier']}").resolve())
|
||||
# remove folder and its contents
|
||||
if Path.exists(path_to_current_identifier):
|
||||
shutil.rmtree(path_to_current_identifier)
|
||||
|
||||
prepare_data_config = deepcopy(self.local_config)
|
||||
prepare_data_config['timerange'] = (str(self.dt_to_timestamp(varholder.from_dt)) + "-" +
|
||||
str(self.dt_to_timestamp(varholder.to_dt)))
|
||||
prepare_data_config['exchange']['pair_whitelist'] = pairs_to_load
|
||||
|
||||
backtesting = Backtesting(prepare_data_config, self.exchange)
|
||||
self.exchange = backtesting.exchange
|
||||
backtesting._set_strategy(backtesting.strategylist[0])
|
||||
|
||||
varholder.data, varholder.timerange = backtesting.load_bt_data()
|
||||
backtesting.load_bt_data_detail()
|
||||
varholder.timeframe = backtesting.timeframe
|
||||
|
||||
varholder.indicators = backtesting.strategy.advise_all_indicators(varholder.data)
|
||||
varholder.result = self.get_result(backtesting, varholder.indicators)
|
||||
|
||||
def fill_full_varholder(self):
|
||||
self.full_varHolder = VarHolder()
|
||||
|
||||
# define datetime in human-readable format
|
||||
parsed_timerange = TimeRange.parse_timerange(self.local_config['timerange'])
|
||||
|
||||
if parsed_timerange.startdt is None:
|
||||
self.full_varHolder.from_dt = datetime.fromtimestamp(0, tz=timezone.utc)
|
||||
else:
|
||||
self.full_varHolder.from_dt = parsed_timerange.startdt
|
||||
|
||||
if parsed_timerange.stopdt is None:
|
||||
self.full_varHolder.to_dt = datetime.utcnow()
|
||||
else:
|
||||
self.full_varHolder.to_dt = parsed_timerange.stopdt
|
||||
|
||||
self.prepare_data(self.full_varHolder, self.local_config['pairs'])
|
||||
|
||||
def fill_entry_and_exit_varHolders(self, result_row):
|
||||
# entry_varHolder
|
||||
entry_varHolder = VarHolder()
|
||||
self.entry_varHolders.append(entry_varHolder)
|
||||
entry_varHolder.from_dt = self.full_varHolder.from_dt
|
||||
entry_varHolder.compared_dt = result_row['open_date']
|
||||
# to_dt needs +1 candle since it won't buy on the last candle
|
||||
entry_varHolder.to_dt = (
|
||||
result_row['open_date'] +
|
||||
timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe)))
|
||||
self.prepare_data(entry_varHolder, [result_row['pair']])
|
||||
|
||||
# exit_varHolder
|
||||
exit_varHolder = VarHolder()
|
||||
self.exit_varHolders.append(exit_varHolder)
|
||||
# to_dt needs +1 candle since it will always exit/force-exit trades on the last candle
|
||||
exit_varHolder.from_dt = self.full_varHolder.from_dt
|
||||
exit_varHolder.to_dt = (
|
||||
result_row['close_date'] +
|
||||
timedelta(minutes=timeframe_to_minutes(self.full_varHolder.timeframe)))
|
||||
exit_varHolder.compared_dt = result_row['close_date']
|
||||
self.prepare_data(exit_varHolder, [result_row['pair']])
|
||||
|
||||
# now we analyze a full trade of full_varholder and look for analyze its bias
|
||||
def analyze_row(self, idx, result_row):
|
||||
# if force-sold, ignore this signal since here it will unconditionally exit.
|
||||
if result_row.close_date == self.dt_to_timestamp(self.full_varHolder.to_dt):
|
||||
return
|
||||
|
||||
# keep track of how many signals are processed at total
|
||||
self.current_analysis.total_signals += 1
|
||||
|
||||
# fill entry_varHolder and exit_varHolder
|
||||
self.fill_entry_and_exit_varHolders(result_row)
|
||||
|
||||
# register if buy signal is broken
|
||||
if not self.report_signal(
|
||||
self.entry_varHolders[idx].result,
|
||||
"open_date",
|
||||
self.entry_varHolders[idx].compared_dt):
|
||||
self.current_analysis.false_entry_signals += 1
|
||||
|
||||
# register if buy or sell signal is broken
|
||||
if not self.report_signal(
|
||||
self.exit_varHolders[idx].result,
|
||||
"close_date",
|
||||
self.exit_varHolders[idx].compared_dt):
|
||||
self.current_analysis.false_exit_signals += 1
|
||||
|
||||
# check if the indicators themselves contain biased data
|
||||
self.analyze_indicators(self.full_varHolder, self.entry_varHolders[idx], result_row['pair'])
|
||||
self.analyze_indicators(self.full_varHolder, self.exit_varHolders[idx], result_row['pair'])
|
||||
|
||||
def start(self) -> None:
|
||||
|
||||
# first make a single backtest
|
||||
self.fill_full_varholder()
|
||||
|
||||
reduce_verbosity_for_bias_tester()
|
||||
|
||||
# check if requirements have been met of full_varholder
|
||||
found_signals: int = self.full_varHolder.result['results'].shape[0] + 1
|
||||
if found_signals >= self.targeted_trade_amount:
|
||||
logger.info(f"Found {found_signals} trades, "
|
||||
f"calculating {self.targeted_trade_amount} trades.")
|
||||
elif self.targeted_trade_amount >= found_signals >= self.minimum_trade_amount:
|
||||
logger.info(f"Only found {found_signals} trades. Calculating all available trades.")
|
||||
else:
|
||||
logger.info(f"found {found_signals} trades "
|
||||
f"which is less than minimum_trade_amount {self.minimum_trade_amount}. "
|
||||
f"Cancelling this backtest lookahead bias test.")
|
||||
return
|
||||
|
||||
# now we loop through all signals
|
||||
# starting from the same datetime to avoid miss-reports of bias
|
||||
for idx, result_row in self.full_varHolder.result['results'].iterrows():
|
||||
if self.current_analysis.total_signals == self.targeted_trade_amount:
|
||||
break
|
||||
self.analyze_row(idx, result_row)
|
||||
|
||||
# Restore verbosity, so it's not too quiet for the next strategy
|
||||
restore_verbosity_for_bias_tester()
|
||||
# check and report signals
|
||||
if self.current_analysis.total_signals < self.local_config['minimum_trade_amount']:
|
||||
logger.info(f" -> {self.local_config['strategy']} : too few trades. "
|
||||
f"We only found {self.current_analysis.total_signals} trades. "
|
||||
f"Hint: Extend the timerange "
|
||||
f"to get at least {self.local_config['minimum_trade_amount']} "
|
||||
f"or lower the value of minimum_trade_amount.")
|
||||
self.failed_bias_check = True
|
||||
elif (self.current_analysis.false_entry_signals > 0 or
|
||||
self.current_analysis.false_exit_signals > 0 or
|
||||
len(self.current_analysis.false_indicators) > 0):
|
||||
logger.info(f" => {self.local_config['strategy']} : bias detected!")
|
||||
self.current_analysis.has_bias = True
|
||||
self.failed_bias_check = False
|
||||
else:
|
||||
logger.info(self.local_config['strategy'] + ": no bias detected")
|
||||
self.failed_bias_check = False
|
||||
202
freqtrade/optimize/lookahead_analysis_helpers.py
Normal file
202
freqtrade/optimize/lookahead_analysis_helpers.py
Normal file
@@ -0,0 +1,202 @@
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from freqtrade.constants import Config
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.optimize.lookahead_analysis import LookaheadAnalysis
|
||||
from freqtrade.resolvers import StrategyResolver
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LookaheadAnalysisSubFunctions:
|
||||
|
||||
@staticmethod
|
||||
def text_table_lookahead_analysis_instances(
|
||||
config: Dict[str, Any],
|
||||
lookahead_instances: List[LookaheadAnalysis]):
|
||||
headers = ['filename', 'strategy', 'has_bias', 'total_signals',
|
||||
'biased_entry_signals', 'biased_exit_signals', 'biased_indicators']
|
||||
data = []
|
||||
for inst in lookahead_instances:
|
||||
if config['minimum_trade_amount'] > inst.current_analysis.total_signals:
|
||||
data.append(
|
||||
[
|
||||
inst.strategy_obj['location'].parts[-1],
|
||||
inst.strategy_obj['name'],
|
||||
"too few trades caught "
|
||||
f"({inst.current_analysis.total_signals}/{config['minimum_trade_amount']})."
|
||||
f"Test failed."
|
||||
]
|
||||
)
|
||||
elif inst.failed_bias_check:
|
||||
data.append(
|
||||
[
|
||||
inst.strategy_obj['location'].parts[-1],
|
||||
inst.strategy_obj['name'],
|
||||
'error while checking'
|
||||
]
|
||||
)
|
||||
else:
|
||||
data.append(
|
||||
[
|
||||
inst.strategy_obj['location'].parts[-1],
|
||||
inst.strategy_obj['name'],
|
||||
inst.current_analysis.has_bias,
|
||||
inst.current_analysis.total_signals,
|
||||
inst.current_analysis.false_entry_signals,
|
||||
inst.current_analysis.false_exit_signals,
|
||||
", ".join(inst.current_analysis.false_indicators)
|
||||
]
|
||||
)
|
||||
from tabulate import tabulate
|
||||
table = tabulate(data, headers=headers, tablefmt="orgtbl")
|
||||
print(table)
|
||||
return table, headers, data
|
||||
|
||||
@staticmethod
|
||||
def export_to_csv(config: Dict[str, Any], lookahead_analysis: List[LookaheadAnalysis]):
|
||||
def add_or_update_row(df, row_data):
|
||||
if (
|
||||
(df['filename'] == row_data['filename']) &
|
||||
(df['strategy'] == row_data['strategy'])
|
||||
).any():
|
||||
# Update existing row
|
||||
pd_series = pd.DataFrame([row_data])
|
||||
df.loc[
|
||||
(df['filename'] == row_data['filename']) &
|
||||
(df['strategy'] == row_data['strategy'])
|
||||
] = pd_series
|
||||
else:
|
||||
# Add new row
|
||||
df = pd.concat([df, pd.DataFrame([row_data], columns=df.columns)])
|
||||
|
||||
return df
|
||||
|
||||
if Path(config['lookahead_analysis_exportfilename']).exists():
|
||||
# Read CSV file into a pandas dataframe
|
||||
csv_df = pd.read_csv(config['lookahead_analysis_exportfilename'])
|
||||
else:
|
||||
# Create a new empty DataFrame with the desired column names and set the index
|
||||
csv_df = pd.DataFrame(columns=[
|
||||
'filename', 'strategy', 'has_bias', 'total_signals',
|
||||
'biased_entry_signals', 'biased_exit_signals', 'biased_indicators'
|
||||
],
|
||||
index=None)
|
||||
|
||||
for inst in lookahead_analysis:
|
||||
# only update if
|
||||
if (inst.current_analysis.total_signals > config['minimum_trade_amount']
|
||||
and inst.failed_bias_check is not True):
|
||||
new_row_data = {'filename': inst.strategy_obj['location'].parts[-1],
|
||||
'strategy': inst.strategy_obj['name'],
|
||||
'has_bias': inst.current_analysis.has_bias,
|
||||
'total_signals':
|
||||
int(inst.current_analysis.total_signals),
|
||||
'biased_entry_signals':
|
||||
int(inst.current_analysis.false_entry_signals),
|
||||
'biased_exit_signals':
|
||||
int(inst.current_analysis.false_exit_signals),
|
||||
'biased_indicators':
|
||||
",".join(inst.current_analysis.false_indicators)}
|
||||
csv_df = add_or_update_row(csv_df, new_row_data)
|
||||
|
||||
# Fill NaN values with a default value (e.g., 0)
|
||||
csv_df['total_signals'] = csv_df['total_signals'].fillna(0)
|
||||
csv_df['biased_entry_signals'] = csv_df['biased_entry_signals'].fillna(0)
|
||||
csv_df['biased_exit_signals'] = csv_df['biased_exit_signals'].fillna(0)
|
||||
|
||||
# Convert columns to integers
|
||||
csv_df['total_signals'] = csv_df['total_signals'].astype(int)
|
||||
csv_df['biased_entry_signals'] = csv_df['biased_entry_signals'].astype(int)
|
||||
csv_df['biased_exit_signals'] = csv_df['biased_exit_signals'].astype(int)
|
||||
|
||||
logger.info(f"saving {config['lookahead_analysis_exportfilename']}")
|
||||
csv_df.to_csv(config['lookahead_analysis_exportfilename'], index=False)
|
||||
|
||||
@staticmethod
|
||||
def calculate_config_overrides(config: Config):
|
||||
if config['targeted_trade_amount'] < config['minimum_trade_amount']:
|
||||
# this combo doesn't make any sense.
|
||||
raise OperationalException(
|
||||
"Targeted trade amount can't be smaller than minimum trade amount."
|
||||
)
|
||||
if len(config['pairs']) > config['max_open_trades']:
|
||||
logger.info('Max_open_trades were less than amount of pairs. '
|
||||
'Set max_open_trades to amount of pairs just to avoid false positives.')
|
||||
config['max_open_trades'] = len(config['pairs'])
|
||||
|
||||
min_dry_run_wallet = 1000000000
|
||||
if config['dry_run_wallet'] < min_dry_run_wallet:
|
||||
logger.info('Dry run wallet was not set to 1 billion, pushing it up there '
|
||||
'just to avoid false positives')
|
||||
config['dry_run_wallet'] = min_dry_run_wallet
|
||||
|
||||
# enforce cache to be 'none', shift it to 'none' if not already
|
||||
# (since the default value is 'day')
|
||||
if config.get('backtest_cache') is None:
|
||||
config['backtest_cache'] = 'none'
|
||||
elif config['backtest_cache'] != 'none':
|
||||
logger.info(f"backtest_cache = "
|
||||
f"{config['backtest_cache']} detected. "
|
||||
f"Inside lookahead-analysis it is enforced to be 'none'. "
|
||||
f"Changed it to 'none'")
|
||||
config['backtest_cache'] = 'none'
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def initialize_single_lookahead_analysis(config: Config, strategy_obj: Dict[str, Any]):
|
||||
|
||||
logger.info(f"Bias test of {Path(strategy_obj['location']).name} started.")
|
||||
start = time.perf_counter()
|
||||
current_instance = LookaheadAnalysis(config, strategy_obj)
|
||||
current_instance.start()
|
||||
elapsed = time.perf_counter() - start
|
||||
logger.info(f"Checking look ahead bias via backtests "
|
||||
f"of {Path(strategy_obj['location']).name} "
|
||||
f"took {elapsed:.0f} seconds.")
|
||||
return current_instance
|
||||
|
||||
@staticmethod
|
||||
def start(config: Config):
|
||||
config = LookaheadAnalysisSubFunctions.calculate_config_overrides(config)
|
||||
|
||||
strategy_objs = StrategyResolver.search_all_objects(
|
||||
config, enum_failed=False, recursive=config.get('recursive_strategy_search', False))
|
||||
|
||||
lookaheadAnalysis_instances = []
|
||||
|
||||
# unify --strategy and --strategy_list to one list
|
||||
if not (strategy_list := config.get('strategy_list', [])):
|
||||
if config.get('strategy') is None:
|
||||
raise OperationalException(
|
||||
"No Strategy specified. Please specify a strategy via --strategy or "
|
||||
"--strategy_list"
|
||||
)
|
||||
strategy_list = [config['strategy']]
|
||||
|
||||
# check if strategies can be properly loaded, only check them if they can be.
|
||||
for strat in strategy_list:
|
||||
for strategy_obj in strategy_objs:
|
||||
if strategy_obj['name'] == strat and strategy_obj not in strategy_list:
|
||||
lookaheadAnalysis_instances.append(
|
||||
LookaheadAnalysisSubFunctions.initialize_single_lookahead_analysis(
|
||||
config, strategy_obj))
|
||||
break
|
||||
|
||||
# report the results
|
||||
if lookaheadAnalysis_instances:
|
||||
LookaheadAnalysisSubFunctions.text_table_lookahead_analysis_instances(
|
||||
config, lookaheadAnalysis_instances)
|
||||
if config.get('lookahead_analysis_exportfilename') is not None:
|
||||
LookaheadAnalysisSubFunctions.export_to_csv(config, lookaheadAnalysis_instances)
|
||||
else:
|
||||
logger.error("There were no strategies specified neither through "
|
||||
"--strategy nor through "
|
||||
"--strategy_list "
|
||||
"or timeframe was not specified.")
|
||||
@@ -22,6 +22,7 @@ nav:
|
||||
- Web Hook: webhook-config.md
|
||||
- Data Downloading: data-download.md
|
||||
- Backtesting: backtesting.md
|
||||
- Lookahead analysis: lookahead-analysis.md
|
||||
- Hyperopt: hyperopt.md
|
||||
- FreqAI:
|
||||
- Introduction: freqai.md
|
||||
|
||||
366
tests/optimize/test_lookahead_analysis.py
Normal file
366
tests/optimize/test_lookahead_analysis.py
Normal file
@@ -0,0 +1,366 @@
|
||||
# pragma pylint: disable=missing-docstring, W0212, line-too-long, C0103, unused-argument
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, PropertyMock
|
||||
|
||||
import pytest
|
||||
|
||||
from freqtrade.commands.optimize_commands import start_lookahead_analysis
|
||||
from freqtrade.data.history import get_timerange
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.optimize.lookahead_analysis import Analysis, LookaheadAnalysis
|
||||
from freqtrade.optimize.lookahead_analysis_helpers import LookaheadAnalysisSubFunctions
|
||||
from tests.conftest import EXMS, get_args, log_has_re, patch_exchange
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def lookahead_conf(default_conf_usdt):
|
||||
default_conf_usdt['minimum_trade_amount'] = 10
|
||||
default_conf_usdt['targeted_trade_amount'] = 20
|
||||
default_conf_usdt['strategy_path'] = str(
|
||||
Path(__file__).parent.parent / "strategy/strats/lookahead_bias")
|
||||
default_conf_usdt['strategy'] = 'strategy_test_v3_with_lookahead_bias'
|
||||
default_conf_usdt['max_open_trades'] = 1
|
||||
default_conf_usdt['dry_run_wallet'] = 1000000000
|
||||
default_conf_usdt['pairs'] = ['UNITTEST/USDT']
|
||||
return default_conf_usdt
|
||||
|
||||
|
||||
def test_start_lookahead_analysis(mocker):
|
||||
single_mock = MagicMock()
|
||||
text_table_mock = MagicMock()
|
||||
mocker.patch.multiple(
|
||||
'freqtrade.optimize.lookahead_analysis_helpers.LookaheadAnalysisSubFunctions',
|
||||
initialize_single_lookahead_analysis=single_mock,
|
||||
text_table_lookahead_analysis_instances=text_table_mock,
|
||||
)
|
||||
args = [
|
||||
"lookahead-analysis",
|
||||
"--strategy",
|
||||
"strategy_test_v3_with_lookahead_bias",
|
||||
"--strategy-path",
|
||||
str(Path(__file__).parent.parent / "strategy/strats/lookahead_bias"),
|
||||
"--pairs",
|
||||
"UNITTEST/BTC",
|
||||
"--max-open-trades",
|
||||
"1"
|
||||
]
|
||||
pargs = get_args(args)
|
||||
pargs['config'] = None
|
||||
|
||||
start_lookahead_analysis(pargs)
|
||||
assert single_mock.call_count == 1
|
||||
assert text_table_mock.call_count == 1
|
||||
|
||||
single_mock.reset_mock()
|
||||
|
||||
# Test invalid config
|
||||
args = [
|
||||
"lookahead-analysis",
|
||||
"--strategy",
|
||||
"strategy_test_v3_with_lookahead_bias",
|
||||
"--strategy-path",
|
||||
str(Path(__file__).parent.parent / "strategy/strats/lookahead_bias"),
|
||||
"--targeted-trade-amount",
|
||||
"10",
|
||||
"--minimum-trade-amount",
|
||||
"20",
|
||||
]
|
||||
pargs = get_args(args)
|
||||
pargs['config'] = None
|
||||
with pytest.raises(OperationalException,
|
||||
match=r"Targeted trade amount can't be smaller than minimum trade amount.*"):
|
||||
start_lookahead_analysis(pargs)
|
||||
|
||||
|
||||
def test_lookahead_helper_invalid_config(lookahead_conf) -> None:
|
||||
conf = deepcopy(lookahead_conf)
|
||||
conf['targeted_trade_amount'] = 10
|
||||
conf['minimum_trade_amount'] = 40
|
||||
with pytest.raises(OperationalException,
|
||||
match=r"Targeted trade amount can't be smaller than minimum trade amount.*"):
|
||||
LookaheadAnalysisSubFunctions.start(conf)
|
||||
|
||||
|
||||
def test_lookahead_helper_no_strategy_defined(lookahead_conf):
|
||||
conf = deepcopy(lookahead_conf)
|
||||
conf['pairs'] = ['UNITTEST/USDT']
|
||||
del conf['strategy']
|
||||
with pytest.raises(OperationalException,
|
||||
match=r"No Strategy specified"):
|
||||
LookaheadAnalysisSubFunctions.start(conf)
|
||||
|
||||
|
||||
def test_lookahead_helper_start(lookahead_conf, mocker) -> None:
|
||||
single_mock = MagicMock()
|
||||
text_table_mock = MagicMock()
|
||||
mocker.patch.multiple(
|
||||
'freqtrade.optimize.lookahead_analysis_helpers.LookaheadAnalysisSubFunctions',
|
||||
initialize_single_lookahead_analysis=single_mock,
|
||||
text_table_lookahead_analysis_instances=text_table_mock,
|
||||
)
|
||||
LookaheadAnalysisSubFunctions.start(lookahead_conf)
|
||||
assert single_mock.call_count == 1
|
||||
assert text_table_mock.call_count == 1
|
||||
|
||||
single_mock.reset_mock()
|
||||
text_table_mock.reset_mock()
|
||||
|
||||
|
||||
def test_lookahead_helper_text_table_lookahead_analysis_instances(lookahead_conf):
|
||||
analysis = Analysis()
|
||||
analysis.has_bias = True
|
||||
analysis.total_signals = 5
|
||||
analysis.false_entry_signals = 4
|
||||
analysis.false_exit_signals = 3
|
||||
|
||||
strategy_obj = {
|
||||
'name': "strategy_test_v3_with_lookahead_bias",
|
||||
'location': Path(lookahead_conf['strategy_path'], f"{lookahead_conf['strategy']}.py")
|
||||
}
|
||||
|
||||
instance = LookaheadAnalysis(lookahead_conf, strategy_obj)
|
||||
instance.current_analysis = analysis
|
||||
table, headers, data = (LookaheadAnalysisSubFunctions.
|
||||
text_table_lookahead_analysis_instances(lookahead_conf, [instance]))
|
||||
|
||||
# check row contents for a try that has too few signals
|
||||
assert data[0][0] == 'strategy_test_v3_with_lookahead_bias.py'
|
||||
assert data[0][1] == 'strategy_test_v3_with_lookahead_bias'
|
||||
assert data[0][2].__contains__('too few trades')
|
||||
assert len(data[0]) == 3
|
||||
|
||||
# now check for an error which occured after enough trades
|
||||
analysis.total_signals = 12
|
||||
analysis.false_entry_signals = 11
|
||||
analysis.false_exit_signals = 10
|
||||
instance = LookaheadAnalysis(lookahead_conf, strategy_obj)
|
||||
instance.current_analysis = analysis
|
||||
table, headers, data = (LookaheadAnalysisSubFunctions.
|
||||
text_table_lookahead_analysis_instances(lookahead_conf, [instance]))
|
||||
assert data[0][2].__contains__("error")
|
||||
|
||||
# edit it into not showing an error
|
||||
instance.failed_bias_check = False
|
||||
table, headers, data = (LookaheadAnalysisSubFunctions.
|
||||
text_table_lookahead_analysis_instances(lookahead_conf, [instance]))
|
||||
assert data[0][0] == 'strategy_test_v3_with_lookahead_bias.py'
|
||||
assert data[0][1] == 'strategy_test_v3_with_lookahead_bias'
|
||||
assert data[0][2] # True
|
||||
assert data[0][3] == 12
|
||||
assert data[0][4] == 11
|
||||
assert data[0][5] == 10
|
||||
assert data[0][6] == ''
|
||||
|
||||
analysis.false_indicators.append('falseIndicator1')
|
||||
analysis.false_indicators.append('falseIndicator2')
|
||||
table, headers, data = (LookaheadAnalysisSubFunctions.
|
||||
text_table_lookahead_analysis_instances(lookahead_conf, [instance]))
|
||||
|
||||
assert data[0][6] == 'falseIndicator1, falseIndicator2'
|
||||
|
||||
# check amount of returning rows
|
||||
assert len(data) == 1
|
||||
|
||||
# check amount of multiple rows
|
||||
table, headers, data = (LookaheadAnalysisSubFunctions.text_table_lookahead_analysis_instances(
|
||||
lookahead_conf, [instance, instance, instance]))
|
||||
assert len(data) == 3
|
||||
|
||||
|
||||
def test_lookahead_helper_export_to_csv(lookahead_conf):
|
||||
import pandas as pd
|
||||
lookahead_conf['lookahead_analysis_exportfilename'] = "temp_csv_lookahead_analysis.csv"
|
||||
|
||||
# just to be sure the test won't fail: remove file if exists for some reason
|
||||
# (repeat this at the end once again to clean up)
|
||||
if Path(lookahead_conf['lookahead_analysis_exportfilename']).exists():
|
||||
Path(lookahead_conf['lookahead_analysis_exportfilename']).unlink()
|
||||
|
||||
# before we can start we have to delete the
|
||||
|
||||
# 1st check: create a new file and verify its contents
|
||||
analysis1 = Analysis()
|
||||
analysis1.has_bias = True
|
||||
analysis1.total_signals = 12
|
||||
analysis1.false_entry_signals = 11
|
||||
analysis1.false_exit_signals = 10
|
||||
analysis1.false_indicators.append('falseIndicator1')
|
||||
analysis1.false_indicators.append('falseIndicator2')
|
||||
lookahead_conf['lookahead_analysis_exportfilename'] = "temp_csv_lookahead_analysis.csv"
|
||||
|
||||
strategy_obj1 = {
|
||||
'name': "strat1",
|
||||
'location': Path("file1.py"),
|
||||
}
|
||||
|
||||
instance1 = LookaheadAnalysis(lookahead_conf, strategy_obj1)
|
||||
instance1.failed_bias_check = False
|
||||
instance1.current_analysis = analysis1
|
||||
|
||||
LookaheadAnalysisSubFunctions.export_to_csv(lookahead_conf, [instance1])
|
||||
saved_data1 = pd.read_csv(lookahead_conf['lookahead_analysis_exportfilename'])
|
||||
|
||||
expected_values1 = [
|
||||
[
|
||||
'file1.py', 'strat1', True,
|
||||
12, 11, 10,
|
||||
"falseIndicator1,falseIndicator2"
|
||||
],
|
||||
]
|
||||
expected_columns = ['filename', 'strategy', 'has_bias',
|
||||
'total_signals', 'biased_entry_signals', 'biased_exit_signals',
|
||||
'biased_indicators']
|
||||
expected_data1 = pd.DataFrame(expected_values1, columns=expected_columns)
|
||||
|
||||
assert Path(lookahead_conf['lookahead_analysis_exportfilename']).exists()
|
||||
assert expected_data1.equals(saved_data1)
|
||||
|
||||
# 2nd check: update the same strategy (which internally changed or is being retested)
|
||||
expected_values2 = [
|
||||
[
|
||||
'file1.py', 'strat1', False,
|
||||
22, 21, 20,
|
||||
"falseIndicator3,falseIndicator4"
|
||||
],
|
||||
]
|
||||
expected_data2 = pd.DataFrame(expected_values2, columns=expected_columns)
|
||||
|
||||
analysis2 = Analysis()
|
||||
analysis2.has_bias = False
|
||||
analysis2.total_signals = 22
|
||||
analysis2.false_entry_signals = 21
|
||||
analysis2.false_exit_signals = 20
|
||||
analysis2.false_indicators.append('falseIndicator3')
|
||||
analysis2.false_indicators.append('falseIndicator4')
|
||||
|
||||
strategy_obj2 = {
|
||||
'name': "strat1",
|
||||
'location': Path("file1.py"),
|
||||
}
|
||||
|
||||
instance2 = LookaheadAnalysis(lookahead_conf, strategy_obj2)
|
||||
instance2.failed_bias_check = False
|
||||
instance2.current_analysis = analysis2
|
||||
|
||||
LookaheadAnalysisSubFunctions.export_to_csv(lookahead_conf, [instance2])
|
||||
saved_data2 = pd.read_csv(lookahead_conf['lookahead_analysis_exportfilename'])
|
||||
|
||||
assert expected_data2.equals(saved_data2)
|
||||
|
||||
# 3rd check: now we add a new row to an already existing file
|
||||
expected_values3 = [
|
||||
[
|
||||
'file1.py', 'strat1', False,
|
||||
22, 21, 20,
|
||||
"falseIndicator3,falseIndicator4"
|
||||
],
|
||||
[
|
||||
'file3.py', 'strat3', True,
|
||||
32, 31, 30, "falseIndicator5,falseIndicator6"
|
||||
],
|
||||
]
|
||||
|
||||
expected_data3 = pd.DataFrame(expected_values3, columns=expected_columns)
|
||||
|
||||
analysis3 = Analysis()
|
||||
analysis3.has_bias = True
|
||||
analysis3.total_signals = 32
|
||||
analysis3.false_entry_signals = 31
|
||||
analysis3.false_exit_signals = 30
|
||||
analysis3.false_indicators.append('falseIndicator5')
|
||||
analysis3.false_indicators.append('falseIndicator6')
|
||||
lookahead_conf['lookahead_analysis_exportfilename'] = "temp_csv_lookahead_analysis.csv"
|
||||
|
||||
strategy_obj3 = {
|
||||
'name': "strat3",
|
||||
'location': Path("file3.py"),
|
||||
}
|
||||
|
||||
instance3 = LookaheadAnalysis(lookahead_conf, strategy_obj3)
|
||||
instance3.failed_bias_check = False
|
||||
instance3.current_analysis = analysis3
|
||||
|
||||
LookaheadAnalysisSubFunctions.export_to_csv(lookahead_conf, [instance3])
|
||||
saved_data3 = pd.read_csv(lookahead_conf['lookahead_analysis_exportfilename'])
|
||||
assert expected_data3.equals(saved_data3)
|
||||
|
||||
# remove csv file after the test is done
|
||||
if Path(lookahead_conf['lookahead_analysis_exportfilename']).exists():
|
||||
Path(lookahead_conf['lookahead_analysis_exportfilename']).unlink()
|
||||
|
||||
|
||||
def test_initialize_single_lookahead_analysis(lookahead_conf, mocker, caplog):
|
||||
mocker.patch('freqtrade.data.history.get_timerange', get_timerange)
|
||||
mocker.patch(f'{EXMS}.get_fee', return_value=0.0)
|
||||
mocker.patch(f'{EXMS}.get_min_pair_stake_amount', return_value=0.00001)
|
||||
mocker.patch(f'{EXMS}.get_max_pair_stake_amount', return_value=float('inf'))
|
||||
patch_exchange(mocker)
|
||||
mocker.patch('freqtrade.plugins.pairlistmanager.PairListManager.whitelist',
|
||||
PropertyMock(return_value=['UNITTEST/BTC']))
|
||||
lookahead_conf['pairs'] = ['UNITTEST/USDT']
|
||||
|
||||
lookahead_conf['timeframe'] = '5m'
|
||||
lookahead_conf['timerange'] = '20180119-20180122'
|
||||
start_mock = mocker.patch('freqtrade.optimize.lookahead_analysis.LookaheadAnalysis.start')
|
||||
strategy_obj = {
|
||||
'name': "strategy_test_v3_with_lookahead_bias",
|
||||
'location': Path(lookahead_conf['strategy_path'], f"{lookahead_conf['strategy']}.py")
|
||||
}
|
||||
|
||||
instance = LookaheadAnalysisSubFunctions.initialize_single_lookahead_analysis(
|
||||
lookahead_conf, strategy_obj)
|
||||
assert log_has_re(r"Bias test of .* started\.", caplog)
|
||||
assert start_mock.call_count == 1
|
||||
|
||||
assert instance.strategy_obj['name'] == "strategy_test_v3_with_lookahead_bias"
|
||||
|
||||
|
||||
@pytest.mark.parametrize('scenario', [
|
||||
'no_bias', 'bias1'
|
||||
])
|
||||
def test_biased_strategy(lookahead_conf, mocker, caplog, scenario) -> None:
|
||||
mocker.patch('freqtrade.data.history.get_timerange', get_timerange)
|
||||
mocker.patch(f'{EXMS}.get_fee', return_value=0.0)
|
||||
mocker.patch(f'{EXMS}.get_min_pair_stake_amount', return_value=0.00001)
|
||||
mocker.patch(f'{EXMS}.get_max_pair_stake_amount', return_value=float('inf'))
|
||||
patch_exchange(mocker)
|
||||
mocker.patch('freqtrade.plugins.pairlistmanager.PairListManager.whitelist',
|
||||
PropertyMock(return_value=['UNITTEST/BTC']))
|
||||
lookahead_conf['pairs'] = ['UNITTEST/USDT']
|
||||
|
||||
lookahead_conf['timeframe'] = '5m'
|
||||
lookahead_conf['timerange'] = '20180119-20180122'
|
||||
|
||||
# Patch scenario Parameter to allow for easy selection
|
||||
mocker.patch('freqtrade.strategy.hyper.HyperStrategyMixin.load_params_from_file',
|
||||
return_value={
|
||||
'params': {
|
||||
"buy": {
|
||||
"scenario": scenario
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
strategy_obj = {'name': "strategy_test_v3_with_lookahead_bias"}
|
||||
instance = LookaheadAnalysis(lookahead_conf, strategy_obj)
|
||||
instance.start()
|
||||
# Assert init correct
|
||||
assert log_has_re(f"Strategy Parameter: scenario = {scenario}", caplog)
|
||||
|
||||
# check non-biased strategy
|
||||
if scenario == "no_bias":
|
||||
assert not instance.current_analysis.has_bias
|
||||
# check biased strategy
|
||||
elif scenario == "bias1":
|
||||
assert instance.current_analysis.has_bias
|
||||
|
||||
|
||||
def test_config_overrides(lookahead_conf):
|
||||
lookahead_conf['max_open_trades'] = 0
|
||||
lookahead_conf['dry_run_wallet'] = 1
|
||||
lookahead_conf['pairs'] = ['BTC/USDT', 'ETH/USDT', 'SOL/USDT']
|
||||
lookahead_conf = LookaheadAnalysisSubFunctions.calculate_config_overrides(lookahead_conf)
|
||||
|
||||
assert lookahead_conf['dry_run_wallet'] == 1000000000
|
||||
assert lookahead_conf['max_open_trades'] == 3
|
||||
@@ -0,0 +1,58 @@
|
||||
# pragma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
|
||||
from pandas import DataFrame
|
||||
from technical.indicators import ichimoku
|
||||
|
||||
from freqtrade.strategy import IStrategy
|
||||
from freqtrade.strategy.parameters import CategoricalParameter
|
||||
|
||||
|
||||
class strategy_test_v3_with_lookahead_bias(IStrategy):
|
||||
INTERFACE_VERSION = 3
|
||||
|
||||
# Minimal ROI designed for the strategy
|
||||
minimal_roi = {
|
||||
"40": 0.0,
|
||||
"30": 0.01,
|
||||
"20": 0.02,
|
||||
"0": 0.04
|
||||
}
|
||||
|
||||
# Optimal stoploss designed for the strategy
|
||||
stoploss = -0.10
|
||||
|
||||
# Optimal timeframe for the strategy
|
||||
timeframe = '5m'
|
||||
scenario = CategoricalParameter(['no_bias', 'bias1'], default='bias1', space="buy")
|
||||
|
||||
# Number of candles the strategy requires before producing valid signals
|
||||
startup_candle_count: int = 20
|
||||
|
||||
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||||
# bias is introduced here
|
||||
if self.scenario.value != 'no_bias':
|
||||
ichi = ichimoku(dataframe,
|
||||
conversion_line_period=20,
|
||||
base_line_periods=60,
|
||||
laggin_span=120,
|
||||
displacement=30)
|
||||
dataframe['chikou_span'] = ichi['chikou_span']
|
||||
|
||||
return dataframe
|
||||
|
||||
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||||
if self.scenario.value == 'no_bias':
|
||||
dataframe.loc[dataframe['close'].shift(10) < dataframe['close'], 'enter_long'] = 1
|
||||
else:
|
||||
dataframe.loc[dataframe['close'].shift(-10) > dataframe['close'], 'enter_long'] = 1
|
||||
|
||||
return dataframe
|
||||
|
||||
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||||
if self.scenario.value == 'no_bias':
|
||||
dataframe.loc[
|
||||
dataframe['close'].shift(10) < dataframe['close'], 'exit'] = 1
|
||||
else:
|
||||
dataframe.loc[
|
||||
dataframe['close'].shift(-10) > dataframe['close'], 'exit'] = 1
|
||||
|
||||
return dataframe
|
||||
@@ -7,6 +7,8 @@ import pytest
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.loggers import (FTBufferingHandler, FTStdErrStreamHandler, set_loggers,
|
||||
setup_logging, setup_logging_pre)
|
||||
from freqtrade.loggers.set_log_levels import (reduce_verbosity_for_bias_tester,
|
||||
restore_verbosity_for_bias_tester)
|
||||
|
||||
|
||||
def test_set_loggers() -> None:
|
||||
@@ -128,3 +130,21 @@ def test_set_loggers_journald_importerror(import_fails):
|
||||
match=r'You need the cysystemd python package.*'):
|
||||
setup_logging(config)
|
||||
logger.handlers = orig_handlers
|
||||
|
||||
|
||||
def test_reduce_verbosity():
|
||||
setup_logging_pre()
|
||||
reduce_verbosity_for_bias_tester()
|
||||
prior_level = logging.getLogger('freqtrade').getEffectiveLevel()
|
||||
|
||||
assert logging.getLogger('freqtrade.resolvers').getEffectiveLevel() == logging.WARNING
|
||||
assert logging.getLogger('freqtrade.strategy.hyper').getEffectiveLevel() == logging.WARNING
|
||||
# base level wasn't changed
|
||||
assert logging.getLogger('freqtrade').getEffectiveLevel() == prior_level
|
||||
|
||||
restore_verbosity_for_bias_tester()
|
||||
|
||||
assert logging.getLogger('freqtrade.resolvers').getEffectiveLevel() == prior_level
|
||||
assert logging.getLogger('freqtrade.strategy.hyper').getEffectiveLevel() == prior_level
|
||||
assert logging.getLogger('freqtrade').getEffectiveLevel() == prior_level
|
||||
# base level wasn't changed
|
||||
|
||||
Reference in New Issue
Block a user