fix formatting

This commit is contained in:
viotemp1
2025-03-25 15:07:09 +02:00
parent 62f05964b4
commit c5088e6b66
4 changed files with 48 additions and 43 deletions

View File

@@ -171,7 +171,9 @@ class Hyperopt:
asked.append(self.opt.ask(dimensions))
return asked
def get_asked_points(self, n_points: int, dimensions: dict) -> tuple[list[list[Any]], list[bool]]:
def get_asked_points(
self, n_points: int, dimensions: dict
) -> tuple[list[list[Any]], list[bool]]:
"""
Enforce points returned from `self.opt.ask` have not been already evaluated
@@ -197,20 +199,19 @@ class Hyperopt:
while i < 5 and len(asked_non_tried) < n_points:
if i < 3:
self.opt.cache_ = {}
asked = unique_list(self.get_optuna_asked_points(n_points=n_points * 5 if i > 0 else n_points,
dimensions=dimensions))
asked = unique_list(
self.get_optuna_asked_points(
n_points=n_points * 5 if i > 0 else n_points, dimensions=dimensions
)
)
is_random = [False for _ in range(len(asked))]
else:
asked = unique_list(self.opt.space.rvs(n_samples=n_points * 5))
is_random = [True for _ in range(len(asked))]
is_random_non_tried += [
rand
for x, rand in zip(asked, is_random, strict=False)
if x not in asked_non_tried
]
asked_non_tried += [
x for x in asked if x not in asked_non_tried
rand for x, rand in zip(asked, is_random, strict=False) if x not in asked_non_tried
]
asked_non_tried += [x for x in asked if x not in asked_non_tried]
i += 1
if asked_non_tried:
@@ -219,7 +220,9 @@ class Hyperopt:
is_random_non_tried[: min(len(asked_non_tried), n_points)],
)
else:
return self.get_optuna_asked_points(n_points=n_points, dimensions=dimensions), [False for _ in range(n_points)]
return self.get_optuna_asked_points(n_points=n_points, dimensions=dimensions), [
False for _ in range(n_points)
]
def evaluate_result(self, val: dict[str, Any], current: int, is_random: bool):
"""
@@ -281,7 +284,9 @@ class Hyperopt:
if self.analyze_per_epoch:
# First analysis not in parallel mode when using --analyze-per-epoch.
# This allows dataprovider to load its informative cache.
asked, is_random = self.get_asked_points(n_points=1, dimensions=self.hyperopter.o_dimensions)
asked, is_random = self.get_asked_points(
n_points=1, dimensions=self.hyperopter.o_dimensions
)
f_val0 = self.hyperopter.generate_optimizer(asked[0].params)
self.opt.tell(asked[0], [f_val0["loss"]])
self.evaluate_result(f_val0, 1, is_random[0])
@@ -296,9 +301,12 @@ class Hyperopt:
current_jobs = jobs - n_rest if n_rest > 0 else jobs
asked, is_random = self.get_asked_points(
n_points=current_jobs, dimensions=self.hyperopter.o_dimensions)
f_val = self.run_optimizer_parallel(parallel, [asked1.params for asked1 in asked])
for o_ask, v in zip(asked, f_val):
n_points=current_jobs, dimensions=self.hyperopter.o_dimensions
)
f_val = self.run_optimizer_parallel(
parallel, [asked1.params for asked1 in asked]
)
for o_ask, v in zip(asked, f_val, strict=False):
self.opt.tell(o_ask, v["loss"])
# self.opt.tell(asked, [v["loss"] for v in f_val])

View File

@@ -49,7 +49,7 @@ class IHyperOpt(ABC):
inheriting from BaseSampler (from optuna.samplers).
"""
return "NSGAIISampler"
def generate_roi_table(self, params: dict) -> dict[int, float]:
"""
Create a ROI table.

View File

