diff --git a/freqtrade/optimize/analysis/lookahead.py b/freqtrade/optimize/analysis/lookahead.py index a8eb0258e..3daad027b 100755 --- a/freqtrade/optimize/analysis/lookahead.py +++ b/freqtrade/optimize/analysis/lookahead.py @@ -3,7 +3,7 @@ import shutil from copy import deepcopy from datetime import datetime, timedelta from pathlib import Path -from typing import Any, Dict, List +from typing import Any from pandas import DataFrame @@ -25,16 +25,16 @@ class Analysis: self.total_signals = 0 self.false_entry_signals = 0 self.false_exit_signals = 0 - self.false_indicators: List[str] = [] + self.false_indicators: list[str] = [] self.has_bias = False class LookaheadAnalysis(BaseAnalysis): - def __init__(self, config: Dict[str, Any], strategy_obj: Dict): + def __init__(self, config: dict[str, Any], strategy_obj: dict): super().__init__(config, strategy_obj) - self.entry_varHolders: List[VarHolder] = [] - self.exit_varHolders: List[VarHolder] = [] + self.entry_varHolders: list[VarHolder] = [] + self.exit_varHolders: list[VarHolder] = [] self.current_analysis = Analysis() self.minimum_trade_amount = config["minimum_trade_amount"] @@ -99,7 +99,7 @@ class LookaheadAnalysis(BaseAnalysis): f"{str(self_value)} != {str(other_value)}" ) - def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]): + def prepare_data(self, varholder: VarHolder, pairs_to_load: list[DataFrame]): if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]: # purge previous data if the freqai model is defined # (to be sure nothing is carried over from older backtests) diff --git a/freqtrade/optimize/analysis/lookahead_helpers.py b/freqtrade/optimize/analysis/lookahead_helpers.py index 730f9fd72..dccf2cb73 100644 --- a/freqtrade/optimize/analysis/lookahead_helpers.py +++ b/freqtrade/optimize/analysis/lookahead_helpers.py @@ -1,7 +1,7 @@ import logging import time from pathlib import Path -from typing import Any, Dict, List, Union +from typing import Any, Union import pandas as pd from rich.text import Text @@ -19,8 +19,8 @@ logger = logging.getLogger(__name__) class LookaheadAnalysisSubFunctions: @staticmethod def text_table_lookahead_analysis_instances( - config: Dict[str, Any], - lookahead_instances: List[LookaheadAnalysis], + config: dict[str, Any], + lookahead_instances: list[LookaheadAnalysis], caption: Union[str, None] = None, ): headers = [ @@ -73,7 +73,7 @@ class LookaheadAnalysisSubFunctions: return data @staticmethod - def export_to_csv(config: Dict[str, Any], lookahead_analysis: List[LookaheadAnalysis]): + def export_to_csv(config: dict[str, Any], lookahead_analysis: list[LookaheadAnalysis]): def add_or_update_row(df, row_data): if ( (df["filename"] == row_data["filename"]) & (df["strategy"] == row_data["strategy"]) @@ -198,7 +198,7 @@ class LookaheadAnalysisSubFunctions: return config @staticmethod - def initialize_single_lookahead_analysis(config: Config, strategy_obj: Dict[str, Any]): + def initialize_single_lookahead_analysis(config: Config, strategy_obj: dict[str, Any]): logger.info(f"Bias test of {Path(strategy_obj['location']).name} started.") start = time.perf_counter() current_instance = LookaheadAnalysis(config, strategy_obj) diff --git a/freqtrade/optimize/analysis/recursive.py b/freqtrade/optimize/analysis/recursive.py index e6f7e4152..a57c4c630 100644 --- a/freqtrade/optimize/analysis/recursive.py +++ b/freqtrade/optimize/analysis/recursive.py @@ -3,7 +3,7 @@ import shutil from copy import deepcopy from datetime import timedelta from pathlib import Path -from typing import Any, Dict, List +from typing import Any from pandas import DataFrame @@ -21,7 +21,7 @@ logger = logging.getLogger(__name__) class RecursiveAnalysis(BaseAnalysis): - def __init__(self, config: Dict[str, Any], strategy_obj: Dict): + def __init__(self, config: dict[str, Any], strategy_obj: dict): self._startup_candle = list( map(int, config.get("startup_candle", [199, 399, 499, 999, 1999])) ) @@ -35,10 +35,10 @@ class RecursiveAnalysis(BaseAnalysis): self._startup_candle.append(self._strat_scc) self._startup_candle.sort() - self.partial_varHolder_array: List[VarHolder] = [] - self.partial_varHolder_lookahead_array: List[VarHolder] = [] + self.partial_varHolder_array: list[VarHolder] = [] + self.partial_varHolder_lookahead_array: list[VarHolder] = [] - self.dict_recursive: Dict[str, Any] = dict() + self.dict_recursive: dict[str, Any] = dict() # For recursive bias check # analyzes two data frames with processed indicators and shows differences between them. @@ -114,7 +114,7 @@ class RecursiveAnalysis(BaseAnalysis): else: logger.info("No lookahead bias on indicators found.") - def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]): + def prepare_data(self, varholder: VarHolder, pairs_to_load: list[DataFrame]): if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]: # purge previous data if the freqai model is defined # (to be sure nothing is carried over from older backtests) diff --git a/freqtrade/optimize/analysis/recursive_helpers.py b/freqtrade/optimize/analysis/recursive_helpers.py index 474604923..5877a5864 100644 --- a/freqtrade/optimize/analysis/recursive_helpers.py +++ b/freqtrade/optimize/analysis/recursive_helpers.py @@ -1,7 +1,7 @@ import logging import time from pathlib import Path -from typing import Any, Dict, List +from typing import Any from freqtrade.constants import Config from freqtrade.exceptions import OperationalException @@ -15,7 +15,7 @@ logger = logging.getLogger(__name__) class RecursiveAnalysisSubFunctions: @staticmethod - def text_table_recursive_analysis_instances(recursive_instances: List[RecursiveAnalysis]): + def text_table_recursive_analysis_instances(recursive_instances: list[RecursiveAnalysis]): startups = recursive_instances[0]._startup_candle strat_scc = recursive_instances[0]._strat_scc headers = ["Indicators"] @@ -63,7 +63,7 @@ class RecursiveAnalysisSubFunctions: return config @staticmethod - def initialize_single_recursive_analysis(config: Config, strategy_obj: Dict[str, Any]): + def initialize_single_recursive_analysis(config: Config, strategy_obj: dict[str, Any]): logger.info(f"Recursive test of {Path(strategy_obj['location']).name} started.") start = time.perf_counter() current_instance = RecursiveAnalysis(config, strategy_obj) diff --git a/freqtrade/optimize/backtesting.py b/freqtrade/optimize/backtesting.py index 7d0428939..2ab873bca 100644 --- a/freqtrade/optimize/backtesting.py +++ b/freqtrade/optimize/backtesting.py @@ -8,7 +8,7 @@ import logging from collections import defaultdict from copy import deepcopy from datetime import datetime, timedelta, timezone -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Optional from numpy import nan from pandas import DataFrame @@ -118,13 +118,13 @@ class Backtesting: self.order_id_counter: int = 0 config["dry_run"] = True - self.run_ids: Dict[str, str] = {} - self.strategylist: List[IStrategy] = [] - self.all_results: Dict[str, Dict] = {} - self.processed_dfs: Dict[str, Dict] = {} - self.rejected_dict: Dict[str, List] = {} - self.rejected_df: Dict[str, Dict] = {} - self.exited_dfs: Dict[str, Dict] = {} + self.run_ids: dict[str, str] = {} + self.strategylist: list[IStrategy] = [] + self.all_results: dict[str, dict] = {} + self.processed_dfs: dict[str, dict] = {} + self.rejected_dict: dict[str, list] = {} + self.rejected_df: dict[str, dict] = {} + self.exited_dfs: dict[str, dict] = {} self._exchange_name = self.config["exchange"]["name"] if not exchange: @@ -246,8 +246,8 @@ class Backtesting: else: self.timeframe_detail_td = timedelta(seconds=0) - self.detail_data: Dict[str, DataFrame] = {} - self.futures_data: Dict[str, DataFrame] = {} + self.detail_data: dict[str, DataFrame] = {} + self.futures_data: dict[str, DataFrame] = {} def init_backtest(self): self.prepare_backtest(False) @@ -278,7 +278,7 @@ class Backtesting: if self.config.get("enable_protections", False): self.protections = ProtectionManager(self.config, strategy.protections) - def load_bt_data(self) -> Tuple[Dict[str, DataFrame], TimeRange]: + def load_bt_data(self) -> tuple[dict[str, DataFrame], TimeRange]: """ Loads backtest data and returns the data combined with the timerange as tuple. @@ -408,7 +408,7 @@ class Backtesting: self.abort = False raise DependencyException("Stop requested") - def _get_ohlcv_as_lists(self, processed: Dict[str, DataFrame]) -> Dict[str, Tuple]: + def _get_ohlcv_as_lists(self, processed: dict[str, DataFrame]) -> dict[str, tuple]: """ Helper function to convert a processed dataframes into lists for performance reasons. @@ -418,7 +418,7 @@ class Backtesting: optimize memory usage! """ - data: Dict = {} + data: dict = {} self.progress.init_step(BacktestState.CONVERT, len(processed)) # Create dict with data @@ -466,7 +466,7 @@ class Backtesting: return data def _get_close_rate( - self, row: Tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int + self, row: tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int ) -> float: """ Get close rate for backtesting result @@ -484,7 +484,7 @@ class Backtesting: return row[OPEN_IDX] def _get_close_rate_for_stoploss( - self, row: Tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int + self, row: tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int ) -> float: # our stoploss was already lower than candle high, # possibly due to a cancelled trade exit. @@ -538,7 +538,7 @@ class Backtesting: return stoploss_value def _get_close_rate_for_roi( - self, row: Tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int + self, row: tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int ) -> float: is_short = trade.is_short or False leverage = trade.leverage or 1.0 @@ -601,7 +601,7 @@ class Backtesting: return row[OPEN_IDX] def _get_adjust_trade_entry_for_candle( - self, trade: LocalTrade, row: Tuple, current_time: datetime + self, trade: LocalTrade, row: tuple, current_time: datetime ) -> LocalTrade: current_rate: float = row[OPEN_IDX] current_profit = trade.calc_profit_ratio(current_rate) @@ -669,7 +669,7 @@ class Backtesting: return trade - def _get_order_filled(self, rate: float, row: Tuple) -> bool: + def _get_order_filled(self, rate: float, row: tuple) -> bool: """Rate is within candle, therefore filled""" return row[LOW_IDX] <= rate <= row[HIGH_IDX] @@ -685,7 +685,7 @@ class Backtesting: ) def _try_close_open_order( - self, order: Optional[Order], trade: LocalTrade, current_date: datetime, row: Tuple + self, order: Optional[Order], trade: LocalTrade, current_date: datetime, row: tuple ) -> bool: """ Check if an order is open and if it should've filled. @@ -719,7 +719,7 @@ class Backtesting: return False def _process_exit_order( - self, order: Order, trade: LocalTrade, current_time: datetime, row: Tuple, pair: str + self, order: Order, trade: LocalTrade, current_time: datetime, row: tuple, pair: str ): """ Takes an exit order and processes it, potentially closing the trade. @@ -740,7 +740,7 @@ class Backtesting: def _get_exit_for_signal( self, trade: LocalTrade, - row: Tuple, + row: tuple, exit_: ExitCheckTuple, current_time: datetime, amount: Optional[float] = None, @@ -820,7 +820,7 @@ class Backtesting: def _exit_trade( self, trade: LocalTrade, - sell_row: Tuple, + sell_row: tuple, close_rate: float, amount: float, exit_reason: Optional[str], @@ -859,7 +859,7 @@ class Backtesting: return trade def _check_trade_exit( - self, trade: LocalTrade, row: Tuple, current_time: datetime + self, trade: LocalTrade, row: tuple, current_time: datetime ) -> Optional[LocalTrade]: self._run_funding_fees(trade, current_time) @@ -905,7 +905,7 @@ class Backtesting: def get_valid_price_and_stake( self, pair: str, - row: Tuple, + row: tuple, propose_rate: float, stake_amount: float, direction: LongShort, @@ -914,7 +914,7 @@ class Backtesting: trade: Optional[LocalTrade], order_type: str, price_precision: Optional[float], - ) -> Tuple[float, float, float, float]: + ) -> tuple[float, float, float, float]: if order_type == "limit": new_rate = strategy_safe_wrapper( self.strategy.custom_entry_price, default_retval=propose_rate @@ -1003,7 +1003,7 @@ class Backtesting: def _enter_trade( self, pair: str, - row: Tuple, + row: tuple, direction: LongShort, stake_amount: Optional[float] = None, trade: Optional[LocalTrade] = None, @@ -1150,7 +1150,7 @@ class Backtesting: return trade def handle_left_open( - self, open_trades: Dict[str, List[LocalTrade]], data: Dict[str, List[Tuple]] + self, open_trades: dict[str, list[LocalTrade]], data: dict[str, list[tuple]] ) -> None: """ Handling of left open trades at the end of backtesting @@ -1197,7 +1197,7 @@ class Backtesting: self.protections.stop_per_pair(pair, current_time, side) self.protections.global_stop(current_time, side) - def manage_open_orders(self, trade: LocalTrade, current_time: datetime, row: Tuple) -> bool: + def manage_open_orders(self, trade: LocalTrade, current_time: datetime, row: tuple) -> bool: """ Check if any open order needs to be cancelled or replaced. Returns True if the trade should be deleted. @@ -1246,7 +1246,7 @@ class Backtesting: return None def check_order_replace( - self, trade: LocalTrade, order: Order, current_time, row: Tuple + self, trade: LocalTrade, order: Order, current_time, row: tuple ) -> bool: """ Check if current analyzed entry order has to be replaced and do so. @@ -1297,8 +1297,8 @@ class Backtesting: return False def validate_row( - self, data: Dict, pair: str, row_index: int, current_time: datetime - ) -> Optional[Tuple]: + self, data: dict, pair: str, row_index: int, current_time: datetime + ) -> Optional[tuple]: try: # Row is treated as "current incomplete candle". # entry / exit signals are shifted by 1 to compensate for this. @@ -1329,7 +1329,7 @@ class Backtesting: def backtest_loop( self, - row: Tuple, + row: tuple, pair: str, current_time: datetime, end_date: datetime, @@ -1383,7 +1383,7 @@ class Backtesting: self._process_exit_order(order, trade, current_time, row, pair) def time_pair_generator( - self, start_date: datetime, end_date: datetime, increment: timedelta, pairs: List[str] + self, start_date: datetime, end_date: datetime, increment: timedelta, pairs: list[str] ): """ Backtest time and pair generator @@ -1404,7 +1404,7 @@ class Backtesting: self.progress.increment() current_time += increment - def backtest(self, processed: Dict, start_date: datetime, end_date: datetime) -> Dict[str, Any]: + def backtest(self, processed: dict, start_date: datetime, end_date: datetime) -> dict[str, Any]: """ Implement backtesting functionality @@ -1423,10 +1423,10 @@ class Backtesting: self.wallets.update() # Use dict of lists with data for performance # (looping lists is a lot faster than pandas DataFrames) - data: Dict = self._get_ohlcv_as_lists(processed) + data: dict = self._get_ohlcv_as_lists(processed) # Indexes per pair, so some pairs are allowed to have a missing start. - indexes: Dict = defaultdict(int) + indexes: dict = defaultdict(int) # Loop timerange and get candle for each pair at that point in time for current_time, pair, is_first in self.time_pair_generator( @@ -1510,7 +1510,7 @@ class Backtesting: } def backtest_one_strategy( - self, strat: IStrategy, data: Dict[str, DataFrame], timerange: TimeRange + self, strat: IStrategy, data: dict[str, DataFrame], timerange: TimeRange ): self.progress.init_step(BacktestState.ANALYZE, 0) strategy_name = strat.get_strategy_name() @@ -1605,7 +1605,7 @@ class Backtesting: """ Run backtesting end-to-end """ - data: Dict[str, DataFrame] = {} + data: dict[str, DataFrame] = {} data, timerange = self.load_bt_data() self.load_bt_data_detail() diff --git a/freqtrade/optimize/base_analysis.py b/freqtrade/optimize/base_analysis.py index 2503ede72..e3e475742 100644 --- a/freqtrade/optimize/base_analysis.py +++ b/freqtrade/optimize/base_analysis.py @@ -1,7 +1,7 @@ import logging from copy import deepcopy from datetime import datetime, timezone -from typing import Any, Dict, Optional +from typing import Any, Optional from pandas import DataFrame @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) class VarHolder: timerange: TimeRange data: DataFrame - indicators: Dict[str, DataFrame] + indicators: dict[str, DataFrame] result: DataFrame compared: DataFrame from_dt: datetime @@ -25,7 +25,7 @@ class VarHolder: class BaseAnalysis: - def __init__(self, config: Dict[str, Any], strategy_obj: Dict): + def __init__(self, config: dict[str, Any], strategy_obj: dict): self.failed_bias_check = True self.full_varHolder = VarHolder() self.exchange: Optional[Any] = None diff --git a/freqtrade/optimize/hyperopt.py b/freqtrade/optimize/hyperopt.py index 428a5cddd..daa4661ad 100644 --- a/freqtrade/optimize/hyperopt.py +++ b/freqtrade/optimize/hyperopt.py @@ -11,7 +11,7 @@ import warnings from datetime import datetime, timezone from math import ceil from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Optional import rapidjson from joblib import Parallel, cpu_count, delayed, dump, load, wrap_non_picklable_objects @@ -70,14 +70,14 @@ class Hyperopt: """ def __init__(self, config: Config) -> None: - self.buy_space: List[Dimension] = [] - self.sell_space: List[Dimension] = [] - self.protection_space: List[Dimension] = [] - self.roi_space: List[Dimension] = [] - self.stoploss_space: List[Dimension] = [] - self.trailing_space: List[Dimension] = [] - self.max_open_trades_space: List[Dimension] = [] - self.dimensions: List[Dimension] = [] + self.buy_space: list[Dimension] = [] + self.sell_space: list[Dimension] = [] + self.protection_space: list[Dimension] = [] + self.roi_space: list[Dimension] = [] + self.stoploss_space: list[Dimension] = [] + self.trailing_space: list[Dimension] = [] + self.max_open_trades_space: list[Dimension] = [] + self.dimensions: list[Dimension] = [] self._hyper_out: HyperoptOutput = HyperoptOutput(streaming=True) @@ -125,7 +125,7 @@ class Hyperopt: self.market_change = 0.0 self.num_epochs_saved = 0 - self.current_best_epoch: Optional[Dict[str, Any]] = None + self.current_best_epoch: Optional[dict[str, Any]] = None # Use max_open_trades for hyperopt as well, except --disable-max-market-positions is set if not self.config.get("use_max_market_positions", True): @@ -168,8 +168,8 @@ class Hyperopt: self.hyperopt_pickle_magic(modules.__bases__) def _get_params_dict( - self, dimensions: List[Dimension], raw_params: List[Any] - ) -> Dict[str, Any]: + self, dimensions: list[Dimension], raw_params: list[Any] + ) -> dict[str, Any]: # Ensure the number of dimensions match # the number of parameters in the list. if len(raw_params) != len(dimensions): @@ -179,7 +179,7 @@ class Hyperopt: # and the values are taken from the list of parameters. return {d.name: v for d, v in zip(dimensions, raw_params)} - def _save_result(self, epoch: Dict) -> None: + def _save_result(self, epoch: dict) -> None: """ Save hyperopt results to file Store one line per epoch. @@ -205,11 +205,11 @@ class Hyperopt: latest_filename = Path.joinpath(self.results_file.parent, LAST_BT_RESULT_FN) file_dump_json(latest_filename, {"latest_hyperopt": str(self.results_file.name)}, log=False) - def _get_params_details(self, params: Dict) -> Dict: + def _get_params_details(self, params: dict) -> dict: """ Return the params for each space """ - result: Dict = {} + result: dict = {} if HyperoptTools.has_space(self.config, "buy"): result["buy"] = {p.name: params.get(p.name) for p in self.buy_space} @@ -236,11 +236,11 @@ class Hyperopt: return result - def _get_no_optimize_details(self) -> Dict[str, Any]: + def _get_no_optimize_details(self) -> dict[str, Any]: """ Get non-optimized parameters """ - result: Dict[str, Any] = {} + result: dict[str, Any] = {} strategy = self.backtesting.strategy if not HyperoptTools.has_space(self.config, "roi"): result["roi"] = {str(k): v for k, v in strategy.minimal_roi.items()} @@ -257,7 +257,7 @@ class Hyperopt: result["max_open_trades"] = {"max_open_trades": strategy.max_open_trades} return result - def print_results(self, results: Dict[str, Any]) -> None: + def print_results(self, results: dict[str, Any]) -> None: """ Log results if it is better than any previous evaluation TODO: this should be moved to HyperoptTools too @@ -318,7 +318,7 @@ class Hyperopt: + self.max_open_trades_space ) - def assign_params(self, params_dict: Dict[str, Any], category: str) -> None: + def assign_params(self, params_dict: dict[str, Any], category: str) -> None: """ Assign hyperoptable parameters """ @@ -327,7 +327,7 @@ class Hyperopt: # noinspection PyProtectedMember attr.value = params_dict[attr_name] - def generate_optimizer(self, raw_params: List[Any]) -> Dict[str, Any]: + def generate_optimizer(self, raw_params: list[Any]) -> dict[str, Any]: """ Used Optimize function. Called once per epoch to optimize whatever is configured. @@ -406,12 +406,12 @@ class Hyperopt: def _get_results_dict( self, - backtesting_results: Dict[str, Any], + backtesting_results: dict[str, Any], min_date: datetime, max_date: datetime, - params_dict: Dict[str, Any], - processed: Dict[str, DataFrame], - ) -> Dict[str, Any]: + params_dict: dict[str, Any], + processed: dict[str, DataFrame], + ) -> dict[str, Any]: params_details = self._get_params_details(params_dict) strat_stats = generate_strategy_stats( @@ -458,7 +458,7 @@ class Hyperopt: "total_profit": total_profit, } - def get_optimizer(self, dimensions: List[Dimension], cpu_count) -> Optimizer: + def get_optimizer(self, dimensions: list[Dimension], cpu_count) -> Optimizer: estimator = self.custom_hyperopt.generate_estimator(dimensions=dimensions) acq_optimizer = "sampling" @@ -479,7 +479,7 @@ class Hyperopt: model_queue_size=SKOPT_MODEL_QUEUE_SIZE, ) - def run_optimizer_parallel(self, parallel: Parallel, asked: List[List]) -> List[Dict[str, Any]]: + def run_optimizer_parallel(self, parallel: Parallel, asked: list[list]) -> list[dict[str, Any]]: """Start optimizer in a parallel way""" return parallel( delayed(wrap_non_picklable_objects(self.generate_optimizer))(v) for v in asked @@ -488,7 +488,7 @@ class Hyperopt: def _set_random_state(self, random_state: Optional[int]) -> int: return random_state or random.randint(1, 2**16 - 1) # noqa: S311 - def advise_and_trim(self, data: Dict[str, DataFrame]) -> Dict[str, DataFrame]: + def advise_and_trim(self, data: dict[str, DataFrame]) -> dict[str, DataFrame]: preprocessed = self.backtesting.strategy.advise_all_indicators(data) # Trim startup period from analyzed dataframe to get correct dates for output. @@ -524,7 +524,7 @@ class Hyperopt: else: dump(data, self.data_pickle_file) - def get_asked_points(self, n_points: int) -> Tuple[List[List[Any]], List[bool]]: + def get_asked_points(self, n_points: int) -> tuple[list[list[Any]], list[bool]]: """ Enforce points returned from `self.opt.ask` have not been already evaluated @@ -545,8 +545,8 @@ class Hyperopt: return new_list i = 0 - asked_non_tried: List[List[Any]] = [] - is_random_non_tried: List[bool] = [] + asked_non_tried: list[list[Any]] = [] + is_random_non_tried: list[bool] = [] while i < 5 and len(asked_non_tried) < n_points: if i < 3: self.opt.cache_ = {} @@ -573,7 +573,7 @@ class Hyperopt: else: return self.opt.ask(n_points=n_points), [False for _ in range(n_points)] - def evaluate_result(self, val: Dict[str, Any], current: int, is_random: bool): + def evaluate_result(self, val: dict[str, Any], current: int, is_random: bool): """ Evaluate results returned from generate_optimizer """ diff --git a/freqtrade/optimize/hyperopt_auto.py b/freqtrade/optimize/hyperopt_auto.py index cf0103162..9d2f89dc7 100644 --- a/freqtrade/optimize/hyperopt_auto.py +++ b/freqtrade/optimize/hyperopt_auto.py @@ -6,7 +6,7 @@ This module implements a convenience auto-hyperopt class, which can be used toge import logging from contextlib import suppress -from typing import Callable, Dict, List +from typing import Callable from freqtrade.exceptions import OperationalException @@ -59,7 +59,7 @@ class HyperOptAuto(IHyperOpt): if attr.optimize: yield attr.get_space(attr_name) - def _get_indicator_space(self, category) -> List: + def _get_indicator_space(self, category) -> list: # TODO: is this necessary, or can we call "generate_space" directly? indicator_space = list(self._generate_indicator_space(category)) if len(indicator_space) > 0: @@ -70,32 +70,32 @@ class HyperOptAuto(IHyperOpt): ) return [] - def buy_indicator_space(self) -> List["Dimension"]: + def buy_indicator_space(self) -> list["Dimension"]: return self._get_indicator_space("buy") - def sell_indicator_space(self) -> List["Dimension"]: + def sell_indicator_space(self) -> list["Dimension"]: return self._get_indicator_space("sell") - def protection_space(self) -> List["Dimension"]: + def protection_space(self) -> list["Dimension"]: return self._get_indicator_space("protection") - def generate_roi_table(self, params: Dict) -> Dict[int, float]: + def generate_roi_table(self, params: dict) -> dict[int, float]: return self._get_func("generate_roi_table")(params) - def roi_space(self) -> List["Dimension"]: + def roi_space(self) -> list["Dimension"]: return self._get_func("roi_space")() - def stoploss_space(self) -> List["Dimension"]: + def stoploss_space(self) -> list["Dimension"]: return self._get_func("stoploss_space")() - def generate_trailing_params(self, params: Dict) -> Dict: + def generate_trailing_params(self, params: dict) -> dict: return self._get_func("generate_trailing_params")(params) - def trailing_space(self) -> List["Dimension"]: + def trailing_space(self) -> list["Dimension"]: return self._get_func("trailing_space")() - def max_open_trades_space(self) -> List["Dimension"]: + def max_open_trades_space(self) -> list["Dimension"]: return self._get_func("max_open_trades_space")() - def generate_estimator(self, dimensions: List["Dimension"], **kwargs) -> EstimatorType: + def generate_estimator(self, dimensions: list["Dimension"], **kwargs) -> EstimatorType: return self._get_func("generate_estimator")(dimensions=dimensions, **kwargs) diff --git a/freqtrade/optimize/hyperopt_epoch_filters.py b/freqtrade/optimize/hyperopt_epoch_filters.py index 0a88b9d65..5112b062d 100644 --- a/freqtrade/optimize/hyperopt_epoch_filters.py +++ b/freqtrade/optimize/hyperopt_epoch_filters.py @@ -1,5 +1,4 @@ import logging -from typing import List from freqtrade.exceptions import OperationalException @@ -7,7 +6,7 @@ from freqtrade.exceptions import OperationalException logger = logging.getLogger(__name__) -def hyperopt_filter_epochs(epochs: List, filteroptions: dict, log: bool = True) -> List: +def hyperopt_filter_epochs(epochs: list, filteroptions: dict, log: bool = True) -> list: """ Filter our items from the list of hyperopt results """ @@ -33,14 +32,14 @@ def hyperopt_filter_epochs(epochs: List, filteroptions: dict, log: bool = True) return epochs -def _hyperopt_filter_epochs_trade(epochs: List, trade_count: int): +def _hyperopt_filter_epochs_trade(epochs: list, trade_count: int): """ Filter epochs with trade-counts > trades """ return [x for x in epochs if x["results_metrics"].get("total_trades", 0) > trade_count] -def _hyperopt_filter_epochs_trade_count(epochs: List, filteroptions: dict) -> List: +def _hyperopt_filter_epochs_trade_count(epochs: list, filteroptions: dict) -> list: if filteroptions["filter_min_trades"] > 0: epochs = _hyperopt_filter_epochs_trade(epochs, filteroptions["filter_min_trades"]) @@ -53,7 +52,7 @@ def _hyperopt_filter_epochs_trade_count(epochs: List, filteroptions: dict) -> Li return epochs -def _hyperopt_filter_epochs_duration(epochs: List, filteroptions: dict) -> List: +def _hyperopt_filter_epochs_duration(epochs: list, filteroptions: dict) -> list: def get_duration_value(x): # Duration in minutes ... if "holding_avg_s" in x["results_metrics"]: @@ -74,7 +73,7 @@ def _hyperopt_filter_epochs_duration(epochs: List, filteroptions: dict) -> List: return epochs -def _hyperopt_filter_epochs_profit(epochs: List, filteroptions: dict) -> List: +def _hyperopt_filter_epochs_profit(epochs: list, filteroptions: dict) -> list: if filteroptions["filter_min_avg_profit"] is not None: epochs = _hyperopt_filter_epochs_trade(epochs, 0) epochs = [ @@ -110,7 +109,7 @@ def _hyperopt_filter_epochs_profit(epochs: List, filteroptions: dict) -> List: return epochs -def _hyperopt_filter_epochs_objective(epochs: List, filteroptions: dict) -> List: +def _hyperopt_filter_epochs_objective(epochs: list, filteroptions: dict) -> list: if filteroptions["filter_min_objective"] is not None: epochs = _hyperopt_filter_epochs_trade(epochs, 0) diff --git a/freqtrade/optimize/hyperopt_interface.py b/freqtrade/optimize/hyperopt_interface.py index 216e40753..2c0983b52 100644 --- a/freqtrade/optimize/hyperopt_interface.py +++ b/freqtrade/optimize/hyperopt_interface.py @@ -6,7 +6,7 @@ This module defines the interface to apply for hyperopt import logging import math from abc import ABC -from typing import Dict, List, Union +from typing import Union from sklearn.base import RegressorMixin from skopt.space import Categorical, Dimension, Integer @@ -41,7 +41,7 @@ class IHyperOpt(ABC): # Assign timeframe to be used in hyperopt IHyperOpt.timeframe = str(config["timeframe"]) - def generate_estimator(self, dimensions: List[Dimension], **kwargs) -> EstimatorType: + def generate_estimator(self, dimensions: list[Dimension], **kwargs) -> EstimatorType: """ Return base_estimator. Can be any of "GP", "RF", "ET", "GBRT" or an instance of a class @@ -49,7 +49,7 @@ class IHyperOpt(ABC): """ return "ET" - def generate_roi_table(self, params: Dict) -> Dict[int, float]: + def generate_roi_table(self, params: dict) -> dict[int, float]: """ Create a ROI table. @@ -64,7 +64,7 @@ class IHyperOpt(ABC): return roi_table - def roi_space(self) -> List[Dimension]: + def roi_space(self) -> list[Dimension]: """ Create a ROI space. @@ -146,7 +146,7 @@ class IHyperOpt(ABC): ), ] - def stoploss_space(self) -> List[Dimension]: + def stoploss_space(self) -> list[Dimension]: """ Create a stoploss space. @@ -157,7 +157,7 @@ class IHyperOpt(ABC): SKDecimal(-0.35, -0.02, decimals=3, name="stoploss"), ] - def generate_trailing_params(self, params: Dict) -> Dict: + def generate_trailing_params(self, params: dict) -> dict: """ Create dict with trailing stop parameters. """ @@ -170,7 +170,7 @@ class IHyperOpt(ABC): "trailing_only_offset_is_reached": params["trailing_only_offset_is_reached"], } - def trailing_space(self) -> List[Dimension]: + def trailing_space(self) -> list[Dimension]: """ Create a trailing stoploss space. @@ -194,7 +194,7 @@ class IHyperOpt(ABC): Categorical([True, False], name="trailing_only_offset_is_reached"), ] - def max_open_trades_space(self) -> List[Dimension]: + def max_open_trades_space(self) -> list[Dimension]: """ Create a max open trades space. diff --git a/freqtrade/optimize/hyperopt_loss_interface.py b/freqtrade/optimize/hyperopt_loss_interface.py index 39457b753..a48fee731 100644 --- a/freqtrade/optimize/hyperopt_loss_interface.py +++ b/freqtrade/optimize/hyperopt_loss_interface.py @@ -5,7 +5,7 @@ This module defines the interface for the loss-function for hyperopt from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Dict +from typing import Any from pandas import DataFrame @@ -29,8 +29,8 @@ class IHyperOptLoss(ABC): min_date: datetime, max_date: datetime, config: Config, - processed: Dict[str, DataFrame], - backtest_stats: Dict[str, Any], + processed: dict[str, DataFrame], + backtest_stats: dict[str, Any], **kwargs, ) -> float: """ diff --git a/freqtrade/optimize/hyperopt_output.py b/freqtrade/optimize/hyperopt_output.py index c83583d72..c30715046 100644 --- a/freqtrade/optimize/hyperopt_output.py +++ b/freqtrade/optimize/hyperopt_output.py @@ -1,6 +1,6 @@ import sys from os import get_terminal_size -from typing import Any, List, Optional +from typing import Any, Optional from rich.align import Align from rich.console import Console @@ -14,7 +14,7 @@ from freqtrade.util import fmt_coin class HyperoptOutput: def __init__(self, streaming=False) -> None: - self._results: List[Any] = [] + self._results: list[Any] = [] self._streaming = streaming self.__init_table() diff --git a/freqtrade/optimize/hyperopt_tools.py b/freqtrade/optimize/hyperopt_tools.py index 975338cd5..66e194510 100644 --- a/freqtrade/optimize/hyperopt_tools.py +++ b/freqtrade/optimize/hyperopt_tools.py @@ -1,8 +1,9 @@ import logging +from collections.abc import Iterator from copy import deepcopy from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, Iterator, List, Optional, Tuple +from typing import Any, Optional import numpy as np import rapidjson @@ -83,7 +84,7 @@ class HyperoptTools: ) @staticmethod - def load_params(filename: Path) -> Dict: + def load_params(filename: Path) -> dict: """ Load parameters from file """ @@ -92,7 +93,7 @@ class HyperoptTools: return params @staticmethod - def try_export_params(config: Config, strategy_name: str, params: Dict): + def try_export_params(config: Config, strategy_name: str, params: dict): if params.get(FTHYPT_FILEVERSION, 1) >= 2 and not config.get("disableparamexport", False): # Export parameters ... fn = HyperoptTools.get_strategy_filename(config, strategy_name) @@ -113,7 +114,7 @@ class HyperoptTools: return any(s in config["spaces"] for s in [space, "all", "default"]) @staticmethod - def _read_results(results_file: Path, batch_size: int = 10) -> Iterator[List[Any]]: + def _read_results(results_file: Path, batch_size: int = 10) -> Iterator[list[Any]]: """ Stream hyperopt results from file """ @@ -143,7 +144,7 @@ class HyperoptTools: return False @staticmethod - def load_filtered_results(results_file: Path, config: Config) -> Tuple[List, int]: + def load_filtered_results(results_file: Path, config: Config) -> tuple[list, int]: filteroptions = { "only_best": config.get("hyperopt_list_best", False), "only_profitable": config.get("hyperopt_list_profitable", False), @@ -204,7 +205,7 @@ class HyperoptTools: print(f"\n{header_str}:\n\n{explanation_str}\n") if print_json: - result_dict: Dict = {} + result_dict: dict = {} for s in [ "buy", "sell", @@ -256,7 +257,7 @@ class HyperoptTools: @staticmethod def _params_pretty_print( - params, space: str, header: str, non_optimized: Optional[Dict] = None + params, space: str, header: str, non_optimized: Optional[dict] = None ) -> None: if space in params or (non_optimized and space in non_optimized): space_params = HyperoptTools._space_params(params, space, 5) @@ -298,7 +299,7 @@ class HyperoptTools: print(result) @staticmethod - def _space_params(params, space: str, r: Optional[int] = None) -> Dict: + def _space_params(params, space: str, r: Optional[int] = None) -> dict: d = params.get(space) if d: # Round floats to `r` digits after the decimal point if requested @@ -328,7 +329,7 @@ class HyperoptTools: return bool(results["loss"] < current_best_loss) @staticmethod - def format_results_explanation_string(results_metrics: Dict, stake_currency: str) -> str: + def format_results_explanation_string(results_metrics: dict, stake_currency: str) -> str: """ Return the formatted results explanation in a string """ diff --git a/freqtrade/optimize/optimize_reports/bt_output.py b/freqtrade/optimize/optimize_reports/bt_output.py index d509b75fb..a1dd73358 100644 --- a/freqtrade/optimize/optimize_reports/bt_output.py +++ b/freqtrade/optimize/optimize_reports/bt_output.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Dict, List, Literal, Union +from typing import Any, Literal, Union from freqtrade.constants import UNLIMITED_STAKE_AMOUNT, Config from freqtrade.ft_types import BacktestResultType @@ -10,7 +10,7 @@ from freqtrade.util import decimals_per_coin, fmt_coin, print_rich_table logger = logging.getLogger(__name__) -def _get_line_floatfmt(stake_currency: str) -> List[str]: +def _get_line_floatfmt(stake_currency: str) -> list[str]: """ Generate floatformat (goes in line with _generate_result_line()) """ @@ -18,8 +18,8 @@ def _get_line_floatfmt(stake_currency: str) -> List[str]: def _get_line_header( - first_column: Union[str, List[str]], stake_currency: str, direction: str = "Trades" -) -> List[str]: + first_column: Union[str, list[str]], stake_currency: str, direction: str = "Trades" +) -> list[str]: """ Generate header lines (goes in line with _generate_result_line()) """ @@ -45,7 +45,7 @@ def generate_wins_draws_losses(wins, draws, losses): def text_table_bt_results( - pair_results: List[Dict[str, Any]], stake_currency: str, title: str + pair_results: list[dict[str, Any]], stake_currency: str, title: str ) -> None: """ Generates and returns a text table for the given backtest data and the results dataframe @@ -73,7 +73,7 @@ def text_table_bt_results( def text_table_tags( tag_type: Literal["enter_tag", "exit_tag", "mix_tag"], - tag_results: List[Dict[str, Any]], + tag_results: list[dict[str, Any]], stake_currency: str, ) -> None: """ @@ -123,7 +123,7 @@ def text_table_tags( def text_table_periodic_breakdown( - days_breakdown_stats: List[Dict[str, Any]], stake_currency: str, period: str + days_breakdown_stats: list[dict[str, Any]], stake_currency: str, period: str ) -> None: """ Generate small table with Backtest results by days @@ -191,7 +191,7 @@ def text_table_strategy(strategy_results, stake_currency: str, title: str): print_rich_table(output, headers, summary=title) -def text_table_add_metrics(strat_results: Dict) -> None: +def text_table_add_metrics(strat_results: dict) -> None: if len(strat_results["trades"]) > 0: best_trade = max(strat_results["trades"], key=lambda x: x["profit_ratio"]) worst_trade = min(strat_results["trades"], key=lambda x: x["profit_ratio"]) @@ -411,7 +411,7 @@ def text_table_add_metrics(strat_results: Dict) -> None: print(message) -def _show_tag_subresults(results: Dict[str, Any], stake_currency: str): +def _show_tag_subresults(results: dict[str, Any], stake_currency: str): """ Print tag subresults (enter_tag, exit_reason_summary, mix_tag_stats) """ @@ -426,7 +426,7 @@ def _show_tag_subresults(results: Dict[str, Any], stake_currency: str): def show_backtest_result( - strategy: str, results: Dict[str, Any], stake_currency: str, backtest_breakdown: List[str] + strategy: str, results: dict[str, Any], stake_currency: str, backtest_breakdown: list[str] ): """ Print results for one strategy diff --git a/freqtrade/optimize/optimize_reports/bt_storage.py b/freqtrade/optimize/optimize_reports/bt_storage.py index 9766bcd98..9633d9c18 100644 --- a/freqtrade/optimize/optimize_reports/bt_storage.py +++ b/freqtrade/optimize/optimize_reports/bt_storage.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import Dict, Optional +from typing import Optional from pandas import DataFrame @@ -70,7 +70,7 @@ def store_backtest_stats( def _store_backtest_analysis_data( - recordfilename: Path, data: Dict[str, Dict], dtappendix: str, name: str + recordfilename: Path, data: dict[str, dict], dtappendix: str, name: str ) -> Path: """ Stores backtest trade candles for analysis @@ -91,9 +91,9 @@ def _store_backtest_analysis_data( def store_backtest_analysis_results( recordfilename: Path, - candles: Dict[str, Dict], - trades: Dict[str, Dict], - exited: Dict[str, Dict], + candles: dict[str, dict], + trades: dict[str, dict], + exited: dict[str, dict], dtappendix: str, ) -> None: _store_backtest_analysis_data(recordfilename, candles, dtappendix, "signals") diff --git a/freqtrade/optimize/optimize_reports/optimize_reports.py b/freqtrade/optimize/optimize_reports/optimize_reports.py index 1168f0c2f..9650bdde6 100644 --- a/freqtrade/optimize/optimize_reports/optimize_reports.py +++ b/freqtrade/optimize/optimize_reports/optimize_reports.py @@ -1,7 +1,7 @@ import logging from copy import deepcopy from datetime import datetime, timedelta, timezone -from typing import Any, Dict, List, Literal, Tuple, Union +from typing import Any, Literal, Union import numpy as np from pandas import DataFrame, Series, concat, to_datetime @@ -25,8 +25,8 @@ logger = logging.getLogger(__name__) def generate_trade_signal_candles( - preprocessed_df: Dict[str, DataFrame], bt_results: Dict[str, Any], date_col: str -) -> Dict[str, DataFrame]: + preprocessed_df: dict[str, DataFrame], bt_results: dict[str, Any], date_col: str +) -> dict[str, DataFrame]: signal_candles_only = {} for pair in preprocessed_df.keys(): signal_candles_only_df = DataFrame() @@ -48,8 +48,8 @@ def generate_trade_signal_candles( def generate_rejected_signals( - preprocessed_df: Dict[str, DataFrame], rejected_dict: Dict[str, DataFrame] -) -> Dict[str, DataFrame]: + preprocessed_df: dict[str, DataFrame], rejected_dict: dict[str, DataFrame] +) -> dict[str, DataFrame]: rejected_candles_only = {} for pair, signals in rejected_dict.items(): rejected_signals_only_df = DataFrame() @@ -69,8 +69,8 @@ def generate_rejected_signals( def _generate_result_line( - result: DataFrame, starting_balance: int, first_column: Union[str, List[str]] -) -> Dict: + result: DataFrame, starting_balance: int, first_column: Union[str, list[str]] +) -> dict: """ Generate one result dict, with "first_column" as key. """ @@ -109,12 +109,12 @@ def _generate_result_line( def generate_pair_metrics( - pairlist: List[str], + pairlist: list[str], stake_currency: str, starting_balance: int, results: DataFrame, skip_nan: bool = False, -) -> List[Dict]: +) -> list[dict]: """ Generates and returns a list for the given backtest data and the results dataframe :param pairlist: Pairlist used @@ -143,11 +143,11 @@ def generate_pair_metrics( def generate_tag_metrics( - tag_type: Union[Literal["enter_tag", "exit_reason"], List[Literal["enter_tag", "exit_reason"]]], + tag_type: Union[Literal["enter_tag", "exit_reason"], list[Literal["enter_tag", "exit_reason"]]], starting_balance: int, results: DataFrame, skip_nan: bool = False, -) -> List[Dict]: +) -> list[dict]: """ Generates and returns a list of metrics for the given tag trades and the results dataframe :param starting_balance: Starting balance @@ -177,7 +177,7 @@ def generate_tag_metrics( return [] -def generate_strategy_comparison(bt_stats: Dict) -> List[Dict]: +def generate_strategy_comparison(bt_stats: dict) -> list[dict]: """ Generate summary per strategy :param bt_stats: Dict of containing results for all strategies @@ -208,8 +208,8 @@ def _get_resample_from_period(period: str) -> str: def generate_periodic_breakdown_stats( - trade_list: Union[List, DataFrame], period: str -) -> List[Dict[str, Any]]: + trade_list: Union[list, DataFrame], period: str +) -> list[dict[str, Any]]: results = trade_list if not isinstance(trade_list, list) else DataFrame.from_records(trade_list) if len(results) == 0: return [] @@ -237,14 +237,14 @@ def generate_periodic_breakdown_stats( return stats -def generate_all_periodic_breakdown_stats(trade_list: List) -> Dict[str, List]: +def generate_all_periodic_breakdown_stats(trade_list: list) -> dict[str, list]: result = {} for period in BACKTEST_BREAKDOWNS: result[period] = generate_periodic_breakdown_stats(trade_list, period) return result -def calc_streak(dataframe: DataFrame) -> Tuple[int, int]: +def calc_streak(dataframe: DataFrame) -> tuple[int, int]: """ Calculate consecutive win and loss streaks :param dataframe: Dataframe containing the trades dataframe, with profit_ratio column @@ -261,7 +261,7 @@ def calc_streak(dataframe: DataFrame) -> Tuple[int, int]: return cons_wins, cons_losses -def generate_trading_stats(results: DataFrame) -> Dict[str, Any]: +def generate_trading_stats(results: DataFrame) -> dict[str, Any]: """Generate overall trade statistics""" if len(results) == 0: return { @@ -313,7 +313,7 @@ def generate_trading_stats(results: DataFrame) -> Dict[str, Any]: } -def generate_daily_stats(results: DataFrame) -> Dict[str, Any]: +def generate_daily_stats(results: DataFrame) -> dict[str, Any]: """Generate daily statistics""" if len(results) == 0: return { @@ -350,14 +350,14 @@ def generate_daily_stats(results: DataFrame) -> Dict[str, Any]: def generate_strategy_stats( - pairlist: List[str], + pairlist: list[str], strategy: str, - content: Dict[str, Any], + content: dict[str, Any], min_date: datetime, max_date: datetime, market_change: float, is_hyperopt: bool = False, -) -> Dict[str, Any]: +) -> dict[str, Any]: """ :param pairlist: List of pairs to backtest :param strategy: Strategy name @@ -368,7 +368,7 @@ def generate_strategy_stats( :param market_change: float indicating the market change :return: Dictionary containing results per strategy and a strategy summary. """ - results: Dict[str, DataFrame] = content["results"] + results: dict[str, DataFrame] = content["results"] if not isinstance(results, DataFrame): return {} config = content["config"] @@ -558,8 +558,8 @@ def generate_strategy_stats( def generate_backtest_stats( - btdata: Dict[str, DataFrame], - all_results: Dict[str, Dict[str, Union[DataFrame, Dict]]], + btdata: dict[str, DataFrame], + all_results: dict[str, dict[str, Union[DataFrame, dict]]], min_date: datetime, max_date: datetime, ) -> BacktestResultType: