ruff format: freqtrade/strategies

This commit is contained in:
Matthias
2024-05-12 16:41:08 +02:00
parent 6bfe7aa72d
commit 439b8a0320
7 changed files with 721 additions and 446 deletions

View File

@@ -2,6 +2,7 @@
IHyperStrategy interface, hyperoptable Parameter class.
This module defines a base class for auto-hyperoptable strategies.
"""
import logging
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union
@@ -32,20 +33,22 @@ class HyperStrategyMixin:
self.ft_protection_params: List[BaseParameter] = []
params = self.load_params_from_file()
params = params.get('params', {})
params = params.get("params", {})
self._ft_params_from_file = params
# Init/loading of parameters is done as part of ft_bot_start().
def enumerate_parameters(
self, category: Optional[str] = None) -> Iterator[Tuple[str, BaseParameter]]:
self, category: Optional[str] = None
) -> Iterator[Tuple[str, BaseParameter]]:
"""
Find all optimizable parameters and return (name, attr) iterator.
:param category:
:return:
"""
if category not in ('buy', 'sell', 'protection', None):
if category not in ("buy", "sell", "protection", None):
raise OperationalException(
'Category must be one of: "buy", "sell", "protection", None.')
'Category must be one of: "buy", "sell", "protection", None.'
)
if category is None:
params = self.ft_buy_params + self.ft_sell_params + self.ft_protection_params
@@ -57,15 +60,13 @@ class HyperStrategyMixin:
@classmethod
def detect_all_parameters(cls) -> Dict:
""" Detect all parameters and return them as a list"""
"""Detect all parameters and return them as a list"""
params: Dict[str, Any] = {
'buy': list(detect_parameters(cls, 'buy')),
'sell': list(detect_parameters(cls, 'sell')),
'protection': list(detect_parameters(cls, 'protection')),
"buy": list(detect_parameters(cls, "buy")),
"sell": list(detect_parameters(cls, "sell")),
"protection": list(detect_parameters(cls, "protection")),
}
params.update({
'count': len(params['buy'] + params['sell'] + params['protection'])
})
params.update({"count": len(params["buy"] + params["sell"] + params["protection"])})
return params
@@ -77,23 +78,28 @@ class HyperStrategyMixin:
if self._ft_params_from_file:
# Set parameters from Hyperopt results file
params = self._ft_params_from_file
self.minimal_roi = params.get('roi', getattr(self, 'minimal_roi', {}))
self.minimal_roi = params.get("roi", getattr(self, "minimal_roi", {}))
self.stoploss = params.get('stoploss', {}).get(
'stoploss', getattr(self, 'stoploss', -0.1))
self.max_open_trades = params.get('max_open_trades', {}).get(
'max_open_trades', getattr(self, 'max_open_trades', -1))
trailing = params.get('trailing', {})
self.stoploss = params.get("stoploss", {}).get(
"stoploss", getattr(self, "stoploss", -0.1)
)
self.max_open_trades = params.get("max_open_trades", {}).get(
"max_open_trades", getattr(self, "max_open_trades", -1)
)
trailing = params.get("trailing", {})
self.trailing_stop = trailing.get(
'trailing_stop', getattr(self, 'trailing_stop', False))
"trailing_stop", getattr(self, "trailing_stop", False)
)
self.trailing_stop_positive = trailing.get(
'trailing_stop_positive', getattr(self, 'trailing_stop_positive', None))
"trailing_stop_positive", getattr(self, "trailing_stop_positive", None)
)
self.trailing_stop_positive_offset = trailing.get(
'trailing_stop_positive_offset',
getattr(self, 'trailing_stop_positive_offset', 0))
"trailing_stop_positive_offset", getattr(self, "trailing_stop_positive_offset", 0)
)
self.trailing_only_offset_is_reached = trailing.get(
'trailing_only_offset_is_reached',
getattr(self, 'trailing_only_offset_is_reached', 0.0))
"trailing_only_offset_is_reached",
getattr(self, "trailing_only_offset_is_reached", 0.0),
)
def ft_load_hyper_params(self, hyperopt: bool = False) -> None:
"""
@@ -104,29 +110,32 @@ class HyperStrategyMixin:
* Parameter defaults
"""
buy_params = deep_merge_dicts(self._ft_params_from_file.get('buy', {}),
getattr(self, 'buy_params', {}))
sell_params = deep_merge_dicts(self._ft_params_from_file.get('sell', {}),
getattr(self, 'sell_params', {}))
protection_params = deep_merge_dicts(self._ft_params_from_file.get('protection', {}),
getattr(self, 'protection_params', {}))
buy_params = deep_merge_dicts(
self._ft_params_from_file.get("buy", {}), getattr(self, "buy_params", {})
)
sell_params = deep_merge_dicts(
self._ft_params_from_file.get("sell", {}), getattr(self, "sell_params", {})
)
protection_params = deep_merge_dicts(
self._ft_params_from_file.get("protection", {}), getattr(self, "protection_params", {})
)
self._ft_load_params(buy_params, 'buy', hyperopt)
self._ft_load_params(sell_params, 'sell', hyperopt)
self._ft_load_params(protection_params, 'protection', hyperopt)
self._ft_load_params(buy_params, "buy", hyperopt)
self._ft_load_params(sell_params, "sell", hyperopt)
self._ft_load_params(protection_params, "protection", hyperopt)
def load_params_from_file(self) -> Dict:
filename_str = getattr(self, '__file__', '')
filename_str = getattr(self, "__file__", "")
if not filename_str:
return {}
filename = Path(filename_str).with_suffix('.json')
filename = Path(filename_str).with_suffix(".json")
if filename.is_file():
logger.info(f"Loading parameters from file {filename}")
try:
params = HyperoptTools.load_params(filename)
if params.get('strategy_name') != self.__class__.__name__:
raise OperationalException('Invalid parameter file provided.')
if params.get("strategy_name") != self.__class__.__name__:
raise OperationalException("Invalid parameter file provided.")
return params
except ValueError:
logger.warning("Invalid parameter file format.")
@@ -155,21 +164,23 @@ class HyperStrategyMixin:
if params and attr_name in params:
if attr.load:
attr.value = params[attr_name]
logger.info(f'Strategy Parameter: {attr_name} = {attr.value}')
logger.info(f"Strategy Parameter: {attr_name} = {attr.value}")
else:
logger.warning(f'Parameter "{attr_name}" exists, but is disabled. '
f'Default value "{attr.value}" used.')
logger.warning(
f'Parameter "{attr_name}" exists, but is disabled. '
f'Default value "{attr.value}" used.'
)
else:
logger.info(f'Strategy Parameter(default): {attr_name} = {attr.value}')
logger.info(f"Strategy Parameter(default): {attr_name} = {attr.value}")
def get_no_optimize_params(self) -> Dict[str, Dict]:
"""
Returns list of Parameters that are not part of the current optimize job
"""
params: Dict[str, Dict] = {
'buy': {},
'sell': {},
'protection': {},
"buy": {},
"sell": {},
"protection": {},
}
for name, p in self.enumerate_parameters():
if p.category and (not p.optimize or not p.in_space):
@@ -178,23 +189,27 @@ class HyperStrategyMixin:
def detect_parameters(
obj: Union[HyperStrategyMixin, Type[HyperStrategyMixin]],
category: str
) -> Iterator[Tuple[str, BaseParameter]]:
obj: Union[HyperStrategyMixin, Type[HyperStrategyMixin]], category: str
) -> Iterator[Tuple[str, BaseParameter]]:
"""
Detect all parameters for 'category' for "obj"
:param obj: Strategy object or class
:param category: category - usually `'buy', 'sell', 'protection',...
"""
for attr_name in dir(obj):
if not attr_name.startswith('__'): # Ignore internals, not strictly necessary.
if not attr_name.startswith("__"): # Ignore internals, not strictly necessary.
attr = getattr(obj, attr_name)
if issubclass(attr.__class__, BaseParameter):
if (attr_name.startswith(category + '_')
and attr.category is not None and attr.category != category):
if (
attr_name.startswith(category + "_")
and attr.category is not None
and attr.category != category
):
raise OperationalException(
f'Inconclusive parameter name {attr_name}, category: {attr.category}.')
f"Inconclusive parameter name {attr_name}, category: {attr.category}."
)
if (category == attr.category or
(attr_name.startswith(category + '_') and attr.category is None)):
if category == attr.category or (
attr_name.startswith(category + "_") and attr.category is None
):
yield attr_name, attr

View File

@@ -20,11 +20,14 @@ class InformativeData:
candle_type: Optional[CandleType]
def informative(timeframe: str, asset: str = '',
fmt: Optional[Union[str, Callable[[Any], str]]] = None,
*,
candle_type: Optional[Union[CandleType, str]] = None,
ffill: bool = True) -> Callable[[PopulateIndicators], PopulateIndicators]:
def informative(
timeframe: str,
asset: str = "",
fmt: Optional[Union[str, Callable[[Any], str]]] = None,
*,
candle_type: Optional[Union[CandleType, str]] = None,
ffill: bool = True,
) -> Callable[[PopulateIndicators], PopulateIndicators]:
"""
A decorator for populate_indicators_Nn(self, dataframe, metadata), allowing these functions to
define informative indicators.
@@ -62,38 +65,43 @@ def informative(timeframe: str, asset: str = '',
_candle_type = CandleType.from_string(candle_type) if candle_type else None
def decorator(fn: PopulateIndicators):
informative_pairs = getattr(fn, '_ft_informative', [])
informative_pairs = getattr(fn, "_ft_informative", [])
informative_pairs.append(InformativeData(_asset, _timeframe, _fmt, _ffill, _candle_type))
setattr(fn, '_ft_informative', informative_pairs) # noqa: B010
setattr(fn, "_ft_informative", informative_pairs) # noqa: B010
return fn
return decorator
def __get_pair_formats(market: Optional[Dict[str, Any]]) -> Dict[str, str]:
if not market:
return {}
base = market['base']
quote = market['quote']
base = market["base"]
quote = market["quote"]
return {
'base': base.lower(),
'BASE': base.upper(),
'quote': quote.lower(),
'QUOTE': quote.upper(),
"base": base.lower(),
"BASE": base.upper(),
"quote": quote.lower(),
"QUOTE": quote.upper(),
}
def _format_pair_name(config, pair: str, market: Optional[Dict[str, Any]] = None) -> str:
return pair.format(
stake_currency=config['stake_currency'],
stake=config['stake_currency'],
stake_currency=config["stake_currency"],
stake=config["stake_currency"],
**__get_pair_formats(market),
).upper()
def _create_and_merge_informative_pair(strategy, dataframe: DataFrame, metadata: dict,
inf_data: InformativeData,
populate_indicators: PopulateIndicators):
asset = inf_data.asset or ''
def _create_and_merge_informative_pair(
strategy,
dataframe: DataFrame,
metadata: dict,
inf_data: InformativeData,
populate_indicators: PopulateIndicators,
):
asset = inf_data.asset or ""
timeframe = inf_data.timeframe
fmt = inf_data.fmt
candle_type = inf_data.candle_type
@@ -102,15 +110,15 @@ def _create_and_merge_informative_pair(strategy, dataframe: DataFrame, metadata:
if asset:
# Insert stake currency if needed.
market1 = strategy.dp.market(metadata['pair'])
market1 = strategy.dp.market(metadata["pair"])
asset = _format_pair_name(config, asset, market1)
else:
# Not specifying an asset will define informative dataframe for current pair.
asset = metadata['pair']
asset = metadata["pair"]
market = strategy.dp.market(asset)
if market is None:
raise OperationalException(f'Market {asset} is not available.')
raise OperationalException(f"Market {asset} is not available.")
# Default format. This optimizes for the common case: informative pairs using same stake
# currency. When quote currency matches stake currency, column name will omit base currency.
@@ -118,33 +126,40 @@ def _create_and_merge_informative_pair(strategy, dataframe: DataFrame, metadata:
# where it is desired to keep quote currency in column name at all times user should specify
# fmt='{base}_{quote}_{column}_{timeframe}' format or similar.
if not fmt:
fmt = '{column}_{timeframe}' # Informatives of current pair
fmt = "{column}_{timeframe}" # Informatives of current pair
if inf_data.asset:
fmt = '{base}_{quote}_' + fmt # Informatives of other pairs
fmt = "{base}_{quote}_" + fmt # Informatives of other pairs
inf_metadata = {'pair': asset, 'timeframe': timeframe}
inf_metadata = {"pair": asset, "timeframe": timeframe}
inf_dataframe = strategy.dp.get_pair_dataframe(asset, timeframe, candle_type)
inf_dataframe = populate_indicators(strategy, inf_dataframe, inf_metadata)
formatter: Any = None
if callable(fmt):
formatter = fmt # A custom user-specified formatter function.
formatter = fmt # A custom user-specified formatter function.
else:
formatter = fmt.format # A default string formatter.
formatter = fmt.format # A default string formatter.
fmt_args = {
**__get_pair_formats(market),
'asset': asset,
'timeframe': timeframe,
"asset": asset,
"timeframe": timeframe,
}
inf_dataframe.rename(columns=lambda column: formatter(column=column, **fmt_args),
inplace=True)
inf_dataframe.rename(columns=lambda column: formatter(column=column, **fmt_args), inplace=True)
date_column = formatter(column='date', **fmt_args)
date_column = formatter(column="date", **fmt_args)
if date_column in dataframe.columns:
raise OperationalException(f'Duplicate column name {date_column} exists in '
f'dataframe! Ensure column names are unique!')
dataframe = merge_informative_pair(dataframe, inf_dataframe, strategy.timeframe, timeframe,
ffill=inf_data.ffill, append_timeframe=False,
date_column=date_column)
raise OperationalException(
f"Duplicate column name {date_column} exists in "
f"dataframe! Ensure column names are unique!"
)
dataframe = merge_informative_pair(
dataframe,
inf_dataframe,
strategy.timeframe,
timeframe,
ffill=inf_data.ffill,
append_timeframe=False,
date_column=date_column,
)
return dataframe

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@
IHyperStrategy interface, hyperoptable Parameter class.
This module defines a base class for auto-hyperoptable strategies.
"""
import logging
from abc import ABC, abstractmethod
from contextlib import suppress
@@ -26,14 +27,22 @@ class BaseParameter(ABC):
"""
Defines a parameter that can be optimized by hyperopt.
"""
category: Optional[str]
default: Any
value: Any
in_space: bool = False
name: str
def __init__(self, *, default: Any, space: Optional[str] = None,
optimize: bool = True, load: bool = True, **kwargs):
def __init__(
self,
*,
default: Any,
space: Optional[str] = None,
optimize: bool = True,
load: bool = True,
**kwargs,
):
"""
Initialize hyperopt-optimizable parameter.
:param space: A parameter category. Can be 'buy' or 'sell'. This parameter is optional if
@@ -43,9 +52,10 @@ class BaseParameter(ABC):
:param load: Load parameter value from {space}_params.
:param kwargs: Extra parameters to skopt.space.(Integer|Real|Categorical).
"""
if 'name' in kwargs:
if "name" in kwargs:
raise OperationalException(
'Name is determined by parameter field name and can not be specified manually.')
"Name is determined by parameter field name and can not be specified manually."
)
self.category = space
self._space_params = kwargs
self.value = default
@@ -53,10 +63,10 @@ class BaseParameter(ABC):
self.load = load
def __repr__(self):
return f'{self.__class__.__name__}({self.value})'
return f"{self.__class__.__name__}({self.value})"
@abstractmethod
def get_space(self, name: str) -> Union['Integer', 'Real', 'SKDecimal', 'Categorical']:
def get_space(self, name: str) -> Union["Integer", "Real", "SKDecimal", "Categorical"]:
"""
Get-space - will be used by Hyperopt to get the hyperopt Space
"""
@@ -70,14 +80,23 @@ class BaseParameter(ABC):
class NumericParameter(BaseParameter):
""" Internal parameter used for Numeric purposes """
"""Internal parameter used for Numeric purposes"""
float_or_int = Union[int, float]
default: float_or_int
value: float_or_int
def __init__(self, low: Union[float_or_int, Sequence[float_or_int]],
high: Optional[float_or_int] = None, *, default: float_or_int,
space: Optional[str] = None, optimize: bool = True, load: bool = True, **kwargs):
def __init__(
self,
low: Union[float_or_int, Sequence[float_or_int]],
high: Optional[float_or_int] = None,
*,
default: float_or_int,
space: Optional[str] = None,
optimize: bool = True,
load: bool = True,
**kwargs,
):
"""
Initialize hyperopt-optimizable numeric parameter.
Cannot be instantiated, but provides the validation for other numeric parameters
@@ -92,17 +111,16 @@ class NumericParameter(BaseParameter):
:param kwargs: Extra parameters to skopt.space.*.
"""
if high is not None and isinstance(low, Sequence):
raise OperationalException(f'{self.__class__.__name__} space invalid.')
raise OperationalException(f"{self.__class__.__name__} space invalid.")
if high is None or isinstance(low, Sequence):
if not isinstance(low, Sequence) or len(low) != 2:
raise OperationalException(f'{self.__class__.__name__} space must be [low, high]')
raise OperationalException(f"{self.__class__.__name__} space must be [low, high]")
self.low, self.high = low
else:
self.low = low
self.high = high
super().__init__(default=default, space=space, optimize=optimize,
load=load, **kwargs)
super().__init__(default=default, space=space, optimize=optimize, load=load, **kwargs)
class IntParameter(NumericParameter):
@@ -111,8 +129,17 @@ class IntParameter(NumericParameter):
low: int
high: int
def __init__(self, low: Union[int, Sequence[int]], high: Optional[int] = None, *, default: int,
space: Optional[str] = None, optimize: bool = True, load: bool = True, **kwargs):
def __init__(
self,
low: Union[int, Sequence[int]],
high: Optional[int] = None,
*,
default: int,
space: Optional[str] = None,
optimize: bool = True,
load: bool = True,
**kwargs,
):
"""
Initialize hyperopt-optimizable integer parameter.
:param low: Lower end (inclusive) of optimization space or [low, high].
@@ -126,10 +153,11 @@ class IntParameter(NumericParameter):
:param kwargs: Extra parameters to skopt.space.Integer.
"""
super().__init__(low=low, high=high, default=default, space=space, optimize=optimize,
load=load, **kwargs)
super().__init__(
low=low, high=high, default=default, space=space, optimize=optimize, load=load, **kwargs
)
def get_space(self, name: str) -> 'Integer':
def get_space(self, name: str) -> "Integer":
"""
Create skopt optimization space.
:param name: A name of parameter field.
@@ -155,9 +183,17 @@ class RealParameter(NumericParameter):
default: float
value: float
def __init__(self, low: Union[float, Sequence[float]], high: Optional[float] = None, *,
default: float, space: Optional[str] = None, optimize: bool = True,
load: bool = True, **kwargs):
def __init__(
self,
low: Union[float, Sequence[float]],
high: Optional[float] = None,
*,
default: float,
space: Optional[str] = None,
optimize: bool = True,
load: bool = True,
**kwargs,
):
"""
Initialize hyperopt-optimizable floating point parameter with unlimited precision.
:param low: Lower end (inclusive) of optimization space or [low, high].
@@ -170,10 +206,11 @@ class RealParameter(NumericParameter):
:param load: Load parameter value from {space}_params.
:param kwargs: Extra parameters to skopt.space.Real.
"""
super().__init__(low=low, high=high, default=default, space=space, optimize=optimize,
load=load, **kwargs)
super().__init__(
low=low, high=high, default=default, space=space, optimize=optimize, load=load, **kwargs
)
def get_space(self, name: str) -> 'Real':
def get_space(self, name: str) -> "Real":
"""
Create skopt optimization space.
:param name: A name of parameter field.
@@ -185,9 +222,18 @@ class DecimalParameter(NumericParameter):
default: float
value: float
def __init__(self, low: Union[float, Sequence[float]], high: Optional[float] = None, *,
default: float, decimals: int = 3, space: Optional[str] = None,
optimize: bool = True, load: bool = True, **kwargs):
def __init__(
self,
low: Union[float, Sequence[float]],
high: Optional[float] = None,
*,
default: float,
decimals: int = 3,
space: Optional[str] = None,
optimize: bool = True,
load: bool = True,
**kwargs,
):
"""
Initialize hyperopt-optimizable decimal parameter with a limited precision.
:param low: Lower end (inclusive) of optimization space or [low, high].
@@ -204,16 +250,18 @@ class DecimalParameter(NumericParameter):
self._decimals = decimals
default = round(default, self._decimals)
super().__init__(low=low, high=high, default=default, space=space, optimize=optimize,
load=load, **kwargs)
super().__init__(
low=low, high=high, default=default, space=space, optimize=optimize, load=load, **kwargs
)
def get_space(self, name: str) -> 'SKDecimal':
def get_space(self, name: str) -> "SKDecimal":
"""
Create skopt optimization space.
:param name: A name of parameter field.
"""
return SKDecimal(low=self.low, high=self.high, decimals=self._decimals, name=name,
**self._space_params)
return SKDecimal(
low=self.low, high=self.high, decimals=self._decimals, name=name, **self._space_params
)
@property
def range(self):
@@ -236,8 +284,16 @@ class CategoricalParameter(BaseParameter):
value: Any
opt_range: Sequence[Any]
def __init__(self, categories: Sequence[Any], *, default: Optional[Any] = None,
space: Optional[str] = None, optimize: bool = True, load: bool = True, **kwargs):
def __init__(
self,
categories: Sequence[Any],
*,
default: Optional[Any] = None,
space: Optional[str] = None,
optimize: bool = True,
load: bool = True,
**kwargs,
):
"""
Initialize hyperopt-optimizable parameter.
:param categories: Optimization space, [a, b, ...].
@@ -252,12 +308,12 @@ class CategoricalParameter(BaseParameter):
"""
if len(categories) < 2:
raise OperationalException(
'CategoricalParameter space must be [a, b, ...] (at least two parameters)')
"CategoricalParameter space must be [a, b, ...] (at least two parameters)"
)
self.opt_range = categories
super().__init__(default=default, space=space, optimize=optimize,
load=load, **kwargs)
super().__init__(default=default, space=space, optimize=optimize, load=load, **kwargs)
def get_space(self, name: str) -> 'Categorical':
def get_space(self, name: str) -> "Categorical":
"""
Create skopt optimization space.
:param name: A name of parameter field.
@@ -279,9 +335,15 @@ class CategoricalParameter(BaseParameter):
class BooleanParameter(CategoricalParameter):
def __init__(self, *, default: Optional[Any] = None,
space: Optional[str] = None, optimize: bool = True, load: bool = True, **kwargs):
def __init__(
self,
*,
default: Optional[Any] = None,
space: Optional[str] = None,
optimize: bool = True,
load: bool = True,
**kwargs,
):
"""
Initialize hyperopt-optimizable Boolean Parameter.
It's a shortcut to `CategoricalParameter([True, False])`.
@@ -296,5 +358,11 @@ class BooleanParameter(CategoricalParameter):
"""
categories = [True, False]
super().__init__(categories=categories, default=default, space=space, optimize=optimize,
load=load, **kwargs)
super().__init__(
categories=categories,
default=default,
space=space,
optimize=optimize,
load=load,
**kwargs,
)

View File

@@ -5,11 +5,16 @@ import pandas as pd
from freqtrade.exchange import timeframe_to_minutes
def merge_informative_pair(dataframe: pd.DataFrame, informative: pd.DataFrame,
timeframe: str, timeframe_inf: str, ffill: bool = True,
append_timeframe: bool = True,
date_column: str = 'date',
suffix: Optional[str] = None) -> pd.DataFrame:
def merge_informative_pair(
dataframe: pd.DataFrame,
informative: pd.DataFrame,
timeframe: str,
timeframe_inf: str,
ffill: bool = True,
append_timeframe: bool = True,
date_column: str = "date",
suffix: Optional[str] = None,
) -> pd.DataFrame:
"""
Correctly merge informative samples to the original dataframe, avoiding lookahead bias.
@@ -41,37 +46,39 @@ def merge_informative_pair(dataframe: pd.DataFrame, informative: pd.DataFrame,
minutes = timeframe_to_minutes(timeframe)
if minutes == minutes_inf:
# No need to forwardshift if the timeframes are identical
informative['date_merge'] = informative[date_column]
informative["date_merge"] = informative[date_column]
elif minutes < minutes_inf:
# Subtract "small" timeframe so merging is not delayed by 1 small candle
# Detailed explanation in https://github.com/freqtrade/freqtrade/issues/4073
if not informative.empty:
if timeframe_inf == '1M':
informative['date_merge'] = (
(informative[date_column] + pd.offsets.MonthBegin(1))
- pd.to_timedelta(minutes, 'm')
)
if timeframe_inf == "1M":
informative["date_merge"] = (
informative[date_column] + pd.offsets.MonthBegin(1)
) - pd.to_timedelta(minutes, "m")
else:
informative['date_merge'] = (
informative[date_column] + pd.to_timedelta(minutes_inf, 'm') -
pd.to_timedelta(minutes, 'm')
informative["date_merge"] = (
informative[date_column]
+ pd.to_timedelta(minutes_inf, "m")
- pd.to_timedelta(minutes, "m")
)
else:
informative['date_merge'] = informative[date_column]
informative["date_merge"] = informative[date_column]
else:
raise ValueError("Tried to merge a faster timeframe to a slower timeframe."
"This would create new rows, and can throw off your regular indicators.")
raise ValueError(
"Tried to merge a faster timeframe to a slower timeframe."
"This would create new rows, and can throw off your regular indicators."
)
# Rename columns to be unique
date_merge = 'date_merge'
date_merge = "date_merge"
if suffix and append_timeframe:
raise ValueError("You can not specify `append_timeframe` as True and a `suffix`.")
elif append_timeframe:
date_merge = f'date_merge_{timeframe_inf}'
date_merge = f"date_merge_{timeframe_inf}"
informative.columns = [f"{col}_{timeframe_inf}" for col in informative.columns]
elif suffix:
date_merge = f'date_merge_{suffix}'
date_merge = f"date_merge_{suffix}"
informative.columns = [f"{col}_{suffix}" for col in informative.columns]
# Combine the 2 dataframes
@@ -79,21 +86,25 @@ def merge_informative_pair(dataframe: pd.DataFrame, informative: pd.DataFrame,
if ffill:
# https://pandas.pydata.org/docs/user_guide/merging.html#timeseries-friendly-merging
# merge_ordered - ffill method is 2.5x faster than separate ffill()
dataframe = pd.merge_ordered(dataframe, informative, fill_method="ffill", left_on='date',
right_on=date_merge, how='left')
dataframe = pd.merge_ordered(
dataframe,
informative,
fill_method="ffill",
left_on="date",
right_on=date_merge,
how="left",
)
else:
dataframe = pd.merge(dataframe, informative, left_on='date',
right_on=date_merge, how='left')
dataframe = pd.merge(
dataframe, informative, left_on="date", right_on=date_merge, how="left"
)
dataframe = dataframe.drop(date_merge, axis=1)
return dataframe
def stoploss_from_open(
open_relative_stop: float,
current_profit: float,
is_short: bool = False,
leverage: float = 1.0
open_relative_stop: float, current_profit: float, is_short: bool = False, leverage: float = 1.0
) -> float:
"""
Given the current profit, and a desired stop loss value relative to the trade entry price,
@@ -129,8 +140,9 @@ def stoploss_from_open(
return max(stoploss * leverage, 0.0)
def stoploss_from_absolute(stop_rate: float, current_rate: float, is_short: bool = False,
leverage: float = 1.0) -> float:
def stoploss_from_absolute(
stop_rate: float, current_rate: float, is_short: bool = False, leverage: float = 1.0
) -> float:
"""
Given current price and desired stop price, return a stop loss value that is relative to current
price.

View File

@@ -9,7 +9,7 @@ from freqtrade.exceptions import StrategyError
logger = logging.getLogger(__name__)
F = TypeVar('F', bound=Callable[..., Any])
F = TypeVar("F", bound=Callable[..., Any])
def strategy_safe_wrapper(f: F, message: str = "", default_retval=None, supress_error=False) -> F:
@@ -18,27 +18,21 @@ def strategy_safe_wrapper(f: F, message: str = "", default_retval=None, supress_
Caches all exceptions and returns either the default_retval (if it's not None) or raises
a StrategyError exception, which then needs to be handled by the calling method.
"""
@wraps(f)
def wrapper(*args, **kwargs):
try:
if 'trade' in kwargs:
if "trade" in kwargs:
# Protect accidental modifications from within the strategy
kwargs['trade'] = deepcopy(kwargs['trade'])
kwargs["trade"] = deepcopy(kwargs["trade"])
return f(*args, **kwargs)
except ValueError as error:
logger.warning(
f"{message}"
f"Strategy caused the following exception: {error}"
f"{f}"
)
logger.warning(f"{message}" f"Strategy caused the following exception: {error}" f"{f}")
if default_retval is None and not supress_error:
raise StrategyError(str(error)) from error
return default_retval
except Exception as error:
logger.exception(
f"{message}"
f"Unexpected error {error} calling {f}"
)
logger.exception(f"{message}" f"Unexpected error {error} calling {f}")
if default_retval is None and not supress_error:
raise StrategyError(str(error)) from error
return default_retval

View File

@@ -8,41 +8,39 @@ from freqtrade.constants import Config
class StrategyUpdater:
name_mapping = {
'ticker_interval': 'timeframe',
'buy': 'enter_long',
'sell': 'exit_long',
'buy_tag': 'enter_tag',
'sell_reason': 'exit_reason',
'sell_signal': 'exit_signal',
'custom_sell': 'custom_exit',
'force_sell': 'force_exit',
'emergency_sell': 'emergency_exit',
"ticker_interval": "timeframe",
"buy": "enter_long",
"sell": "exit_long",
"buy_tag": "enter_tag",
"sell_reason": "exit_reason",
"sell_signal": "exit_signal",
"custom_sell": "custom_exit",
"force_sell": "force_exit",
"emergency_sell": "emergency_exit",
# Strategy/config settings:
'use_sell_signal': 'use_exit_signal',
'sell_profit_only': 'exit_profit_only',
'sell_profit_offset': 'exit_profit_offset',
'ignore_roi_if_buy_signal': 'ignore_roi_if_entry_signal',
'forcebuy_enable': 'force_entry_enable',
"use_sell_signal": "use_exit_signal",
"sell_profit_only": "exit_profit_only",
"sell_profit_offset": "exit_profit_offset",
"ignore_roi_if_buy_signal": "ignore_roi_if_entry_signal",
"forcebuy_enable": "force_entry_enable",
}
function_mapping = {
'populate_buy_trend': 'populate_entry_trend',
'populate_sell_trend': 'populate_exit_trend',
'custom_sell': 'custom_exit',
'check_buy_timeout': 'check_entry_timeout',
'check_sell_timeout': 'check_exit_timeout',
"populate_buy_trend": "populate_entry_trend",
"populate_sell_trend": "populate_exit_trend",
"custom_sell": "custom_exit",
"check_buy_timeout": "check_entry_timeout",
"check_sell_timeout": "check_exit_timeout",
# '': '',
}
# order_time_in_force, order_types, unfilledtimeout
otif_ot_unfilledtimeout = {
'buy': 'entry',
'sell': 'exit',
"buy": "entry",
"sell": "exit",
}
# create a dictionary that maps the old column names to the new ones
rename_dict = {'buy': 'enter_long', 'sell': 'exit_long', 'buy_tag': 'enter_tag'}
rename_dict = {"buy": "enter_long", "sell": "exit_long", "buy_tag": "enter_tag"}
def start(self, config: Config, strategy_obj: dict) -> None:
"""
@@ -51,12 +49,12 @@ class StrategyUpdater:
:return: None
"""
source_file = strategy_obj['location']
strategies_backup_folder = Path.joinpath(config['user_data_dir'], "strategies_orig_updater")
target_file = Path.joinpath(strategies_backup_folder, strategy_obj['location_rel'])
source_file = strategy_obj["location"]
strategies_backup_folder = Path.joinpath(config["user_data_dir"], "strategies_orig_updater")
target_file = Path.joinpath(strategies_backup_folder, strategy_obj["location_rel"])
# read the file
with Path(source_file).open('r') as f:
with Path(source_file).open("r") as f:
old_code = f.read()
if not strategies_backup_folder.is_dir():
Path(strategies_backup_folder).mkdir(parents=True, exist_ok=True)
@@ -70,7 +68,7 @@ class StrategyUpdater:
# update the code
new_code = self.update_code(old_code)
# write the modified code to the destination folder
with Path(source_file).open('w') as f:
with Path(source_file).open("w") as f:
f.write(new_code)
# define the function to update the code
@@ -106,7 +104,6 @@ class StrategyUpdater:
# Here we go through each respective node, slice, elt, key ... to replace outdated entries.
class NameUpdater(ast_comments.NodeTransformer):
def generic_visit(self, node):
# space is not yet transferred from buy/sell to entry/exit and thereby has to be skipped.
if isinstance(node, ast_comments.keyword):
if node.arg == "space":
@@ -180,37 +177,38 @@ class NameUpdater(ast_comments.NodeTransformer):
def visit_Attribute(self, node):
if (
isinstance(node.value, ast_comments.Name)
and node.value.id == 'trade'
and node.attr == 'nr_of_successful_buys'
isinstance(node.value, ast_comments.Name)
and node.value.id == "trade"
and node.attr == "nr_of_successful_buys"
):
node.attr = 'nr_of_successful_entries'
node.attr = "nr_of_successful_entries"
return node
def visit_ClassDef(self, node):
# check if the class is derived from IStrategy
if any(isinstance(base, ast_comments.Name) and
base.id == 'IStrategy' for base in node.bases):
if any(
isinstance(base, ast_comments.Name) and base.id == "IStrategy" for base in node.bases
):
# check if the INTERFACE_VERSION variable exists
has_interface_version = any(
isinstance(child, ast_comments.Assign) and
isinstance(child.targets[0], ast_comments.Name) and
child.targets[0].id == 'INTERFACE_VERSION'
isinstance(child, ast_comments.Assign)
and isinstance(child.targets[0], ast_comments.Name)
and child.targets[0].id == "INTERFACE_VERSION"
for child in node.body
)
# if the INTERFACE_VERSION variable does not exist, add it as the first child
if not has_interface_version:
node.body.insert(0, ast_comments.parse('INTERFACE_VERSION = 3').body[0])
node.body.insert(0, ast_comments.parse("INTERFACE_VERSION = 3").body[0])
# otherwise, update its value to 3
else:
for child in node.body:
if (
isinstance(child, ast_comments.Assign)
and isinstance(child.targets[0], ast_comments.Name)
and child.targets[0].id == 'INTERFACE_VERSION'
isinstance(child, ast_comments.Assign)
and isinstance(child.targets[0], ast_comments.Name)
and child.targets[0].id == "INTERFACE_VERSION"
):
child.value = ast_comments.parse('3').body[0].value
child.value = ast_comments.parse("3").body[0].value
self.generic_visit(node)
return node