diff --git a/freqtrade/strategy/hyper.py b/freqtrade/strategy/hyper.py index d38110a2a..8362b754a 100644 --- a/freqtrade/strategy/hyper.py +++ b/freqtrade/strategy/hyper.py @@ -2,6 +2,7 @@ IHyperStrategy interface, hyperoptable Parameter class. This module defines a base class for auto-hyperoptable strategies. """ + import logging from pathlib import Path from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union @@ -32,20 +33,22 @@ class HyperStrategyMixin: self.ft_protection_params: List[BaseParameter] = [] params = self.load_params_from_file() - params = params.get('params', {}) + params = params.get("params", {}) self._ft_params_from_file = params # Init/loading of parameters is done as part of ft_bot_start(). def enumerate_parameters( - self, category: Optional[str] = None) -> Iterator[Tuple[str, BaseParameter]]: + self, category: Optional[str] = None + ) -> Iterator[Tuple[str, BaseParameter]]: """ Find all optimizable parameters and return (name, attr) iterator. :param category: :return: """ - if category not in ('buy', 'sell', 'protection', None): + if category not in ("buy", "sell", "protection", None): raise OperationalException( - 'Category must be one of: "buy", "sell", "protection", None.') + 'Category must be one of: "buy", "sell", "protection", None.' + ) if category is None: params = self.ft_buy_params + self.ft_sell_params + self.ft_protection_params @@ -57,15 +60,13 @@ class HyperStrategyMixin: @classmethod def detect_all_parameters(cls) -> Dict: - """ Detect all parameters and return them as a list""" + """Detect all parameters and return them as a list""" params: Dict[str, Any] = { - 'buy': list(detect_parameters(cls, 'buy')), - 'sell': list(detect_parameters(cls, 'sell')), - 'protection': list(detect_parameters(cls, 'protection')), + "buy": list(detect_parameters(cls, "buy")), + "sell": list(detect_parameters(cls, "sell")), + "protection": list(detect_parameters(cls, "protection")), } - params.update({ - 'count': len(params['buy'] + params['sell'] + params['protection']) - }) + params.update({"count": len(params["buy"] + params["sell"] + params["protection"])}) return params @@ -77,23 +78,28 @@ class HyperStrategyMixin: if self._ft_params_from_file: # Set parameters from Hyperopt results file params = self._ft_params_from_file - self.minimal_roi = params.get('roi', getattr(self, 'minimal_roi', {})) + self.minimal_roi = params.get("roi", getattr(self, "minimal_roi", {})) - self.stoploss = params.get('stoploss', {}).get( - 'stoploss', getattr(self, 'stoploss', -0.1)) - self.max_open_trades = params.get('max_open_trades', {}).get( - 'max_open_trades', getattr(self, 'max_open_trades', -1)) - trailing = params.get('trailing', {}) + self.stoploss = params.get("stoploss", {}).get( + "stoploss", getattr(self, "stoploss", -0.1) + ) + self.max_open_trades = params.get("max_open_trades", {}).get( + "max_open_trades", getattr(self, "max_open_trades", -1) + ) + trailing = params.get("trailing", {}) self.trailing_stop = trailing.get( - 'trailing_stop', getattr(self, 'trailing_stop', False)) + "trailing_stop", getattr(self, "trailing_stop", False) + ) self.trailing_stop_positive = trailing.get( - 'trailing_stop_positive', getattr(self, 'trailing_stop_positive', None)) + "trailing_stop_positive", getattr(self, "trailing_stop_positive", None) + ) self.trailing_stop_positive_offset = trailing.get( - 'trailing_stop_positive_offset', - getattr(self, 'trailing_stop_positive_offset', 0)) + "trailing_stop_positive_offset", getattr(self, "trailing_stop_positive_offset", 0) + ) self.trailing_only_offset_is_reached = trailing.get( - 'trailing_only_offset_is_reached', - getattr(self, 'trailing_only_offset_is_reached', 0.0)) + "trailing_only_offset_is_reached", + getattr(self, "trailing_only_offset_is_reached", 0.0), + ) def ft_load_hyper_params(self, hyperopt: bool = False) -> None: """ @@ -104,29 +110,32 @@ class HyperStrategyMixin: * Parameter defaults """ - buy_params = deep_merge_dicts(self._ft_params_from_file.get('buy', {}), - getattr(self, 'buy_params', {})) - sell_params = deep_merge_dicts(self._ft_params_from_file.get('sell', {}), - getattr(self, 'sell_params', {})) - protection_params = deep_merge_dicts(self._ft_params_from_file.get('protection', {}), - getattr(self, 'protection_params', {})) + buy_params = deep_merge_dicts( + self._ft_params_from_file.get("buy", {}), getattr(self, "buy_params", {}) + ) + sell_params = deep_merge_dicts( + self._ft_params_from_file.get("sell", {}), getattr(self, "sell_params", {}) + ) + protection_params = deep_merge_dicts( + self._ft_params_from_file.get("protection", {}), getattr(self, "protection_params", {}) + ) - self._ft_load_params(buy_params, 'buy', hyperopt) - self._ft_load_params(sell_params, 'sell', hyperopt) - self._ft_load_params(protection_params, 'protection', hyperopt) + self._ft_load_params(buy_params, "buy", hyperopt) + self._ft_load_params(sell_params, "sell", hyperopt) + self._ft_load_params(protection_params, "protection", hyperopt) def load_params_from_file(self) -> Dict: - filename_str = getattr(self, '__file__', '') + filename_str = getattr(self, "__file__", "") if not filename_str: return {} - filename = Path(filename_str).with_suffix('.json') + filename = Path(filename_str).with_suffix(".json") if filename.is_file(): logger.info(f"Loading parameters from file {filename}") try: params = HyperoptTools.load_params(filename) - if params.get('strategy_name') != self.__class__.__name__: - raise OperationalException('Invalid parameter file provided.') + if params.get("strategy_name") != self.__class__.__name__: + raise OperationalException("Invalid parameter file provided.") return params except ValueError: logger.warning("Invalid parameter file format.") @@ -155,21 +164,23 @@ class HyperStrategyMixin: if params and attr_name in params: if attr.load: attr.value = params[attr_name] - logger.info(f'Strategy Parameter: {attr_name} = {attr.value}') + logger.info(f"Strategy Parameter: {attr_name} = {attr.value}") else: - logger.warning(f'Parameter "{attr_name}" exists, but is disabled. ' - f'Default value "{attr.value}" used.') + logger.warning( + f'Parameter "{attr_name}" exists, but is disabled. ' + f'Default value "{attr.value}" used.' + ) else: - logger.info(f'Strategy Parameter(default): {attr_name} = {attr.value}') + logger.info(f"Strategy Parameter(default): {attr_name} = {attr.value}") def get_no_optimize_params(self) -> Dict[str, Dict]: """ Returns list of Parameters that are not part of the current optimize job """ params: Dict[str, Dict] = { - 'buy': {}, - 'sell': {}, - 'protection': {}, + "buy": {}, + "sell": {}, + "protection": {}, } for name, p in self.enumerate_parameters(): if p.category and (not p.optimize or not p.in_space): @@ -178,23 +189,27 @@ class HyperStrategyMixin: def detect_parameters( - obj: Union[HyperStrategyMixin, Type[HyperStrategyMixin]], - category: str - ) -> Iterator[Tuple[str, BaseParameter]]: + obj: Union[HyperStrategyMixin, Type[HyperStrategyMixin]], category: str +) -> Iterator[Tuple[str, BaseParameter]]: """ Detect all parameters for 'category' for "obj" :param obj: Strategy object or class :param category: category - usually `'buy', 'sell', 'protection',... """ for attr_name in dir(obj): - if not attr_name.startswith('__'): # Ignore internals, not strictly necessary. + if not attr_name.startswith("__"): # Ignore internals, not strictly necessary. attr = getattr(obj, attr_name) if issubclass(attr.__class__, BaseParameter): - if (attr_name.startswith(category + '_') - and attr.category is not None and attr.category != category): + if ( + attr_name.startswith(category + "_") + and attr.category is not None + and attr.category != category + ): raise OperationalException( - f'Inconclusive parameter name {attr_name}, category: {attr.category}.') + f"Inconclusive parameter name {attr_name}, category: {attr.category}." + ) - if (category == attr.category or - (attr_name.startswith(category + '_') and attr.category is None)): + if category == attr.category or ( + attr_name.startswith(category + "_") and attr.category is None + ): yield attr_name, attr diff --git a/freqtrade/strategy/informative_decorator.py b/freqtrade/strategy/informative_decorator.py index 6e44a7e20..12f4281d2 100644 --- a/freqtrade/strategy/informative_decorator.py +++ b/freqtrade/strategy/informative_decorator.py @@ -20,11 +20,14 @@ class InformativeData: candle_type: Optional[CandleType] -def informative(timeframe: str, asset: str = '', - fmt: Optional[Union[str, Callable[[Any], str]]] = None, - *, - candle_type: Optional[Union[CandleType, str]] = None, - ffill: bool = True) -> Callable[[PopulateIndicators], PopulateIndicators]: +def informative( + timeframe: str, + asset: str = "", + fmt: Optional[Union[str, Callable[[Any], str]]] = None, + *, + candle_type: Optional[Union[CandleType, str]] = None, + ffill: bool = True, +) -> Callable[[PopulateIndicators], PopulateIndicators]: """ A decorator for populate_indicators_Nn(self, dataframe, metadata), allowing these functions to define informative indicators. @@ -62,38 +65,43 @@ def informative(timeframe: str, asset: str = '', _candle_type = CandleType.from_string(candle_type) if candle_type else None def decorator(fn: PopulateIndicators): - informative_pairs = getattr(fn, '_ft_informative', []) + informative_pairs = getattr(fn, "_ft_informative", []) informative_pairs.append(InformativeData(_asset, _timeframe, _fmt, _ffill, _candle_type)) - setattr(fn, '_ft_informative', informative_pairs) # noqa: B010 + setattr(fn, "_ft_informative", informative_pairs) # noqa: B010 return fn + return decorator def __get_pair_formats(market: Optional[Dict[str, Any]]) -> Dict[str, str]: if not market: return {} - base = market['base'] - quote = market['quote'] + base = market["base"] + quote = market["quote"] return { - 'base': base.lower(), - 'BASE': base.upper(), - 'quote': quote.lower(), - 'QUOTE': quote.upper(), + "base": base.lower(), + "BASE": base.upper(), + "quote": quote.lower(), + "QUOTE": quote.upper(), } def _format_pair_name(config, pair: str, market: Optional[Dict[str, Any]] = None) -> str: return pair.format( - stake_currency=config['stake_currency'], - stake=config['stake_currency'], + stake_currency=config["stake_currency"], + stake=config["stake_currency"], **__get_pair_formats(market), ).upper() -def _create_and_merge_informative_pair(strategy, dataframe: DataFrame, metadata: dict, - inf_data: InformativeData, - populate_indicators: PopulateIndicators): - asset = inf_data.asset or '' +def _create_and_merge_informative_pair( + strategy, + dataframe: DataFrame, + metadata: dict, + inf_data: InformativeData, + populate_indicators: PopulateIndicators, +): + asset = inf_data.asset or "" timeframe = inf_data.timeframe fmt = inf_data.fmt candle_type = inf_data.candle_type @@ -102,15 +110,15 @@ def _create_and_merge_informative_pair(strategy, dataframe: DataFrame, metadata: if asset: # Insert stake currency if needed. - market1 = strategy.dp.market(metadata['pair']) + market1 = strategy.dp.market(metadata["pair"]) asset = _format_pair_name(config, asset, market1) else: # Not specifying an asset will define informative dataframe for current pair. - asset = metadata['pair'] + asset = metadata["pair"] market = strategy.dp.market(asset) if market is None: - raise OperationalException(f'Market {asset} is not available.') + raise OperationalException(f"Market {asset} is not available.") # Default format. This optimizes for the common case: informative pairs using same stake # currency. When quote currency matches stake currency, column name will omit base currency. @@ -118,33 +126,40 @@ def _create_and_merge_informative_pair(strategy, dataframe: DataFrame, metadata: # where it is desired to keep quote currency in column name at all times user should specify # fmt='{base}_{quote}_{column}_{timeframe}' format or similar. if not fmt: - fmt = '{column}_{timeframe}' # Informatives of current pair + fmt = "{column}_{timeframe}" # Informatives of current pair if inf_data.asset: - fmt = '{base}_{quote}_' + fmt # Informatives of other pairs + fmt = "{base}_{quote}_" + fmt # Informatives of other pairs - inf_metadata = {'pair': asset, 'timeframe': timeframe} + inf_metadata = {"pair": asset, "timeframe": timeframe} inf_dataframe = strategy.dp.get_pair_dataframe(asset, timeframe, candle_type) inf_dataframe = populate_indicators(strategy, inf_dataframe, inf_metadata) formatter: Any = None if callable(fmt): - formatter = fmt # A custom user-specified formatter function. + formatter = fmt # A custom user-specified formatter function. else: - formatter = fmt.format # A default string formatter. + formatter = fmt.format # A default string formatter. fmt_args = { **__get_pair_formats(market), - 'asset': asset, - 'timeframe': timeframe, + "asset": asset, + "timeframe": timeframe, } - inf_dataframe.rename(columns=lambda column: formatter(column=column, **fmt_args), - inplace=True) + inf_dataframe.rename(columns=lambda column: formatter(column=column, **fmt_args), inplace=True) - date_column = formatter(column='date', **fmt_args) + date_column = formatter(column="date", **fmt_args) if date_column in dataframe.columns: - raise OperationalException(f'Duplicate column name {date_column} exists in ' - f'dataframe! Ensure column names are unique!') - dataframe = merge_informative_pair(dataframe, inf_dataframe, strategy.timeframe, timeframe, - ffill=inf_data.ffill, append_timeframe=False, - date_column=date_column) + raise OperationalException( + f"Duplicate column name {date_column} exists in " + f"dataframe! Ensure column names are unique!" + ) + dataframe = merge_informative_pair( + dataframe, + inf_dataframe, + strategy.timeframe, + timeframe, + ffill=inf_data.ffill, + append_timeframe=False, + date_column=date_column, + ) return dataframe diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index ea8e6ce6a..e9152579b 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -2,6 +2,7 @@ IStrategy interface This module defines the interface to apply for strategies """ + import logging from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone @@ -51,6 +52,7 @@ class IStrategy(ABC, HyperStrategyMixin): stoploss -> float: optimal stoploss designed for the strategy timeframe -> str: value of the timeframe to use with the strategy """ + # Strategy interface version # Default to version 2 # Version 1 is the initial interface without metadata dict - deprecated and no longer supported. @@ -66,7 +68,7 @@ class IStrategy(ABC, HyperStrategyMixin): stoploss: float # max open trades for the strategy - max_open_trades: IntOrInf + max_open_trades: IntOrInf # trailing stoploss trailing_stop: bool = False @@ -83,17 +85,17 @@ class IStrategy(ABC, HyperStrategyMixin): # Optional order types order_types: Dict = { - 'entry': 'limit', - 'exit': 'limit', - 'stoploss': 'limit', - 'stoploss_on_exchange': False, - 'stoploss_on_exchange_interval': 60, + "entry": "limit", + "exit": "limit", + "stoploss": "limit", + "stoploss_on_exchange": False, + "stoploss_on_exchange_interval": 60, } # Optional time in force order_time_in_force: Dict = { - 'entry': 'GTC', - 'exit': 'GTC', + "entry": "GTC", + "exit": "GTC", } # run "populate_indicators" only for new candle @@ -128,7 +130,7 @@ class IStrategy(ABC, HyperStrategyMixin): # Filled from configuration stake_currency: str # container variable for strategy source code - __source__: str = '' + __source__: str = "" # Definition of plot_config. See plotting documentation for more details. plot_config: Dict = {} @@ -148,7 +150,7 @@ class IStrategy(ABC, HyperStrategyMixin): cls_method = getattr(self.__class__, attr_name) if not callable(cls_method): continue - informative_data_list = getattr(cls_method, '_ft_informative', None) + informative_data_list = getattr(cls_method, "_ft_informative", None) if not isinstance(informative_data_list, list): # Type check is required because mocker would return a mock object that evaluates to # True, confusing this code. @@ -156,22 +158,24 @@ class IStrategy(ABC, HyperStrategyMixin): strategy_timeframe_minutes = timeframe_to_minutes(self.timeframe) for informative_data in informative_data_list: if timeframe_to_minutes(informative_data.timeframe) < strategy_timeframe_minutes: - raise OperationalException('Informative timeframe must be equal or higher than ' - 'strategy timeframe!') + raise OperationalException( + "Informative timeframe must be equal or higher than strategy timeframe!" + ) if not informative_data.candle_type: - informative_data.candle_type = config['candle_type_def'] + informative_data.candle_type = config["candle_type_def"] self._ft_informative.append((informative_data, cls_method)) def load_freqAI_model(self) -> None: - if self.config.get('freqai', {}).get('enabled', False): + if self.config.get("freqai", {}).get("enabled", False): # Import here to avoid importing this if freqAI is disabled from freqtrade.freqai.utils import download_all_data_for_training from freqtrade.resolvers.freqaimodel_resolver import FreqaiModelResolver + self.freqai = FreqaiModelResolver.load_freqaimodel(self.config) self.freqai_info = self.config["freqai"] # download the desired data in dry/live - if self.config.get('runmode') in (RunMode.DRY_RUN, RunMode.LIVE): + if self.config.get("runmode") in (RunMode.DRY_RUN, RunMode.LIVE): logger.info( "Downloading all training data for all pairs in whitelist and " "corr_pairlist, this may take a while if the data is not " @@ -183,8 +187,9 @@ class IStrategy(ABC, HyperStrategyMixin): class DummyClass: def start(self, *args, **kwargs): raise OperationalException( - 'freqAI is not enabled. ' - 'Please enable it in your config to use this strategy.') + "freqAI is not enabled. " + "Please enable it in your config to use this strategy." + ) def shutdown(self, *args, **kwargs): pass @@ -200,7 +205,7 @@ class IStrategy(ABC, HyperStrategyMixin): strategy_safe_wrapper(self.bot_start)() - self.ft_load_hyper_params(self.config.get('runmode') == RunMode.HYPEROPT) + self.ft_load_hyper_params(self.config.get("runmode") == RunMode.HYPEROPT) def ft_bot_cleanup(self) -> None: """ @@ -272,15 +277,17 @@ class IStrategy(ABC, HyperStrategyMixin): """ pass - def check_buy_timeout(self, pair: str, trade: Trade, order: Order, - current_time: datetime, **kwargs) -> bool: + def check_buy_timeout( + self, pair: str, trade: Trade, order: Order, current_time: datetime, **kwargs + ) -> bool: """ DEPRECATED: Please use `check_entry_timeout` instead. """ return False - def check_entry_timeout(self, pair: str, trade: Trade, order: Order, - current_time: datetime, **kwargs) -> bool: + def check_entry_timeout( + self, pair: str, trade: Trade, order: Order, current_time: datetime, **kwargs + ) -> bool: """ Check entry timeout function callback. This method can be used to override the entry-timeout. @@ -298,17 +305,20 @@ class IStrategy(ABC, HyperStrategyMixin): :return bool: When True is returned, then the entry order is cancelled. """ return self.check_buy_timeout( - pair=pair, trade=trade, order=order, current_time=current_time) + pair=pair, trade=trade, order=order, current_time=current_time + ) - def check_sell_timeout(self, pair: str, trade: Trade, order: Order, - current_time: datetime, **kwargs) -> bool: + def check_sell_timeout( + self, pair: str, trade: Trade, order: Order, current_time: datetime, **kwargs + ) -> bool: """ DEPRECATED: Please use `check_exit_timeout` instead. """ return False - def check_exit_timeout(self, pair: str, trade: Trade, order: Order, - current_time: datetime, **kwargs) -> bool: + def check_exit_timeout( + self, pair: str, trade: Trade, order: Order, current_time: datetime, **kwargs + ) -> bool: """ Check exit timeout function callback. This method can be used to override the exit-timeout. @@ -326,11 +336,21 @@ class IStrategy(ABC, HyperStrategyMixin): :return bool: When True is returned, then the exit-order is cancelled. """ return self.check_sell_timeout( - pair=pair, trade=trade, order=order, current_time=current_time) + pair=pair, trade=trade, order=order, current_time=current_time + ) - def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, - time_in_force: str, current_time: datetime, entry_tag: Optional[str], - side: str, **kwargs) -> bool: + def confirm_trade_entry( + self, + pair: str, + order_type: str, + amount: float, + rate: float, + time_in_force: str, + current_time: datetime, + entry_tag: Optional[str], + side: str, + **kwargs, + ) -> bool: """ Called right before placing a entry order. Timing for this function is critical, so avoid doing heavy computations or @@ -355,9 +375,18 @@ class IStrategy(ABC, HyperStrategyMixin): """ return True - def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, - rate: float, time_in_force: str, exit_reason: str, - current_time: datetime, **kwargs) -> bool: + def confirm_trade_exit( + self, + pair: str, + trade: Trade, + order_type: str, + amount: float, + rate: float, + time_in_force: str, + exit_reason: str, + current_time: datetime, + **kwargs, + ) -> bool: """ Called right before placing a regular exit order. Timing for this function is critical, so avoid doing heavy computations or @@ -384,8 +413,9 @@ class IStrategy(ABC, HyperStrategyMixin): """ return True - def order_filled(self, pair: str, trade: Trade, order: Order, - current_time: datetime, **kwargs) -> None: + def order_filled( + self, pair: str, trade: Trade, order: Order, current_time: datetime, **kwargs + ) -> None: """ Called right after an order fills. Will be called for all order types (entry, exit, stoploss, position adjustment). @@ -397,8 +427,16 @@ class IStrategy(ABC, HyperStrategyMixin): """ pass - def custom_stoploss(self, pair: str, trade: Trade, current_time: datetime, current_rate: float, - current_profit: float, after_fill: bool, **kwargs) -> Optional[float]: + def custom_stoploss( + self, + pair: str, + trade: Trade, + current_time: datetime, + current_rate: float, + current_profit: float, + after_fill: bool, + **kwargs, + ) -> Optional[float]: """ Custom stoploss logic, returning the new distance relative to current_rate (as ratio). e.g. returning -0.05 would create a stoploss 5% below current_rate. @@ -420,9 +458,16 @@ class IStrategy(ABC, HyperStrategyMixin): """ return self.stoploss - def custom_entry_price(self, pair: str, trade: Optional[Trade], - current_time: datetime, proposed_rate: float, - entry_tag: Optional[str], side: str, **kwargs) -> float: + def custom_entry_price( + self, + pair: str, + trade: Optional[Trade], + current_time: datetime, + proposed_rate: float, + entry_tag: Optional[str], + side: str, + **kwargs, + ) -> float: """ Custom entry price logic, returning the new entry price. @@ -441,9 +486,16 @@ class IStrategy(ABC, HyperStrategyMixin): """ return proposed_rate - def custom_exit_price(self, pair: str, trade: Trade, - current_time: datetime, proposed_rate: float, - current_profit: float, exit_tag: Optional[str], **kwargs) -> float: + def custom_exit_price( + self, + pair: str, + trade: Trade, + current_time: datetime, + proposed_rate: float, + current_profit: float, + exit_tag: Optional[str], + **kwargs, + ) -> float: """ Custom exit price logic, returning the new exit price. @@ -462,8 +514,15 @@ class IStrategy(ABC, HyperStrategyMixin): """ return proposed_rate - def custom_sell(self, pair: str, trade: Trade, current_time: datetime, current_rate: float, - current_profit: float, **kwargs) -> Optional[Union[str, bool]]: + def custom_sell( + self, + pair: str, + trade: Trade, + current_time: datetime, + current_rate: float, + current_profit: float, + **kwargs, + ) -> Optional[Union[str, bool]]: """ DEPRECATED - please use custom_exit instead. Custom exit signal logic indicating that specified position should be sold. Returning a @@ -487,8 +546,15 @@ class IStrategy(ABC, HyperStrategyMixin): """ return None - def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float, - current_profit: float, **kwargs) -> Optional[Union[str, bool]]: + def custom_exit( + self, + pair: str, + trade: Trade, + current_time: datetime, + current_rate: float, + current_profit: float, + **kwargs, + ) -> Optional[Union[str, bool]]: """ Custom exit signal logic indicating that specified position should be sold. Returning a string or True from this method is equal to setting exit signal on a candle at specified @@ -511,10 +577,19 @@ class IStrategy(ABC, HyperStrategyMixin): """ return self.custom_sell(pair, trade, current_time, current_rate, current_profit, **kwargs) - def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, - proposed_stake: float, min_stake: Optional[float], max_stake: float, - leverage: float, entry_tag: Optional[str], side: str, - **kwargs) -> float: + def custom_stake_amount( + self, + pair: str, + current_time: datetime, + current_rate: float, + proposed_stake: float, + min_stake: Optional[float], + max_stake: float, + leverage: float, + entry_tag: Optional[str], + side: str, + **kwargs, + ) -> float: """ Customize stake size for each new trade. @@ -531,13 +606,20 @@ class IStrategy(ABC, HyperStrategyMixin): """ return proposed_stake - def adjust_trade_position(self, trade: Trade, current_time: datetime, - current_rate: float, current_profit: float, - min_stake: Optional[float], max_stake: float, - current_entry_rate: float, current_exit_rate: float, - current_entry_profit: float, current_exit_profit: float, - **kwargs - ) -> Union[Optional[float], Tuple[Optional[float], Optional[str]]]: + def adjust_trade_position( + self, + trade: Trade, + current_time: datetime, + current_rate: float, + current_profit: float, + min_stake: Optional[float], + max_stake: float, + current_entry_rate: float, + current_exit_rate: float, + current_entry_profit: float, + current_exit_profit: float, + **kwargs, + ) -> Union[Optional[float], Tuple[Optional[float], Optional[str]]]: """ Custom trade adjustment logic, returning the stake amount that a trade should be increased or decreased. @@ -567,9 +649,18 @@ class IStrategy(ABC, HyperStrategyMixin): """ return None - def adjust_entry_price(self, trade: Trade, order: Optional[Order], pair: str, - current_time: datetime, proposed_rate: float, current_order_rate: float, - entry_tag: Optional[str], side: str, **kwargs) -> float: + def adjust_entry_price( + self, + trade: Trade, + order: Optional[Order], + pair: str, + current_time: datetime, + proposed_rate: float, + current_order_rate: float, + entry_tag: Optional[str], + side: str, + **kwargs, + ) -> float: """ Entry price re-adjustment logic, returning the user desired limit price. This only executes when a order was already placed, still open (unfilled fully or partially) @@ -595,9 +686,17 @@ class IStrategy(ABC, HyperStrategyMixin): """ return current_order_rate - def leverage(self, pair: str, current_time: datetime, current_rate: float, - proposed_leverage: float, max_leverage: float, entry_tag: Optional[str], - side: str, **kwargs) -> float: + def leverage( + self, + pair: str, + current_time: datetime, + current_rate: float, + proposed_leverage: float, + max_leverage: float, + entry_tag: Optional[str], + side: str, + **kwargs, + ) -> float: """ Customize leverage for each new trade. This method is only called in futures mode. @@ -631,9 +730,14 @@ class IStrategy(ABC, HyperStrategyMixin): """ return None - def populate_any_indicators(self, pair: str, df: DataFrame, tf: str, - informative: Optional[DataFrame] = None, - set_generalized_indicators: bool = False) -> DataFrame: + def populate_any_indicators( + self, + pair: str, + df: DataFrame, + tf: str, + informative: Optional[DataFrame] = None, + set_generalized_indicators: bool = False, + ) -> DataFrame: """ DEPRECATED - USE FEATURE ENGINEERING FUNCTIONS INSTEAD Function designed to automatically generate, name and merge features @@ -648,8 +752,9 @@ class IStrategy(ABC, HyperStrategyMixin): """ return df - def feature_engineering_expand_all(self, dataframe: DataFrame, period: int, - metadata: Dict, **kwargs) -> DataFrame: + def feature_engineering_expand_all( + self, dataframe: DataFrame, period: int, metadata: Dict, **kwargs + ) -> DataFrame: """ *Only functional with FreqAI enabled strategies* This function will automatically expand the defined features on the config defined @@ -676,7 +781,8 @@ class IStrategy(ABC, HyperStrategyMixin): return dataframe def feature_engineering_expand_basic( - self, dataframe: DataFrame, metadata: Dict, **kwargs) -> DataFrame: + self, dataframe: DataFrame, metadata: Dict, **kwargs + ) -> DataFrame: """ *Only functional with FreqAI enabled strategies* This function will automatically expand the defined features on the config defined @@ -706,7 +812,8 @@ class IStrategy(ABC, HyperStrategyMixin): return dataframe def feature_engineering_standard( - self, dataframe: DataFrame, metadata: Dict, **kwargs) -> DataFrame: + self, dataframe: DataFrame, metadata: Dict, **kwargs + ) -> DataFrame: """ *Only functional with FreqAI enabled strategies* This optional function will be called once with the dataframe of the base timeframe. @@ -746,38 +853,50 @@ class IStrategy(ABC, HyperStrategyMixin): """ return dataframe -### -# END - Intended to be overridden by strategy -### + ### + # END - Intended to be overridden by strategy + ### _ft_stop_uses_after_fill = False def _adjust_trade_position_internal( - self, trade: Trade, current_time: datetime, - current_rate: float, current_profit: float, - min_stake: Optional[float], max_stake: float, - current_entry_rate: float, current_exit_rate: float, - current_entry_profit: float, current_exit_profit: float, - **kwargs + self, + trade: Trade, + current_time: datetime, + current_rate: float, + current_profit: float, + min_stake: Optional[float], + max_stake: float, + current_entry_rate: float, + current_exit_rate: float, + current_entry_profit: float, + current_exit_profit: float, + **kwargs, ) -> Tuple[Optional[float], str]: """ wrapper around adjust_trade_position to handle the return value """ - resp = strategy_safe_wrapper(self.adjust_trade_position, - default_retval=(None, ''), supress_error=True)( - trade=trade, current_time=current_time, - current_rate=current_rate, current_profit=current_profit, - min_stake=min_stake, max_stake=max_stake, - current_entry_rate=current_entry_rate, current_exit_rate=current_exit_rate, - current_entry_profit=current_entry_profit, current_exit_profit=current_exit_profit, - **kwargs + resp = strategy_safe_wrapper( + self.adjust_trade_position, default_retval=(None, ""), supress_error=True + )( + trade=trade, + current_time=current_time, + current_rate=current_rate, + current_profit=current_profit, + min_stake=min_stake, + max_stake=max_stake, + current_entry_rate=current_entry_rate, + current_exit_rate=current_exit_rate, + current_entry_profit=current_entry_profit, + current_exit_profit=current_exit_profit, + **kwargs, ) - order_tag = '' + order_tag = "" if isinstance(resp, tuple): if len(resp) >= 1: stake_amount = resp[0] if len(resp) > 1: - order_tag = resp[1] or '' + order_tag = resp[1] or "" else: stake_amount = resp return stake_amount, order_tag @@ -786,9 +905,9 @@ class IStrategy(ABC, HyperStrategyMixin): """ Create informative-pairs needed for FreqAI """ - if self.config.get('freqai', {}).get('enabled', False): + if self.config.get("freqai", {}).get("enabled", False): whitelist_pairs = self.dp.current_whitelist() - candle_type = self.config.get('candle_type_def', CandleType.SPOT) + candle_type = self.config.get("candle_type_def", CandleType.SPOT) corr_pairs = self.config["freqai"]["feature_parameters"]["include_corr_pairlist"] informative_pairs = [] for tf in self.config["freqai"]["feature_parameters"]["include_timeframes"]: @@ -805,17 +924,25 @@ class IStrategy(ABC, HyperStrategyMixin): informative_pairs = self.informative_pairs() # Compatibility code for 2 tuple informative pairs informative_pairs = [ - (p[0], p[1], CandleType.from_string(p[2]) if len( - p) > 2 and p[2] != '' else self.config.get('candle_type_def', CandleType.SPOT)) - for p in informative_pairs] + ( + p[0], + p[1], + CandleType.from_string(p[2]) + if len(p) > 2 and p[2] != "" + else self.config.get("candle_type_def", CandleType.SPOT), + ) + for p in informative_pairs + ] for inf_data, _ in self._ft_informative: # Get default candle type if not provided explicitly. - candle_type = (inf_data.candle_type if inf_data.candle_type - else self.config.get('candle_type_def', CandleType.SPOT)) + candle_type = ( + inf_data.candle_type + if inf_data.candle_type + else self.config.get("candle_type_def", CandleType.SPOT) + ) if inf_data.asset: if any(s in inf_data.asset for s in ("{BASE}", "{base}")): for pair in self.dp.current_whitelist(): - pair_tf = ( _format_pair_name(self.config, inf_data.asset, self.dp.market(pair)), inf_data.timeframe, @@ -842,8 +969,9 @@ class IStrategy(ABC, HyperStrategyMixin): """ return self.__class__.__name__ - def lock_pair(self, pair: str, until: datetime, - reason: Optional[str] = None, side: str = '*') -> None: + def lock_pair( + self, pair: str, until: datetime, reason: Optional[str] = None, side: str = "*" + ) -> None: """ Locks pair until a given timestamp happens. Locked pairs are not analyzed, and are prevented from opening new trades. @@ -875,8 +1003,9 @@ class IStrategy(ABC, HyperStrategyMixin): """ PairLocks.unlock_reason(reason, datetime.now(timezone.utc)) - def is_pair_locked(self, pair: str, *, candle_date: Optional[datetime] = None, - side: str = '*') -> bool: + def is_pair_locked( + self, pair: str, *, candle_date: Optional[datetime] = None, side: str = "*" + ) -> bool: """ Checks if a pair is currently locked The 2nd, optional parameter ensures that locks are applied until the new candle arrives, @@ -919,19 +1048,18 @@ class IStrategy(ABC, HyperStrategyMixin): :param metadata: Metadata dictionary with additional data (e.g. 'pair') :return: DataFrame of candle (OHLCV) data with indicator data and signals added """ - pair = str(metadata.get('pair')) + pair = str(metadata.get("pair")) - new_candle = self._last_candle_seen_per_pair.get(pair, None) != dataframe.iloc[-1]['date'] + new_candle = self._last_candle_seen_per_pair.get(pair, None) != dataframe.iloc[-1]["date"] # Test if seen this pair and last candle before. # always run if process_only_new_candles is set to false if not self.process_only_new_candles or new_candle: - # Defs that only make change on new candle data. dataframe = self.analyze_ticker(dataframe, metadata) - self._last_candle_seen_per_pair[pair] = dataframe.iloc[-1]['date'] + self._last_candle_seen_per_pair[pair] = dataframe.iloc[-1]["date"] - candle_type = self.config.get('candle_type_def', CandleType.SPOT) + candle_type = self.config.get("candle_type_def", CandleType.SPOT) self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type) self.dp._emit_df((pair, self.timeframe, candle_type), dataframe, new_candle) @@ -951,18 +1079,18 @@ class IStrategy(ABC, HyperStrategyMixin): :param pair: Pair to analyze. """ dataframe = self.dp.ohlcv( - pair, self.timeframe, candle_type=self.config.get('candle_type_def', CandleType.SPOT) + pair, self.timeframe, candle_type=self.config.get("candle_type_def", CandleType.SPOT) ) if not isinstance(dataframe, DataFrame) or dataframe.empty: - logger.warning('Empty candle (OHLCV) data for pair %s', pair) + logger.warning("Empty candle (OHLCV) data for pair %s", pair) return try: df_len, df_close, df_date = self.preserve_df(dataframe) - dataframe = strategy_safe_wrapper( - self._analyze_ticker_internal, message="" - )(dataframe, {'pair': pair}) + dataframe = strategy_safe_wrapper(self._analyze_ticker_internal, message="")( + dataframe, {"pair": pair} + ) self.assert_df(dataframe, df_len, df_close, df_date) except StrategyError as error: @@ -970,7 +1098,7 @@ class IStrategy(ABC, HyperStrategyMixin): return if dataframe.empty: - logger.warning('Empty dataframe for pair %s', pair) + logger.warning("Empty dataframe for pair %s", pair) return def analyze(self, pairs: List[str]) -> None: @@ -983,7 +1111,7 @@ class IStrategy(ABC, HyperStrategyMixin): @staticmethod def preserve_df(dataframe: DataFrame) -> Tuple[int, float, datetime]: - """ keep some data for dataframes """ + """keep some data for dataframes""" return len(dataframe), dataframe["close"].iloc[-1], dataframe["date"].iloc[-1] def assert_df(self, dataframe: DataFrame, df_len: int, df_close: float, df_date: datetime): @@ -994,7 +1122,7 @@ class IStrategy(ABC, HyperStrategyMixin): message = "" if dataframe is None: message = "No dataframe returned (return statement missing?)." - elif 'enter_long' not in dataframe: + elif "enter_long" not in dataframe: message = "enter_long/buy column not set." elif df_len != len(dataframe): message = message_template.format("length") @@ -1024,31 +1152,28 @@ class IStrategy(ABC, HyperStrategyMixin): :return: (None, None) or (Dataframe, latest_date) - corresponding to the last candle """ if not isinstance(dataframe, DataFrame) or dataframe.empty: - logger.warning(f'Empty candle (OHLCV) data for pair {pair}') + logger.warning(f"Empty candle (OHLCV) data for pair {pair}") return None, None - latest_date = dataframe['date'].max() - latest = dataframe.loc[dataframe['date'] == latest_date].iloc[-1] + latest_date = dataframe["date"].max() + latest = dataframe.loc[dataframe["date"] == latest_date].iloc[-1] # Explicitly convert to datetime object to ensure the below comparison does not fail latest_date = latest_date.to_pydatetime() # Check if dataframe is out of date timeframe_minutes = timeframe_to_minutes(timeframe) - offset = self.config.get('exchange', {}).get('outdated_offset', 5) + offset = self.config.get("exchange", {}).get("outdated_offset", 5) if latest_date < (dt_now() - timedelta(minutes=timeframe_minutes * 2 + offset)): logger.warning( - 'Outdated history for pair %s. Last tick is %s minutes old', - pair, int((dt_now() - latest_date).total_seconds() // 60) + "Outdated history for pair %s. Last tick is %s minutes old", + pair, + int((dt_now() - latest_date).total_seconds() // 60), ) return None, None return latest, latest_date def get_exit_signal( - self, - pair: str, - timeframe: str, - dataframe: DataFrame, - is_short: Optional[bool] = None + self, pair: str, timeframe: str, dataframe: DataFrame, is_short: Optional[bool] = None ) -> Tuple[bool, bool, Optional[str]]: """ Calculates current exit signal based based on the dataframe @@ -1074,10 +1199,9 @@ class IStrategy(ABC, HyperStrategyMixin): exit_ = latest.get(SignalType.EXIT_LONG.value, 0) == 1 exit_tag = latest.get(SignalTagType.EXIT_TAG.value, None) # Tags can be None, which does not resolve to False. - exit_tag = exit_tag if isinstance(exit_tag, str) and exit_tag != 'nan' else None + exit_tag = exit_tag if isinstance(exit_tag, str) and exit_tag != "nan" else None - logger.debug(f"exit-trigger: {latest['date']} (pair={pair}) " - f"enter={enter} exit={exit_}") + logger.debug(f"exit-trigger: {latest['date']} (pair={pair}) " f"enter={enter} exit={exit_}") return enter, exit_, exit_tag @@ -1110,13 +1234,16 @@ class IStrategy(ABC, HyperStrategyMixin): if enter_long == 1 and not any([exit_long, enter_short]): enter_signal = SignalDirection.LONG enter_tag = latest.get(SignalTagType.ENTER_TAG.value, None) - if (self.config.get('trading_mode', TradingMode.SPOT) != TradingMode.SPOT - and self.can_short - and enter_short == 1 and not any([exit_short, enter_long])): + if ( + self.config.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT + and self.can_short + and enter_short == 1 + and not any([exit_short, enter_long]) + ): enter_signal = SignalDirection.SHORT enter_tag = latest.get(SignalTagType.ENTER_TAG.value, None) - enter_tag = enter_tag if isinstance(enter_tag, str) and enter_tag != 'nan' else None + enter_tag = enter_tag if isinstance(enter_tag, str) and enter_tag != "nan" else None timeframe_seconds = timeframe_to_seconds(timeframe) @@ -1124,20 +1251,18 @@ class IStrategy(ABC, HyperStrategyMixin): latest_date=latest_date, current_time=dt_now(), timeframe_seconds=timeframe_seconds, - enter=bool(enter_signal) + enter=bool(enter_signal), ): return None, enter_tag - logger.debug(f"entry trigger: {latest['date']} (pair={pair}) " - f"enter={enter_long} enter_tag_value={enter_tag}") + logger.debug( + f"entry trigger: {latest['date']} (pair={pair}) " + f"enter={enter_long} enter_tag_value={enter_tag}" + ) return enter_signal, enter_tag def ignore_expired_candle( - self, - latest_date: datetime, - current_time: datetime, - timeframe_seconds: int, - enter: bool + self, latest_date: datetime, current_time: datetime, timeframe_seconds: int, enter: bool ): if self.ignore_buying_expired_candle_after and enter: time_delta = current_time - (latest_date + timedelta(seconds=timeframe_seconds)) @@ -1145,10 +1270,18 @@ class IStrategy(ABC, HyperStrategyMixin): else: return False - def should_exit(self, trade: Trade, rate: float, current_time: datetime, *, - enter: bool, exit_: bool, - low: Optional[float] = None, high: Optional[float] = None, - force_stoploss: float = 0) -> List[ExitCheckTuple]: + def should_exit( + self, + trade: Trade, + rate: float, + current_time: datetime, + *, + enter: bool, + exit_: bool, + low: Optional[float] = None, + high: Optional[float] = None, + force_stoploss: float = 0, + ) -> List[ExitCheckTuple]: """ This function evaluates if one of the conditions required to trigger an exit order has been reached, which can either be a stop-loss, ROI or exit-signal. @@ -1168,45 +1301,57 @@ class IStrategy(ABC, HyperStrategyMixin): trade.adjust_min_max_rates(high or current_rate, low or current_rate) - stoplossflag = self.ft_stoploss_reached(current_rate=current_rate, trade=trade, - current_time=current_time, - current_profit=current_profit, - force_stoploss=force_stoploss, low=low, high=high) + stoplossflag = self.ft_stoploss_reached( + current_rate=current_rate, + trade=trade, + current_time=current_time, + current_profit=current_profit, + force_stoploss=force_stoploss, + low=low, + high=high, + ) # if enter signal and ignore_roi is set, we don't need to evaluate min_roi. - roi_reached = (not (enter and self.ignore_roi_if_entry_signal) - and self.min_roi_reached(trade=trade, current_profit=current_profit_best, - current_time=current_time)) + roi_reached = not (enter and self.ignore_roi_if_entry_signal) and self.min_roi_reached( + trade=trade, current_profit=current_profit_best, current_time=current_time + ) exit_signal = ExitType.NONE - custom_reason = '' + custom_reason = "" if self.use_exit_signal: if exit_ and not enter: exit_signal = ExitType.EXIT_SIGNAL else: reason_cust = strategy_safe_wrapper(self.custom_exit, default_retval=False)( - pair=trade.pair, trade=trade, current_time=current_time, - current_rate=current_rate, current_profit=current_profit) + pair=trade.pair, + trade=trade, + current_time=current_time, + current_rate=current_rate, + current_profit=current_profit, + ) if reason_cust: exit_signal = ExitType.CUSTOM_EXIT if isinstance(reason_cust, str): custom_reason = reason_cust if len(reason_cust) > CUSTOM_TAG_MAX_LENGTH: - logger.warning(f'Custom exit reason returned from ' - f'custom_exit is too long and was trimmed' - f'to {CUSTOM_TAG_MAX_LENGTH} characters.') + logger.warning( + f"Custom exit reason returned from " + f"custom_exit is too long and was trimmed" + f"to {CUSTOM_TAG_MAX_LENGTH} characters." + ) custom_reason = reason_cust[:CUSTOM_TAG_MAX_LENGTH] else: - custom_reason = '' - if ( - exit_signal == ExitType.CUSTOM_EXIT - or (exit_signal == ExitType.EXIT_SIGNAL - and (not self.exit_profit_only or current_profit > self.exit_profit_offset)) + custom_reason = "" + if exit_signal == ExitType.CUSTOM_EXIT or ( + exit_signal == ExitType.EXIT_SIGNAL + and (not self.exit_profit_only or current_profit > self.exit_profit_offset) ): - logger.debug(f"{trade.pair} - Sell signal received. " - f"exit_type=ExitType.{exit_signal.name}" + - (f", custom_reason={custom_reason}" if custom_reason else "")) + logger.debug( + f"{trade.pair} - Sell signal received. " + f"exit_type=ExitType.{exit_signal.name}" + + (f", custom_reason={custom_reason}" if custom_reason else "") + ) exits.append(ExitCheckTuple(exit_type=exit_signal, exit_reason=custom_reason)) # Sequence: @@ -1216,7 +1361,6 @@ class IStrategy(ABC, HyperStrategyMixin): # Trailing stoploss if stoplossflag.exit_type in (ExitType.STOP_LOSS, ExitType.LIQUIDATION): - logger.debug(f"{trade.pair} - Stoploss hit. exit_type={stoplossflag.exit_type}") exits.append(stoplossflag) @@ -1225,16 +1369,22 @@ class IStrategy(ABC, HyperStrategyMixin): exits.append(ExitCheckTuple(exit_type=ExitType.ROI)) if stoplossflag.exit_type == ExitType.TRAILING_STOP_LOSS: - logger.debug(f"{trade.pair} - Trailing stoploss hit.") exits.append(stoplossflag) return exits - def ft_stoploss_adjust(self, current_rate: float, trade: Trade, - current_time: datetime, current_profit: float, - force_stoploss: float, low: Optional[float] = None, - high: Optional[float] = None, after_fill: bool = False) -> None: + def ft_stoploss_adjust( + self, + current_rate: float, + trade: Trade, + current_time: datetime, + current_profit: float, + force_stoploss: float, + low: Optional[float] = None, + high: Optional[float] = None, + after_fill: bool = False, + ) -> None: """ Adjust stop-loss dynamically if configured to do so. :param current_profit: current profit as ratio @@ -1250,27 +1400,32 @@ class IStrategy(ABC, HyperStrategyMixin): # Initiate stoploss with open_rate. Does nothing if stoploss is already set. trade.adjust_stop_loss(trade.open_rate, stop_loss_value, initial=True) - dir_correct = (trade.stop_loss < (low or current_rate) - if not trade.is_short else - trade.stop_loss > (high or current_rate) - ) + dir_correct = ( + trade.stop_loss < (low or current_rate) + if not trade.is_short + else trade.stop_loss > (high or current_rate) + ) # Make sure current_profit is calculated using high for backtesting. - bound = (low if trade.is_short else high) + bound = low if trade.is_short else high bound_profit = current_profit if not bound else trade.calc_profit_ratio(bound) if self.use_custom_stoploss and dir_correct: stop_loss_value_custom = strategy_safe_wrapper( self.custom_stoploss, default_retval=None, supress_error=True - )(pair=trade.pair, trade=trade, - current_time=current_time, - current_rate=(bound or current_rate), - current_profit=bound_profit, - after_fill=after_fill) + )( + pair=trade.pair, + trade=trade, + current_time=current_time, + current_rate=(bound or current_rate), + current_profit=bound_profit, + after_fill=after_fill, + ) # Sanity check - error cases will return None if stop_loss_value_custom: stop_loss_value = stop_loss_value_custom - trade.adjust_stop_loss(bound or current_rate, stop_loss_value, - allow_refresh=after_fill) + trade.adjust_stop_loss( + bound or current_rate, stop_loss_value, allow_refresh=after_fill + ) else: logger.debug("CustomStoploss function did not return valid stoploss") @@ -1284,15 +1439,23 @@ class IStrategy(ABC, HyperStrategyMixin): # Specific handling for trailing_stop_positive if self.trailing_stop_positive is not None and bound_profit > sl_offset: stop_loss_value = self.trailing_stop_positive - logger.debug(f"{trade.pair} - Using positive stoploss: {stop_loss_value} " - f"offset: {sl_offset:.4g} profit: {bound_profit:.2%}") + logger.debug( + f"{trade.pair} - Using positive stoploss: {stop_loss_value} " + f"offset: {sl_offset:.4g} profit: {bound_profit:.2%}" + ) trade.adjust_stop_loss(bound or current_rate, stop_loss_value) - def ft_stoploss_reached(self, current_rate: float, trade: Trade, - current_time: datetime, current_profit: float, - force_stoploss: float, low: Optional[float] = None, - high: Optional[float] = None) -> ExitCheckTuple: + def ft_stoploss_reached( + self, + current_rate: float, + trade: Trade, + current_time: datetime, + current_profit: float, + force_stoploss: float, + low: Optional[float] = None, + high: Optional[float] = None, + ) -> ExitCheckTuple: """ Based on current profit of the trade and configured (trailing) stoploss, decides to exit or not @@ -1300,24 +1463,29 @@ class IStrategy(ABC, HyperStrategyMixin): :param low: Low value of this candle, only set in backtesting :param high: High value of this candle, only set in backtesting """ - self.ft_stoploss_adjust(current_rate, trade, current_time, current_profit, - force_stoploss, low, high) + self.ft_stoploss_adjust( + current_rate, trade, current_time, current_profit, force_stoploss, low, high + ) - sl_higher_long = (trade.stop_loss >= (low or current_rate) and not trade.is_short) - sl_lower_short = (trade.stop_loss <= (high or current_rate) and trade.is_short) - liq_higher_long = (trade.liquidation_price - and trade.liquidation_price >= (low or current_rate) - and not trade.is_short) - liq_lower_short = (trade.liquidation_price - and trade.liquidation_price <= (high or current_rate) - and trade.is_short) + sl_higher_long = trade.stop_loss >= (low or current_rate) and not trade.is_short + sl_lower_short = trade.stop_loss <= (high or current_rate) and trade.is_short + liq_higher_long = ( + trade.liquidation_price + and trade.liquidation_price >= (low or current_rate) + and not trade.is_short + ) + liq_lower_short = ( + trade.liquidation_price + and trade.liquidation_price <= (high or current_rate) + and trade.is_short + ) # evaluate if the stoploss was hit if stoploss is not on exchange # in Dry-Run, this handles stoploss logic as well, as the logic will not be different to # regular stoploss handling. - if ((sl_higher_long or sl_lower_short) and - (not self.order_types.get('stoploss_on_exchange') or self.config['dry_run'])): - + if (sl_higher_long or sl_lower_short) and ( + not self.order_types.get("stoploss_on_exchange") or self.config["dry_run"] + ): exit_type = ExitType.STOP_LOSS # If initial stoploss is not the same as current one then it is trailing. @@ -1328,11 +1496,12 @@ class IStrategy(ABC, HyperStrategyMixin): f"{((high if trade.is_short else low) or current_rate):.6f}, " f"stoploss is {trade.stop_loss:.6f}, " f"initial stoploss was at {trade.initial_stop_loss:.6f}, " - f"trade opened at {trade.open_rate:.6f}") + f"trade opened at {trade.open_rate:.6f}" + ) return ExitCheckTuple(exit_type=exit_type) - if (liq_higher_long or liq_lower_short): + if liq_higher_long or liq_lower_short: logger.debug(f"{trade.pair} - Liquidation price hit. exit_type=ExitType.LIQUIDATION") return ExitCheckTuple(exit_type=ExitType.LIQUIDATION) @@ -1366,29 +1535,30 @@ class IStrategy(ABC, HyperStrategyMixin): else: return current_profit > roi - def ft_check_timed_out(self, trade: Trade, order: Order, - current_time: datetime) -> bool: + def ft_check_timed_out(self, trade: Trade, order: Order, current_time: datetime) -> bool: """ FT Internal method. Check if timeout is active, and if the order is still open and timed out """ - side = 'entry' if order.ft_order_side == trade.entry_side else 'exit' + side = "entry" if order.ft_order_side == trade.entry_side else "exit" - timeout = self.config.get('unfilledtimeout', {}).get(side) + timeout = self.config.get("unfilledtimeout", {}).get(side) if timeout is not None: - timeout_unit = self.config.get('unfilledtimeout', {}).get('unit', 'minutes') + timeout_unit = self.config.get("unfilledtimeout", {}).get("unit", "minutes") timeout_kwargs = {timeout_unit: -timeout} timeout_threshold = current_time + timedelta(**timeout_kwargs) - timedout = (order.status == 'open' and order.order_date_utc < timeout_threshold) + timedout = order.status == "open" and order.order_date_utc < timeout_threshold if timedout: return True - time_method = (self.check_exit_timeout if order.ft_order_side == trade.exit_side - else self.check_entry_timeout) + time_method = ( + self.check_exit_timeout + if order.ft_order_side == trade.exit_side + else self.check_entry_timeout + ) - return strategy_safe_wrapper(time_method, - default_retval=False)( - pair=trade.pair, trade=trade, order=order, - current_time=current_time) + return strategy_safe_wrapper(time_method, default_retval=False)( + pair=trade.pair, trade=trade, order=order, current_time=current_time + ) def advise_all_indicators(self, data: Dict[str, DataFrame]) -> Dict[str, DataFrame]: """ @@ -1400,8 +1570,10 @@ class IStrategy(ABC, HyperStrategyMixin): Has positive effects on memory usage for whatever reason - also when using only one strategy. """ - return {pair: self.advise_indicators(pair_data.copy(), {'pair': pair}).copy() - for pair, pair_data in data.items()} + return { + pair: self.advise_indicators(pair_data.copy(), {"pair": pair}).copy() + for pair, pair_data in data.items() + } def ft_advise_signals(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """ @@ -1430,7 +1602,8 @@ class IStrategy(ABC, HyperStrategyMixin): # call populate_indicators_Nm() which were tagged with @informative decorator. for inf_data, populate_fn in self._ft_informative: dataframe = _create_and_merge_informative_pair( - self, dataframe, metadata, inf_data, populate_fn) + self, dataframe, metadata, inf_data, populate_fn + ) return self.populate_indicators(dataframe, metadata) @@ -1446,10 +1619,10 @@ class IStrategy(ABC, HyperStrategyMixin): logger.debug(f"Populating enter signals for pair {metadata.get('pair')}.") # Initialize column to work around Pandas bug #56503. - dataframe.loc[:, 'enter_tag'] = '' + dataframe.loc[:, "enter_tag"] = "" df = self.populate_entry_trend(dataframe, metadata) - if 'enter_long' not in df.columns: - df = df.rename({'buy': 'enter_long', 'buy_tag': 'enter_tag'}, axis='columns') + if "enter_long" not in df.columns: + df = df.rename({"buy": "enter_long", "buy_tag": "enter_tag"}, axis="columns") return df @@ -1463,9 +1636,9 @@ class IStrategy(ABC, HyperStrategyMixin): :return: DataFrame with exit column """ # Initialize column to work around Pandas bug #56503. - dataframe.loc[:, 'exit_tag'] = '' + dataframe.loc[:, "exit_tag"] = "" logger.debug(f"Populating exit signals for pair {metadata.get('pair')}.") df = self.populate_exit_trend(dataframe, metadata) - if 'exit_long' not in df.columns: - df = df.rename({'sell': 'exit_long'}, axis='columns') + if "exit_long" not in df.columns: + df = df.rename({"sell": "exit_long"}, axis="columns") return df diff --git a/freqtrade/strategy/parameters.py b/freqtrade/strategy/parameters.py index 019b01050..79091e2d6 100644 --- a/freqtrade/strategy/parameters.py +++ b/freqtrade/strategy/parameters.py @@ -2,6 +2,7 @@ IHyperStrategy interface, hyperoptable Parameter class. This module defines a base class for auto-hyperoptable strategies. """ + import logging from abc import ABC, abstractmethod from contextlib import suppress @@ -26,14 +27,22 @@ class BaseParameter(ABC): """ Defines a parameter that can be optimized by hyperopt. """ + category: Optional[str] default: Any value: Any in_space: bool = False name: str - def __init__(self, *, default: Any, space: Optional[str] = None, - optimize: bool = True, load: bool = True, **kwargs): + def __init__( + self, + *, + default: Any, + space: Optional[str] = None, + optimize: bool = True, + load: bool = True, + **kwargs, + ): """ Initialize hyperopt-optimizable parameter. :param space: A parameter category. Can be 'buy' or 'sell'. This parameter is optional if @@ -43,9 +52,10 @@ class BaseParameter(ABC): :param load: Load parameter value from {space}_params. :param kwargs: Extra parameters to skopt.space.(Integer|Real|Categorical). """ - if 'name' in kwargs: + if "name" in kwargs: raise OperationalException( - 'Name is determined by parameter field name and can not be specified manually.') + "Name is determined by parameter field name and can not be specified manually." + ) self.category = space self._space_params = kwargs self.value = default @@ -53,10 +63,10 @@ class BaseParameter(ABC): self.load = load def __repr__(self): - return f'{self.__class__.__name__}({self.value})' + return f"{self.__class__.__name__}({self.value})" @abstractmethod - def get_space(self, name: str) -> Union['Integer', 'Real', 'SKDecimal', 'Categorical']: + def get_space(self, name: str) -> Union["Integer", "Real", "SKDecimal", "Categorical"]: """ Get-space - will be used by Hyperopt to get the hyperopt Space """ @@ -70,14 +80,23 @@ class BaseParameter(ABC): class NumericParameter(BaseParameter): - """ Internal parameter used for Numeric purposes """ + """Internal parameter used for Numeric purposes""" + float_or_int = Union[int, float] default: float_or_int value: float_or_int - def __init__(self, low: Union[float_or_int, Sequence[float_or_int]], - high: Optional[float_or_int] = None, *, default: float_or_int, - space: Optional[str] = None, optimize: bool = True, load: bool = True, **kwargs): + def __init__( + self, + low: Union[float_or_int, Sequence[float_or_int]], + high: Optional[float_or_int] = None, + *, + default: float_or_int, + space: Optional[str] = None, + optimize: bool = True, + load: bool = True, + **kwargs, + ): """ Initialize hyperopt-optimizable numeric parameter. Cannot be instantiated, but provides the validation for other numeric parameters @@ -92,17 +111,16 @@ class NumericParameter(BaseParameter): :param kwargs: Extra parameters to skopt.space.*. """ if high is not None and isinstance(low, Sequence): - raise OperationalException(f'{self.__class__.__name__} space invalid.') + raise OperationalException(f"{self.__class__.__name__} space invalid.") if high is None or isinstance(low, Sequence): if not isinstance(low, Sequence) or len(low) != 2: - raise OperationalException(f'{self.__class__.__name__} space must be [low, high]') + raise OperationalException(f"{self.__class__.__name__} space must be [low, high]") self.low, self.high = low else: self.low = low self.high = high - super().__init__(default=default, space=space, optimize=optimize, - load=load, **kwargs) + super().__init__(default=default, space=space, optimize=optimize, load=load, **kwargs) class IntParameter(NumericParameter): @@ -111,8 +129,17 @@ class IntParameter(NumericParameter): low: int high: int - def __init__(self, low: Union[int, Sequence[int]], high: Optional[int] = None, *, default: int, - space: Optional[str] = None, optimize: bool = True, load: bool = True, **kwargs): + def __init__( + self, + low: Union[int, Sequence[int]], + high: Optional[int] = None, + *, + default: int, + space: Optional[str] = None, + optimize: bool = True, + load: bool = True, + **kwargs, + ): """ Initialize hyperopt-optimizable integer parameter. :param low: Lower end (inclusive) of optimization space or [low, high]. @@ -126,10 +153,11 @@ class IntParameter(NumericParameter): :param kwargs: Extra parameters to skopt.space.Integer. """ - super().__init__(low=low, high=high, default=default, space=space, optimize=optimize, - load=load, **kwargs) + super().__init__( + low=low, high=high, default=default, space=space, optimize=optimize, load=load, **kwargs + ) - def get_space(self, name: str) -> 'Integer': + def get_space(self, name: str) -> "Integer": """ Create skopt optimization space. :param name: A name of parameter field. @@ -155,9 +183,17 @@ class RealParameter(NumericParameter): default: float value: float - def __init__(self, low: Union[float, Sequence[float]], high: Optional[float] = None, *, - default: float, space: Optional[str] = None, optimize: bool = True, - load: bool = True, **kwargs): + def __init__( + self, + low: Union[float, Sequence[float]], + high: Optional[float] = None, + *, + default: float, + space: Optional[str] = None, + optimize: bool = True, + load: bool = True, + **kwargs, + ): """ Initialize hyperopt-optimizable floating point parameter with unlimited precision. :param low: Lower end (inclusive) of optimization space or [low, high]. @@ -170,10 +206,11 @@ class RealParameter(NumericParameter): :param load: Load parameter value from {space}_params. :param kwargs: Extra parameters to skopt.space.Real. """ - super().__init__(low=low, high=high, default=default, space=space, optimize=optimize, - load=load, **kwargs) + super().__init__( + low=low, high=high, default=default, space=space, optimize=optimize, load=load, **kwargs + ) - def get_space(self, name: str) -> 'Real': + def get_space(self, name: str) -> "Real": """ Create skopt optimization space. :param name: A name of parameter field. @@ -185,9 +222,18 @@ class DecimalParameter(NumericParameter): default: float value: float - def __init__(self, low: Union[float, Sequence[float]], high: Optional[float] = None, *, - default: float, decimals: int = 3, space: Optional[str] = None, - optimize: bool = True, load: bool = True, **kwargs): + def __init__( + self, + low: Union[float, Sequence[float]], + high: Optional[float] = None, + *, + default: float, + decimals: int = 3, + space: Optional[str] = None, + optimize: bool = True, + load: bool = True, + **kwargs, + ): """ Initialize hyperopt-optimizable decimal parameter with a limited precision. :param low: Lower end (inclusive) of optimization space or [low, high]. @@ -204,16 +250,18 @@ class DecimalParameter(NumericParameter): self._decimals = decimals default = round(default, self._decimals) - super().__init__(low=low, high=high, default=default, space=space, optimize=optimize, - load=load, **kwargs) + super().__init__( + low=low, high=high, default=default, space=space, optimize=optimize, load=load, **kwargs + ) - def get_space(self, name: str) -> 'SKDecimal': + def get_space(self, name: str) -> "SKDecimal": """ Create skopt optimization space. :param name: A name of parameter field. """ - return SKDecimal(low=self.low, high=self.high, decimals=self._decimals, name=name, - **self._space_params) + return SKDecimal( + low=self.low, high=self.high, decimals=self._decimals, name=name, **self._space_params + ) @property def range(self): @@ -236,8 +284,16 @@ class CategoricalParameter(BaseParameter): value: Any opt_range: Sequence[Any] - def __init__(self, categories: Sequence[Any], *, default: Optional[Any] = None, - space: Optional[str] = None, optimize: bool = True, load: bool = True, **kwargs): + def __init__( + self, + categories: Sequence[Any], + *, + default: Optional[Any] = None, + space: Optional[str] = None, + optimize: bool = True, + load: bool = True, + **kwargs, + ): """ Initialize hyperopt-optimizable parameter. :param categories: Optimization space, [a, b, ...]. @@ -252,12 +308,12 @@ class CategoricalParameter(BaseParameter): """ if len(categories) < 2: raise OperationalException( - 'CategoricalParameter space must be [a, b, ...] (at least two parameters)') + "CategoricalParameter space must be [a, b, ...] (at least two parameters)" + ) self.opt_range = categories - super().__init__(default=default, space=space, optimize=optimize, - load=load, **kwargs) + super().__init__(default=default, space=space, optimize=optimize, load=load, **kwargs) - def get_space(self, name: str) -> 'Categorical': + def get_space(self, name: str) -> "Categorical": """ Create skopt optimization space. :param name: A name of parameter field. @@ -279,9 +335,15 @@ class CategoricalParameter(BaseParameter): class BooleanParameter(CategoricalParameter): - - def __init__(self, *, default: Optional[Any] = None, - space: Optional[str] = None, optimize: bool = True, load: bool = True, **kwargs): + def __init__( + self, + *, + default: Optional[Any] = None, + space: Optional[str] = None, + optimize: bool = True, + load: bool = True, + **kwargs, + ): """ Initialize hyperopt-optimizable Boolean Parameter. It's a shortcut to `CategoricalParameter([True, False])`. @@ -296,5 +358,11 @@ class BooleanParameter(CategoricalParameter): """ categories = [True, False] - super().__init__(categories=categories, default=default, space=space, optimize=optimize, - load=load, **kwargs) + super().__init__( + categories=categories, + default=default, + space=space, + optimize=optimize, + load=load, + **kwargs, + ) diff --git a/freqtrade/strategy/strategy_helper.py b/freqtrade/strategy/strategy_helper.py index 5085063a3..1a91629d9 100644 --- a/freqtrade/strategy/strategy_helper.py +++ b/freqtrade/strategy/strategy_helper.py @@ -5,11 +5,16 @@ import pandas as pd from freqtrade.exchange import timeframe_to_minutes -def merge_informative_pair(dataframe: pd.DataFrame, informative: pd.DataFrame, - timeframe: str, timeframe_inf: str, ffill: bool = True, - append_timeframe: bool = True, - date_column: str = 'date', - suffix: Optional[str] = None) -> pd.DataFrame: +def merge_informative_pair( + dataframe: pd.DataFrame, + informative: pd.DataFrame, + timeframe: str, + timeframe_inf: str, + ffill: bool = True, + append_timeframe: bool = True, + date_column: str = "date", + suffix: Optional[str] = None, +) -> pd.DataFrame: """ Correctly merge informative samples to the original dataframe, avoiding lookahead bias. @@ -41,37 +46,39 @@ def merge_informative_pair(dataframe: pd.DataFrame, informative: pd.DataFrame, minutes = timeframe_to_minutes(timeframe) if minutes == minutes_inf: # No need to forwardshift if the timeframes are identical - informative['date_merge'] = informative[date_column] + informative["date_merge"] = informative[date_column] elif minutes < minutes_inf: # Subtract "small" timeframe so merging is not delayed by 1 small candle # Detailed explanation in https://github.com/freqtrade/freqtrade/issues/4073 if not informative.empty: - if timeframe_inf == '1M': - informative['date_merge'] = ( - (informative[date_column] + pd.offsets.MonthBegin(1)) - - pd.to_timedelta(minutes, 'm') - ) + if timeframe_inf == "1M": + informative["date_merge"] = ( + informative[date_column] + pd.offsets.MonthBegin(1) + ) - pd.to_timedelta(minutes, "m") else: - informative['date_merge'] = ( - informative[date_column] + pd.to_timedelta(minutes_inf, 'm') - - pd.to_timedelta(minutes, 'm') + informative["date_merge"] = ( + informative[date_column] + + pd.to_timedelta(minutes_inf, "m") + - pd.to_timedelta(minutes, "m") ) else: - informative['date_merge'] = informative[date_column] + informative["date_merge"] = informative[date_column] else: - raise ValueError("Tried to merge a faster timeframe to a slower timeframe." - "This would create new rows, and can throw off your regular indicators.") + raise ValueError( + "Tried to merge a faster timeframe to a slower timeframe." + "This would create new rows, and can throw off your regular indicators." + ) # Rename columns to be unique - date_merge = 'date_merge' + date_merge = "date_merge" if suffix and append_timeframe: raise ValueError("You can not specify `append_timeframe` as True and a `suffix`.") elif append_timeframe: - date_merge = f'date_merge_{timeframe_inf}' + date_merge = f"date_merge_{timeframe_inf}" informative.columns = [f"{col}_{timeframe_inf}" for col in informative.columns] elif suffix: - date_merge = f'date_merge_{suffix}' + date_merge = f"date_merge_{suffix}" informative.columns = [f"{col}_{suffix}" for col in informative.columns] # Combine the 2 dataframes @@ -79,21 +86,25 @@ def merge_informative_pair(dataframe: pd.DataFrame, informative: pd.DataFrame, if ffill: # https://pandas.pydata.org/docs/user_guide/merging.html#timeseries-friendly-merging # merge_ordered - ffill method is 2.5x faster than separate ffill() - dataframe = pd.merge_ordered(dataframe, informative, fill_method="ffill", left_on='date', - right_on=date_merge, how='left') + dataframe = pd.merge_ordered( + dataframe, + informative, + fill_method="ffill", + left_on="date", + right_on=date_merge, + how="left", + ) else: - dataframe = pd.merge(dataframe, informative, left_on='date', - right_on=date_merge, how='left') + dataframe = pd.merge( + dataframe, informative, left_on="date", right_on=date_merge, how="left" + ) dataframe = dataframe.drop(date_merge, axis=1) return dataframe def stoploss_from_open( - open_relative_stop: float, - current_profit: float, - is_short: bool = False, - leverage: float = 1.0 + open_relative_stop: float, current_profit: float, is_short: bool = False, leverage: float = 1.0 ) -> float: """ Given the current profit, and a desired stop loss value relative to the trade entry price, @@ -129,8 +140,9 @@ def stoploss_from_open( return max(stoploss * leverage, 0.0) -def stoploss_from_absolute(stop_rate: float, current_rate: float, is_short: bool = False, - leverage: float = 1.0) -> float: +def stoploss_from_absolute( + stop_rate: float, current_rate: float, is_short: bool = False, leverage: float = 1.0 +) -> float: """ Given current price and desired stop price, return a stop loss value that is relative to current price. diff --git a/freqtrade/strategy/strategy_wrapper.py b/freqtrade/strategy/strategy_wrapper.py index 8cb0bde15..eaeb6aa7e 100644 --- a/freqtrade/strategy/strategy_wrapper.py +++ b/freqtrade/strategy/strategy_wrapper.py @@ -9,7 +9,7 @@ from freqtrade.exceptions import StrategyError logger = logging.getLogger(__name__) -F = TypeVar('F', bound=Callable[..., Any]) +F = TypeVar("F", bound=Callable[..., Any]) def strategy_safe_wrapper(f: F, message: str = "", default_retval=None, supress_error=False) -> F: @@ -18,27 +18,21 @@ def strategy_safe_wrapper(f: F, message: str = "", default_retval=None, supress_ Caches all exceptions and returns either the default_retval (if it's not None) or raises a StrategyError exception, which then needs to be handled by the calling method. """ + @wraps(f) def wrapper(*args, **kwargs): try: - if 'trade' in kwargs: + if "trade" in kwargs: # Protect accidental modifications from within the strategy - kwargs['trade'] = deepcopy(kwargs['trade']) + kwargs["trade"] = deepcopy(kwargs["trade"]) return f(*args, **kwargs) except ValueError as error: - logger.warning( - f"{message}" - f"Strategy caused the following exception: {error}" - f"{f}" - ) + logger.warning(f"{message}" f"Strategy caused the following exception: {error}" f"{f}") if default_retval is None and not supress_error: raise StrategyError(str(error)) from error return default_retval except Exception as error: - logger.exception( - f"{message}" - f"Unexpected error {error} calling {f}" - ) + logger.exception(f"{message}" f"Unexpected error {error} calling {f}") if default_retval is None and not supress_error: raise StrategyError(str(error)) from error return default_retval diff --git a/freqtrade/strategy/strategyupdater.py b/freqtrade/strategy/strategyupdater.py index 2669dcc4a..05494537d 100644 --- a/freqtrade/strategy/strategyupdater.py +++ b/freqtrade/strategy/strategyupdater.py @@ -8,41 +8,39 @@ from freqtrade.constants import Config class StrategyUpdater: name_mapping = { - 'ticker_interval': 'timeframe', - 'buy': 'enter_long', - 'sell': 'exit_long', - 'buy_tag': 'enter_tag', - 'sell_reason': 'exit_reason', - - 'sell_signal': 'exit_signal', - 'custom_sell': 'custom_exit', - 'force_sell': 'force_exit', - 'emergency_sell': 'emergency_exit', - + "ticker_interval": "timeframe", + "buy": "enter_long", + "sell": "exit_long", + "buy_tag": "enter_tag", + "sell_reason": "exit_reason", + "sell_signal": "exit_signal", + "custom_sell": "custom_exit", + "force_sell": "force_exit", + "emergency_sell": "emergency_exit", # Strategy/config settings: - 'use_sell_signal': 'use_exit_signal', - 'sell_profit_only': 'exit_profit_only', - 'sell_profit_offset': 'exit_profit_offset', - 'ignore_roi_if_buy_signal': 'ignore_roi_if_entry_signal', - 'forcebuy_enable': 'force_entry_enable', + "use_sell_signal": "use_exit_signal", + "sell_profit_only": "exit_profit_only", + "sell_profit_offset": "exit_profit_offset", + "ignore_roi_if_buy_signal": "ignore_roi_if_entry_signal", + "forcebuy_enable": "force_entry_enable", } function_mapping = { - 'populate_buy_trend': 'populate_entry_trend', - 'populate_sell_trend': 'populate_exit_trend', - 'custom_sell': 'custom_exit', - 'check_buy_timeout': 'check_entry_timeout', - 'check_sell_timeout': 'check_exit_timeout', + "populate_buy_trend": "populate_entry_trend", + "populate_sell_trend": "populate_exit_trend", + "custom_sell": "custom_exit", + "check_buy_timeout": "check_entry_timeout", + "check_sell_timeout": "check_exit_timeout", # '': '', } # order_time_in_force, order_types, unfilledtimeout otif_ot_unfilledtimeout = { - 'buy': 'entry', - 'sell': 'exit', + "buy": "entry", + "sell": "exit", } # create a dictionary that maps the old column names to the new ones - rename_dict = {'buy': 'enter_long', 'sell': 'exit_long', 'buy_tag': 'enter_tag'} + rename_dict = {"buy": "enter_long", "sell": "exit_long", "buy_tag": "enter_tag"} def start(self, config: Config, strategy_obj: dict) -> None: """ @@ -51,12 +49,12 @@ class StrategyUpdater: :return: None """ - source_file = strategy_obj['location'] - strategies_backup_folder = Path.joinpath(config['user_data_dir'], "strategies_orig_updater") - target_file = Path.joinpath(strategies_backup_folder, strategy_obj['location_rel']) + source_file = strategy_obj["location"] + strategies_backup_folder = Path.joinpath(config["user_data_dir"], "strategies_orig_updater") + target_file = Path.joinpath(strategies_backup_folder, strategy_obj["location_rel"]) # read the file - with Path(source_file).open('r') as f: + with Path(source_file).open("r") as f: old_code = f.read() if not strategies_backup_folder.is_dir(): Path(strategies_backup_folder).mkdir(parents=True, exist_ok=True) @@ -70,7 +68,7 @@ class StrategyUpdater: # update the code new_code = self.update_code(old_code) # write the modified code to the destination folder - with Path(source_file).open('w') as f: + with Path(source_file).open("w") as f: f.write(new_code) # define the function to update the code @@ -106,7 +104,6 @@ class StrategyUpdater: # Here we go through each respective node, slice, elt, key ... to replace outdated entries. class NameUpdater(ast_comments.NodeTransformer): def generic_visit(self, node): - # space is not yet transferred from buy/sell to entry/exit and thereby has to be skipped. if isinstance(node, ast_comments.keyword): if node.arg == "space": @@ -180,37 +177,38 @@ class NameUpdater(ast_comments.NodeTransformer): def visit_Attribute(self, node): if ( - isinstance(node.value, ast_comments.Name) - and node.value.id == 'trade' - and node.attr == 'nr_of_successful_buys' + isinstance(node.value, ast_comments.Name) + and node.value.id == "trade" + and node.attr == "nr_of_successful_buys" ): - node.attr = 'nr_of_successful_entries' + node.attr = "nr_of_successful_entries" return node def visit_ClassDef(self, node): # check if the class is derived from IStrategy - if any(isinstance(base, ast_comments.Name) and - base.id == 'IStrategy' for base in node.bases): + if any( + isinstance(base, ast_comments.Name) and base.id == "IStrategy" for base in node.bases + ): # check if the INTERFACE_VERSION variable exists has_interface_version = any( - isinstance(child, ast_comments.Assign) and - isinstance(child.targets[0], ast_comments.Name) and - child.targets[0].id == 'INTERFACE_VERSION' + isinstance(child, ast_comments.Assign) + and isinstance(child.targets[0], ast_comments.Name) + and child.targets[0].id == "INTERFACE_VERSION" for child in node.body ) # if the INTERFACE_VERSION variable does not exist, add it as the first child if not has_interface_version: - node.body.insert(0, ast_comments.parse('INTERFACE_VERSION = 3').body[0]) + node.body.insert(0, ast_comments.parse("INTERFACE_VERSION = 3").body[0]) # otherwise, update its value to 3 else: for child in node.body: if ( - isinstance(child, ast_comments.Assign) - and isinstance(child.targets[0], ast_comments.Name) - and child.targets[0].id == 'INTERFACE_VERSION' + isinstance(child, ast_comments.Assign) + and isinstance(child.targets[0], ast_comments.Name) + and child.targets[0].id == "INTERFACE_VERSION" ): - child.value = ast_comments.parse('3').body[0].value + child.value = ast_comments.parse("3").body[0].value self.generic_visit(node) return node