@@ -30,15 +30,15 @@ from freqtrade.optimize.optimize_reports import generate_strategy_stats
from freqtrade.resolvers.hyperopt_resolver import HyperOptLossResolver
from freqtrade.util.dry_run_wallet import get_dry_run_wallet
# Suppress scikit-learn FutureWarnings from skopt
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
# from skopt import Optimizer
from freqtrade.optimize.space.decimalspace import SKDecimal
from skopt.space import Categorical, Integer, Real
import optuna
import optunahub
from skopt.space import Dimension
from skopt.space import Categorical, Dimension, Integer, Real
from freqtrade.optimize.space.decimalspace import SKDecimal
logger = logging.getLogger(__name__)
@@ -61,7 +61,7 @@ class HyperOptimizer:
self.trailing_space: list[Dimension] = []
self.max_open_trades_space: list[Dimension] = []
self.dimensions: list[Dimension] = []
self.o_dimensions: Dict = {}
self.o_dimensions: dict = {}
self.config = config
self.min_date: datetime
@@ -131,7 +131,9 @@ class HyperOptimizer:
self.hyperopt_pickle_magic(modules.__bases__)
def _get_params_dict(
self, dimensions: list[Dimension], raw_params: dict[str, Any] # list[Any]
self,
dimensions: list[Dimension],
raw_params: dict[str, Any],
) -> dict[str, Any]:
# logger.info(f"_get_params_dict: {raw_params}")
# Ensure the number of dimensions match
@@ -253,7 +255,7 @@ class HyperOptimizer:
# noinspection PyProtectedMember
attr.value = params_dict[attr_name]
def generate_optimizer(self, raw_params: list[Any]) -> dict[str, Any]:
def generate_optimizer(self, raw_params: dict[str, Any]) -> dict[str, Any]: # list[Any]
"""
Used Optimize function.
Called once per epoch to optimize whatever is configured.
@@ -385,41 +387,32 @@ class HyperOptimizer:
"total_profit": total_profit,
}
def convert_dimensions_to_optuna_space(self, s_dimensions: list[Dimension]) -> dict:
o_dimensions = {}
for original_dim in s_dimensions:
if type(original_dim) == Integer: # isinstance(original_dim, Integer):
if isinstance(original_dim, Integer):
o_dimensions[original_dim.name] = optuna.distributions.IntDistribution(
original_dim.low, original_dim.high, log=False, step=1
)
elif (
type(original_dim) == SKDecimal
):
elif isinstance(original_dim, SKDecimal):
o_dimensions[original_dim.name] = optuna.distributions.FloatDistribution(
original_dim.low_orig,
original_dim.high_orig,
log=False,
step=1 / pow(10, original_dim.decimals)
step=1 / pow(10, original_dim.decimals),
)
elif (
type(original_dim) == Real
):
elif isinstance(original_dim, Real):
o_dimensions[original_dim.name] = optuna.distributions.FloatDistribution(
original_dim.low,
original_dim.high,
log=False,
)
elif (
type(original_dim) == Categorical
):
elif isinstance(original_dim, Categorical):
o_dimensions[original_dim.name] = optuna.distributions.CategoricalDistribution(
list(original_dim.bounds)
)
else:
raise Exception(
f"Unknown search space {original_dim} / {type(original_dim)}"
)
raise Exception(f"Unknown search space {original_dim} / {type(original_dim)}")
# logger.info(f"convert_dimensions_to_optuna_space: {s_dimensions} - {o_dimensions}")
return o_dimensions
@@ -427,7 +420,6 @@ class HyperOptimizer:
self,
random_state: int,
):
o_sampler = self.custom_hyperopt.generate_estimator(dimensions=self.dimensions)
self.o_dimensions = self.convert_dimensions_to_optuna_space(self.dimensions)
@@ -437,9 +429,14 @@ class HyperOptimizer:
# restored_sampler = pickle.load(open("sampler.pkl", "rb"))
if isinstance(o_sampler, str):
if o_sampler not in ("TPESampler", "GPSampler", "CmaEsSampler",
"NSGAIISampler", "NSGAIIISampler", "QMCSampler"
):
if o_sampler not in (
"TPESampler",
"GPSampler",
"CmaEsSampler",
"NSGAIISampler",
"NSGAIIISampler",
"QMCSampler",
):
raise OperationalException(f"Optuna Sampler {o_sampler} not supported.")
if o_sampler == "TPESampler":

View File

@@ -607,8 +607,8 @@ def test_generate_optimizer(mocker, hyperopt_conf) -> None:
hyperopt.hyperopter.max_date = dt_utc(2017, 12, 13)
hyperopt.hyperopter.init_spaces()
generate_optimizer_value = hyperopt.hyperopter.generate_optimizer(optimizer_param)
# list(optimizer_param.values())
# list(optimizer_param.values())
assert generate_optimizer_value == response_expected