chore: update optimize to modern typing syntax

This commit is contained in:
Matthias
2024-10-04 06:53:50 +02:00
parent c5ed876c09
commit 2e69e38adb
16 changed files with 172 additions and 172 deletions

View File

@@ -3,7 +3,7 @@ import shutil
from copy import deepcopy
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List
from typing import Any
from pandas import DataFrame
@@ -25,16 +25,16 @@ class Analysis:
self.total_signals = 0
self.false_entry_signals = 0
self.false_exit_signals = 0
self.false_indicators: List[str] = []
self.false_indicators: list[str] = []
self.has_bias = False
class LookaheadAnalysis(BaseAnalysis):
def __init__(self, config: Dict[str, Any], strategy_obj: Dict):
def __init__(self, config: dict[str, Any], strategy_obj: dict):
super().__init__(config, strategy_obj)
self.entry_varHolders: List[VarHolder] = []
self.exit_varHolders: List[VarHolder] = []
self.entry_varHolders: list[VarHolder] = []
self.exit_varHolders: list[VarHolder] = []
self.current_analysis = Analysis()
self.minimum_trade_amount = config["minimum_trade_amount"]
@@ -99,7 +99,7 @@ class LookaheadAnalysis(BaseAnalysis):
f"{str(self_value)} != {str(other_value)}"
)
def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]):
def prepare_data(self, varholder: VarHolder, pairs_to_load: list[DataFrame]):
if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]:
# purge previous data if the freqai model is defined
# (to be sure nothing is carried over from older backtests)

View File

@@ -1,7 +1,7 @@
import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Union
from typing import Any, Union
import pandas as pd
from rich.text import Text
@@ -19,8 +19,8 @@ logger = logging.getLogger(__name__)
class LookaheadAnalysisSubFunctions:
@staticmethod
def text_table_lookahead_analysis_instances(
config: Dict[str, Any],
lookahead_instances: List[LookaheadAnalysis],
config: dict[str, Any],
lookahead_instances: list[LookaheadAnalysis],
caption: Union[str, None] = None,
):
headers = [
@@ -73,7 +73,7 @@ class LookaheadAnalysisSubFunctions:
return data
@staticmethod
def export_to_csv(config: Dict[str, Any], lookahead_analysis: List[LookaheadAnalysis]):
def export_to_csv(config: dict[str, Any], lookahead_analysis: list[LookaheadAnalysis]):
def add_or_update_row(df, row_data):
if (
(df["filename"] == row_data["filename"]) & (df["strategy"] == row_data["strategy"])
@@ -198,7 +198,7 @@ class LookaheadAnalysisSubFunctions:
return config
@staticmethod
def initialize_single_lookahead_analysis(config: Config, strategy_obj: Dict[str, Any]):
def initialize_single_lookahead_analysis(config: Config, strategy_obj: dict[str, Any]):
logger.info(f"Bias test of {Path(strategy_obj['location']).name} started.")
start = time.perf_counter()
current_instance = LookaheadAnalysis(config, strategy_obj)

View File

@@ -3,7 +3,7 @@ import shutil
from copy import deepcopy
from datetime import timedelta
from pathlib import Path
from typing import Any, Dict, List
from typing import Any
from pandas import DataFrame
@@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
class RecursiveAnalysis(BaseAnalysis):
def __init__(self, config: Dict[str, Any], strategy_obj: Dict):
def __init__(self, config: dict[str, Any], strategy_obj: dict):
self._startup_candle = list(
map(int, config.get("startup_candle", [199, 399, 499, 999, 1999]))
)
@@ -35,10 +35,10 @@ class RecursiveAnalysis(BaseAnalysis):
self._startup_candle.append(self._strat_scc)
self._startup_candle.sort()
self.partial_varHolder_array: List[VarHolder] = []
self.partial_varHolder_lookahead_array: List[VarHolder] = []
self.partial_varHolder_array: list[VarHolder] = []
self.partial_varHolder_lookahead_array: list[VarHolder] = []
self.dict_recursive: Dict[str, Any] = dict()
self.dict_recursive: dict[str, Any] = dict()
# For recursive bias check
# analyzes two data frames with processed indicators and shows differences between them.
@@ -114,7 +114,7 @@ class RecursiveAnalysis(BaseAnalysis):
else:
logger.info("No lookahead bias on indicators found.")
def prepare_data(self, varholder: VarHolder, pairs_to_load: List[DataFrame]):
def prepare_data(self, varholder: VarHolder, pairs_to_load: list[DataFrame]):
if "freqai" in self.local_config and "identifier" in self.local_config["freqai"]:
# purge previous data if the freqai model is defined
# (to be sure nothing is carried over from older backtests)

View File

@@ -1,7 +1,7 @@
import logging
import time
from pathlib import Path
from typing import Any, Dict, List
from typing import Any
from freqtrade.constants import Config
from freqtrade.exceptions import OperationalException
@@ -15,7 +15,7 @@ logger = logging.getLogger(__name__)
class RecursiveAnalysisSubFunctions:
@staticmethod
def text_table_recursive_analysis_instances(recursive_instances: List[RecursiveAnalysis]):
def text_table_recursive_analysis_instances(recursive_instances: list[RecursiveAnalysis]):
startups = recursive_instances[0]._startup_candle
strat_scc = recursive_instances[0]._strat_scc
headers = ["Indicators"]
@@ -63,7 +63,7 @@ class RecursiveAnalysisSubFunctions:
return config
@staticmethod
def initialize_single_recursive_analysis(config: Config, strategy_obj: Dict[str, Any]):
def initialize_single_recursive_analysis(config: Config, strategy_obj: dict[str, Any]):
logger.info(f"Recursive test of {Path(strategy_obj['location']).name} started.")
start = time.perf_counter()
current_instance = RecursiveAnalysis(config, strategy_obj)

View File

@@ -8,7 +8,7 @@ import logging
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Optional
from numpy import nan
from pandas import DataFrame
@@ -118,13 +118,13 @@ class Backtesting:
self.order_id_counter: int = 0
config["dry_run"] = True
self.run_ids: Dict[str, str] = {}
self.strategylist: List[IStrategy] = []
self.all_results: Dict[str, Dict] = {}
self.processed_dfs: Dict[str, Dict] = {}
self.rejected_dict: Dict[str, List] = {}
self.rejected_df: Dict[str, Dict] = {}
self.exited_dfs: Dict[str, Dict] = {}
self.run_ids: dict[str, str] = {}
self.strategylist: list[IStrategy] = []
self.all_results: dict[str, dict] = {}
self.processed_dfs: dict[str, dict] = {}
self.rejected_dict: dict[str, list] = {}
self.rejected_df: dict[str, dict] = {}
self.exited_dfs: dict[str, dict] = {}
self._exchange_name = self.config["exchange"]["name"]
if not exchange:
@@ -246,8 +246,8 @@ class Backtesting:
else:
self.timeframe_detail_td = timedelta(seconds=0)
self.detail_data: Dict[str, DataFrame] = {}
self.futures_data: Dict[str, DataFrame] = {}
self.detail_data: dict[str, DataFrame] = {}
self.futures_data: dict[str, DataFrame] = {}
def init_backtest(self):
self.prepare_backtest(False)
@@ -278,7 +278,7 @@ class Backtesting:
if self.config.get("enable_protections", False):
self.protections = ProtectionManager(self.config, strategy.protections)
def load_bt_data(self) -> Tuple[Dict[str, DataFrame], TimeRange]:
def load_bt_data(self) -> tuple[dict[str, DataFrame], TimeRange]:
"""
Loads backtest data and returns the data combined with the timerange
as tuple.
@@ -408,7 +408,7 @@ class Backtesting:
self.abort = False
raise DependencyException("Stop requested")
def _get_ohlcv_as_lists(self, processed: Dict[str, DataFrame]) -> Dict[str, Tuple]:
def _get_ohlcv_as_lists(self, processed: dict[str, DataFrame]) -> dict[str, tuple]:
"""
Helper function to convert a processed dataframes into lists for performance reasons.
@@ -418,7 +418,7 @@ class Backtesting:
optimize memory usage!
"""
data: Dict = {}
data: dict = {}
self.progress.init_step(BacktestState.CONVERT, len(processed))
# Create dict with data
@@ -466,7 +466,7 @@ class Backtesting:
return data
def _get_close_rate(
self, row: Tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int
self, row: tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int
) -> float:
"""
Get close rate for backtesting result
@@ -484,7 +484,7 @@ class Backtesting:
return row[OPEN_IDX]
def _get_close_rate_for_stoploss(
self, row: Tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int
self, row: tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int
) -> float:
# our stoploss was already lower than candle high,
# possibly due to a cancelled trade exit.
@@ -538,7 +538,7 @@ class Backtesting:
return stoploss_value
def _get_close_rate_for_roi(
self, row: Tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int
self, row: tuple, trade: LocalTrade, exit_: ExitCheckTuple, trade_dur: int
) -> float:
is_short = trade.is_short or False
leverage = trade.leverage or 1.0
@@ -601,7 +601,7 @@ class Backtesting:
return row[OPEN_IDX]
def _get_adjust_trade_entry_for_candle(
self, trade: LocalTrade, row: Tuple, current_time: datetime
self, trade: LocalTrade, row: tuple, current_time: datetime
) -> LocalTrade:
current_rate: float = row[OPEN_IDX]
current_profit = trade.calc_profit_ratio(current_rate)
@@ -669,7 +669,7 @@ class Backtesting:
return trade
def _get_order_filled(self, rate: float, row: Tuple) -> bool:
def _get_order_filled(self, rate: float, row: tuple) -> bool:
"""Rate is within candle, therefore filled"""
return row[LOW_IDX] <= rate <= row[HIGH_IDX]
@@ -685,7 +685,7 @@ class Backtesting:
)
def _try_close_open_order(
self, order: Optional[Order], trade: LocalTrade, current_date: datetime, row: Tuple
self, order: Optional[Order], trade: LocalTrade, current_date: datetime, row: tuple
) -> bool:
"""
Check if an order is open and if it should've filled.
@@ -719,7 +719,7 @@ class Backtesting:
return False
def _process_exit_order(
self, order: Order, trade: LocalTrade, current_time: datetime, row: Tuple, pair: str
self, order: Order, trade: LocalTrade, current_time: datetime, row: tuple, pair: str
):
"""
Takes an exit order and processes it, potentially closing the trade.
@@ -740,7 +740,7 @@ class Backtesting:
def _get_exit_for_signal(
self,
trade: LocalTrade,
row: Tuple,
row: tuple,
exit_: ExitCheckTuple,
current_time: datetime,
amount: Optional[float] = None,
@@ -820,7 +820,7 @@ class Backtesting:
def _exit_trade(
self,
trade: LocalTrade,
sell_row: Tuple,
sell_row: tuple,
close_rate: float,
amount: float,
exit_reason: Optional[str],
@@ -859,7 +859,7 @@ class Backtesting:
return trade
def _check_trade_exit(
self, trade: LocalTrade, row: Tuple, current_time: datetime
self, trade: LocalTrade, row: tuple, current_time: datetime
) -> Optional[LocalTrade]:
self._run_funding_fees(trade, current_time)
@@ -905,7 +905,7 @@ class Backtesting:
def get_valid_price_and_stake(
self,
pair: str,
row: Tuple,
row: tuple,
propose_rate: float,
stake_amount: float,
direction: LongShort,
@@ -914,7 +914,7 @@ class Backtesting:
trade: Optional[LocalTrade],
order_type: str,
price_precision: Optional[float],
) -> Tuple[float, float, float, float]:
) -> tuple[float, float, float, float]:
if order_type == "limit":
new_rate = strategy_safe_wrapper(
self.strategy.custom_entry_price, default_retval=propose_rate
@@ -1003,7 +1003,7 @@ class Backtesting:
def _enter_trade(
self,
pair: str,
row: Tuple,
row: tuple,
direction: LongShort,
stake_amount: Optional[float] = None,
trade: Optional[LocalTrade] = None,
@@ -1150,7 +1150,7 @@ class Backtesting:
return trade
def handle_left_open(
self, open_trades: Dict[str, List[LocalTrade]], data: Dict[str, List[Tuple]]
self, open_trades: dict[str, list[LocalTrade]], data: dict[str, list[tuple]]
) -> None:
"""
Handling of left open trades at the end of backtesting
@@ -1197,7 +1197,7 @@ class Backtesting:
self.protections.stop_per_pair(pair, current_time, side)
self.protections.global_stop(current_time, side)
def manage_open_orders(self, trade: LocalTrade, current_time: datetime, row: Tuple) -> bool:
def manage_open_orders(self, trade: LocalTrade, current_time: datetime, row: tuple) -> bool:
"""
Check if any open order needs to be cancelled or replaced.
Returns True if the trade should be deleted.
@@ -1246,7 +1246,7 @@ class Backtesting:
return None
def check_order_replace(
self, trade: LocalTrade, order: Order, current_time, row: Tuple
self, trade: LocalTrade, order: Order, current_time, row: tuple
) -> bool:
"""
Check if current analyzed entry order has to be replaced and do so.
@@ -1297,8 +1297,8 @@ class Backtesting:
return False
def validate_row(
self, data: Dict, pair: str, row_index: int, current_time: datetime
) -> Optional[Tuple]:
self, data: dict, pair: str, row_index: int, current_time: datetime
) -> Optional[tuple]:
try:
# Row is treated as "current incomplete candle".
# entry / exit signals are shifted by 1 to compensate for this.
@@ -1329,7 +1329,7 @@ class Backtesting:
def backtest_loop(
self,
row: Tuple,
row: tuple,
pair: str,
current_time: datetime,
end_date: datetime,
@@ -1383,7 +1383,7 @@ class Backtesting:
self._process_exit_order(order, trade, current_time, row, pair)
def time_pair_generator(
self, start_date: datetime, end_date: datetime, increment: timedelta, pairs: List[str]
self, start_date: datetime, end_date: datetime, increment: timedelta, pairs: list[str]
):
"""
Backtest time and pair generator
@@ -1404,7 +1404,7 @@ class Backtesting:
self.progress.increment()
current_time += increment
def backtest(self, processed: Dict, start_date: datetime, end_date: datetime) -> Dict[str, Any]:
def backtest(self, processed: dict, start_date: datetime, end_date: datetime) -> dict[str, Any]:
"""
Implement backtesting functionality
@@ -1423,10 +1423,10 @@ class Backtesting:
self.wallets.update()
# Use dict of lists with data for performance
# (looping lists is a lot faster than pandas DataFrames)
data: Dict = self._get_ohlcv_as_lists(processed)
data: dict = self._get_ohlcv_as_lists(processed)
# Indexes per pair, so some pairs are allowed to have a missing start.
indexes: Dict = defaultdict(int)
indexes: dict = defaultdict(int)
# Loop timerange and get candle for each pair at that point in time
for current_time, pair, is_first in self.time_pair_generator(
@@ -1510,7 +1510,7 @@ class Backtesting:
}
def backtest_one_strategy(
self, strat: IStrategy, data: Dict[str, DataFrame], timerange: TimeRange
self, strat: IStrategy, data: dict[str, DataFrame], timerange: TimeRange
):
self.progress.init_step(BacktestState.ANALYZE, 0)
strategy_name = strat.get_strategy_name()
@@ -1605,7 +1605,7 @@ class Backtesting:
"""
Run backtesting end-to-end
"""
data: Dict[str, DataFrame] = {}
data: dict[str, DataFrame] = {}
data, timerange = self.load_bt_data()
self.load_bt_data_detail()

View File

@@ -1,7 +1,7 @@
import logging
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from typing import Any, Optional
from pandas import DataFrame
@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class VarHolder:
timerange: TimeRange
data: DataFrame
indicators: Dict[str, DataFrame]
indicators: dict[str, DataFrame]
result: DataFrame
compared: DataFrame
from_dt: datetime
@@ -25,7 +25,7 @@ class VarHolder:
class BaseAnalysis:
def __init__(self, config: Dict[str, Any], strategy_obj: Dict):
def __init__(self, config: dict[str, Any], strategy_obj: dict):
self.failed_bias_check = True
self.full_varHolder = VarHolder()
self.exchange: Optional[Any] = None

View File

@@ -11,7 +11,7 @@ import warnings
from datetime import datetime, timezone
from math import ceil
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Optional
import rapidjson
from joblib import Parallel, cpu_count, delayed, dump, load, wrap_non_picklable_objects
@@ -70,14 +70,14 @@ class Hyperopt:
"""
def __init__(self, config: Config) -> None:
self.buy_space: List[Dimension] = []
self.sell_space: List[Dimension] = []
self.protection_space: List[Dimension] = []
self.roi_space: List[Dimension] = []
self.stoploss_space: List[Dimension] = []
self.trailing_space: List[Dimension] = []
self.max_open_trades_space: List[Dimension] = []
self.dimensions: List[Dimension] = []
self.buy_space: list[Dimension] = []
self.sell_space: list[Dimension] = []
self.protection_space: list[Dimension] = []
self.roi_space: list[Dimension] = []
self.stoploss_space: list[Dimension] = []
self.trailing_space: list[Dimension] = []
self.max_open_trades_space: list[Dimension] = []
self.dimensions: list[Dimension] = []
self._hyper_out: HyperoptOutput = HyperoptOutput(streaming=True)
@@ -125,7 +125,7 @@ class Hyperopt:
self.market_change = 0.0
self.num_epochs_saved = 0
self.current_best_epoch: Optional[Dict[str, Any]] = None
self.current_best_epoch: Optional[dict[str, Any]] = None
# Use max_open_trades for hyperopt as well, except --disable-max-market-positions is set
if not self.config.get("use_max_market_positions", True):
@@ -168,8 +168,8 @@ class Hyperopt:
self.hyperopt_pickle_magic(modules.__bases__)
def _get_params_dict(
self, dimensions: List[Dimension], raw_params: List[Any]
) -> Dict[str, Any]:
self, dimensions: list[Dimension], raw_params: list[Any]
) -> dict[str, Any]:
# Ensure the number of dimensions match
# the number of parameters in the list.
if len(raw_params) != len(dimensions):
@@ -179,7 +179,7 @@ class Hyperopt:
# and the values are taken from the list of parameters.
return {d.name: v for d, v in zip(dimensions, raw_params)}
def _save_result(self, epoch: Dict) -> None:
def _save_result(self, epoch: dict) -> None:
"""
Save hyperopt results to file
Store one line per epoch.
@@ -205,11 +205,11 @@ class Hyperopt:
latest_filename = Path.joinpath(self.results_file.parent, LAST_BT_RESULT_FN)
file_dump_json(latest_filename, {"latest_hyperopt": str(self.results_file.name)}, log=False)
def _get_params_details(self, params: Dict) -> Dict:
def _get_params_details(self, params: dict) -> dict:
"""
Return the params for each space
"""
result: Dict = {}
result: dict = {}
if HyperoptTools.has_space(self.config, "buy"):
result["buy"] = {p.name: params.get(p.name) for p in self.buy_space}
@@ -236,11 +236,11 @@ class Hyperopt:
return result
def _get_no_optimize_details(self) -> Dict[str, Any]:
def _get_no_optimize_details(self) -> dict[str, Any]:
"""
Get non-optimized parameters
"""
result: Dict[str, Any] = {}
result: dict[str, Any] = {}
strategy = self.backtesting.strategy
if not HyperoptTools.has_space(self.config, "roi"):
result["roi"] = {str(k): v for k, v in strategy.minimal_roi.items()}
@@ -257,7 +257,7 @@ class Hyperopt:
result["max_open_trades"] = {"max_open_trades": strategy.max_open_trades}
return result
def print_results(self, results: Dict[str, Any]) -> None:
def print_results(self, results: dict[str, Any]) -> None:
"""
Log results if it is better than any previous evaluation
TODO: this should be moved to HyperoptTools too
@@ -318,7 +318,7 @@ class Hyperopt:
+ self.max_open_trades_space
)
def assign_params(self, params_dict: Dict[str, Any], category: str) -> None:
def assign_params(self, params_dict: dict[str, Any], category: str) -> None:
"""
Assign hyperoptable parameters
"""
@@ -327,7 +327,7 @@ class Hyperopt:
# noinspection PyProtectedMember
attr.value = params_dict[attr_name]
def generate_optimizer(self, raw_params: List[Any]) -> Dict[str, Any]:
def generate_optimizer(self, raw_params: list[Any]) -> dict[str, Any]:
"""
Used Optimize function.
Called once per epoch to optimize whatever is configured.
@@ -406,12 +406,12 @@ class Hyperopt:
def _get_results_dict(
self,
backtesting_results: Dict[str, Any],
backtesting_results: dict[str, Any],
min_date: datetime,
max_date: datetime,
params_dict: Dict[str, Any],
processed: Dict[str, DataFrame],
) -> Dict[str, Any]:
params_dict: dict[str, Any],
processed: dict[str, DataFrame],
) -> dict[str, Any]:
params_details = self._get_params_details(params_dict)
strat_stats = generate_strategy_stats(
@@ -458,7 +458,7 @@ class Hyperopt:
"total_profit": total_profit,
}
def get_optimizer(self, dimensions: List[Dimension], cpu_count) -> Optimizer:
def get_optimizer(self, dimensions: list[Dimension], cpu_count) -> Optimizer:
estimator = self.custom_hyperopt.generate_estimator(dimensions=dimensions)
acq_optimizer = "sampling"
@@ -479,7 +479,7 @@ class Hyperopt:
model_queue_size=SKOPT_MODEL_QUEUE_SIZE,
)
def run_optimizer_parallel(self, parallel: Parallel, asked: List[List]) -> List[Dict[str, Any]]:
def run_optimizer_parallel(self, parallel: Parallel, asked: list[list]) -> list[dict[str, Any]]:
"""Start optimizer in a parallel way"""
return parallel(
delayed(wrap_non_picklable_objects(self.generate_optimizer))(v) for v in asked
@@ -488,7 +488,7 @@ class Hyperopt:
def _set_random_state(self, random_state: Optional[int]) -> int:
return random_state or random.randint(1, 2**16 - 1) # noqa: S311
def advise_and_trim(self, data: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
def advise_and_trim(self, data: dict[str, DataFrame]) -> dict[str, DataFrame]:
preprocessed = self.backtesting.strategy.advise_all_indicators(data)
# Trim startup period from analyzed dataframe to get correct dates for output.
@@ -524,7 +524,7 @@ class Hyperopt:
else:
dump(data, self.data_pickle_file)
def get_asked_points(self, n_points: int) -> Tuple[List[List[Any]], List[bool]]:
def get_asked_points(self, n_points: int) -> tuple[list[list[Any]], list[bool]]:
"""
Enforce points returned from `self.opt.ask` have not been already evaluated
@@ -545,8 +545,8 @@ class Hyperopt:
return new_list
i = 0
asked_non_tried: List[List[Any]] = []
is_random_non_tried: List[bool] = []
asked_non_tried: list[list[Any]] = []
is_random_non_tried: list[bool] = []
while i < 5 and len(asked_non_tried) < n_points:
if i < 3:
self.opt.cache_ = {}
@@ -573,7 +573,7 @@ class Hyperopt:
else:
return self.opt.ask(n_points=n_points), [False for _ in range(n_points)]
def evaluate_result(self, val: Dict[str, Any], current: int, is_random: bool):
def evaluate_result(self, val: dict[str, Any], current: int, is_random: bool):
"""
Evaluate results returned from generate_optimizer
"""

View File

@@ -6,7 +6,7 @@ This module implements a convenience auto-hyperopt class, which can be used toge
import logging
from contextlib import suppress
from typing import Callable, Dict, List
from typing import Callable
from freqtrade.exceptions import OperationalException
@@ -59,7 +59,7 @@ class HyperOptAuto(IHyperOpt):
if attr.optimize:
yield attr.get_space(attr_name)
def _get_indicator_space(self, category) -> List:
def _get_indicator_space(self, category) -> list:
# TODO: is this necessary, or can we call "generate_space" directly?
indicator_space = list(self._generate_indicator_space(category))
if len(indicator_space) > 0:
@@ -70,32 +70,32 @@ class HyperOptAuto(IHyperOpt):
)
return []
def buy_indicator_space(self) -> List["Dimension"]:
def buy_indicator_space(self) -> list["Dimension"]:
return self._get_indicator_space("buy")
def sell_indicator_space(self) -> List["Dimension"]:
def sell_indicator_space(self) -> list["Dimension"]:
return self._get_indicator_space("sell")
def protection_space(self) -> List["Dimension"]:
def protection_space(self) -> list["Dimension"]:
return self._get_indicator_space("protection")
def generate_roi_table(self, params: Dict) -> Dict[int, float]:
def generate_roi_table(self, params: dict) -> dict[int, float]:
return self._get_func("generate_roi_table")(params)
def roi_space(self) -> List["Dimension"]:
def roi_space(self) -> list["Dimension"]:
return self._get_func("roi_space")()
def stoploss_space(self) -> List["Dimension"]:
def stoploss_space(self) -> list["Dimension"]:
return self._get_func("stoploss_space")()
def generate_trailing_params(self, params: Dict) -> Dict:
def generate_trailing_params(self, params: dict) -> dict:
return self._get_func("generate_trailing_params")(params)
def trailing_space(self) -> List["Dimension"]:
def trailing_space(self) -> list["Dimension"]:
return self._get_func("trailing_space")()
def max_open_trades_space(self) -> List["Dimension"]:
def max_open_trades_space(self) -> list["Dimension"]:
return self._get_func("max_open_trades_space")()
def generate_estimator(self, dimensions: List["Dimension"], **kwargs) -> EstimatorType:
def generate_estimator(self, dimensions: list["Dimension"], **kwargs) -> EstimatorType:
return self._get_func("generate_estimator")(dimensions=dimensions, **kwargs)

View File

@@ -1,5 +1,4 @@
import logging
from typing import List
from freqtrade.exceptions import OperationalException
@@ -7,7 +6,7 @@ from freqtrade.exceptions import OperationalException
logger = logging.getLogger(__name__)
def hyperopt_filter_epochs(epochs: List, filteroptions: dict, log: bool = True) -> List:
def hyperopt_filter_epochs(epochs: list, filteroptions: dict, log: bool = True) -> list:
"""
Filter our items from the list of hyperopt results
"""
@@ -33,14 +32,14 @@ def hyperopt_filter_epochs(epochs: List, filteroptions: dict, log: bool = True)
return epochs
def _hyperopt_filter_epochs_trade(epochs: List, trade_count: int):
def _hyperopt_filter_epochs_trade(epochs: list, trade_count: int):
"""
Filter epochs with trade-counts > trades
"""
return [x for x in epochs if x["results_metrics"].get("total_trades", 0) > trade_count]
def _hyperopt_filter_epochs_trade_count(epochs: List, filteroptions: dict) -> List:
def _hyperopt_filter_epochs_trade_count(epochs: list, filteroptions: dict) -> list:
if filteroptions["filter_min_trades"] > 0:
epochs = _hyperopt_filter_epochs_trade(epochs, filteroptions["filter_min_trades"])
@@ -53,7 +52,7 @@ def _hyperopt_filter_epochs_trade_count(epochs: List, filteroptions: dict) -> Li
return epochs
def _hyperopt_filter_epochs_duration(epochs: List, filteroptions: dict) -> List:
def _hyperopt_filter_epochs_duration(epochs: list, filteroptions: dict) -> list:
def get_duration_value(x):
# Duration in minutes ...
if "holding_avg_s" in x["results_metrics"]:
@@ -74,7 +73,7 @@ def _hyperopt_filter_epochs_duration(epochs: List, filteroptions: dict) -> List:
return epochs
def _hyperopt_filter_epochs_profit(epochs: List, filteroptions: dict) -> List:
def _hyperopt_filter_epochs_profit(epochs: list, filteroptions: dict) -> list:
if filteroptions["filter_min_avg_profit"] is not None:
epochs = _hyperopt_filter_epochs_trade(epochs, 0)
epochs = [
@@ -110,7 +109,7 @@ def _hyperopt_filter_epochs_profit(epochs: List, filteroptions: dict) -> List:
return epochs
def _hyperopt_filter_epochs_objective(epochs: List, filteroptions: dict) -> List:
def _hyperopt_filter_epochs_objective(epochs: list, filteroptions: dict) -> list:
if filteroptions["filter_min_objective"] is not None:
epochs = _hyperopt_filter_epochs_trade(epochs, 0)

View File

@@ -6,7 +6,7 @@ This module defines the interface to apply for hyperopt
import logging
import math
from abc import ABC
from typing import Dict, List, Union
from typing import Union
from sklearn.base import RegressorMixin
from skopt.space import Categorical, Dimension, Integer
@@ -41,7 +41,7 @@ class IHyperOpt(ABC):
# Assign timeframe to be used in hyperopt
IHyperOpt.timeframe = str(config["timeframe"])
def generate_estimator(self, dimensions: List[Dimension], **kwargs) -> EstimatorType:
def generate_estimator(self, dimensions: list[Dimension], **kwargs) -> EstimatorType:
"""
Return base_estimator.
Can be any of "GP", "RF", "ET", "GBRT" or an instance of a class
@@ -49,7 +49,7 @@ class IHyperOpt(ABC):
"""
return "ET"
def generate_roi_table(self, params: Dict) -> Dict[int, float]:
def generate_roi_table(self, params: dict) -> dict[int, float]:
"""
Create a ROI table.
@@ -64,7 +64,7 @@ class IHyperOpt(ABC):
return roi_table
def roi_space(self) -> List[Dimension]:
def roi_space(self) -> list[Dimension]:
"""
Create a ROI space.
@@ -146,7 +146,7 @@ class IHyperOpt(ABC):
),
]
def stoploss_space(self) -> List[Dimension]:
def stoploss_space(self) -> list[Dimension]:
"""
Create a stoploss space.
@@ -157,7 +157,7 @@ class IHyperOpt(ABC):
SKDecimal(-0.35, -0.02, decimals=3, name="stoploss"),
]
def generate_trailing_params(self, params: Dict) -> Dict:
def generate_trailing_params(self, params: dict) -> dict:
"""
Create dict with trailing stop parameters.
"""
@@ -170,7 +170,7 @@ class IHyperOpt(ABC):
"trailing_only_offset_is_reached": params["trailing_only_offset_is_reached"],
}
def trailing_space(self) -> List[Dimension]:
def trailing_space(self) -> list[Dimension]:
"""
Create a trailing stoploss space.
@@ -194,7 +194,7 @@ class IHyperOpt(ABC):
Categorical([True, False], name="trailing_only_offset_is_reached"),
]
def max_open_trades_space(self) -> List[Dimension]:
def max_open_trades_space(self) -> list[Dimension]:
"""
Create a max open trades space.

View File

@@ -5,7 +5,7 @@ This module defines the interface for the loss-function for hyperopt
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict
from typing import Any
from pandas import DataFrame
@@ -29,8 +29,8 @@ class IHyperOptLoss(ABC):
min_date: datetime,
max_date: datetime,
config: Config,
processed: Dict[str, DataFrame],
backtest_stats: Dict[str, Any],
processed: dict[str, DataFrame],
backtest_stats: dict[str, Any],
**kwargs,
) -> float:
"""

View File

@@ -1,6 +1,6 @@
import sys
from os import get_terminal_size
from typing import Any, List, Optional
from typing import Any, Optional
from rich.align import Align
from rich.console import Console
@@ -14,7 +14,7 @@ from freqtrade.util import fmt_coin
class HyperoptOutput:
def __init__(self, streaming=False) -> None:
self._results: List[Any] = []
self._results: list[Any] = []
self._streaming = streaming
self.__init_table()

View File

@@ -1,8 +1,9 @@
import logging
from collections.abc import Iterator
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple
from typing import Any, Optional
import numpy as np
import rapidjson
@@ -83,7 +84,7 @@ class HyperoptTools:
)
@staticmethod
def load_params(filename: Path) -> Dict:
def load_params(filename: Path) -> dict:
"""
Load parameters from file
"""
@@ -92,7 +93,7 @@ class HyperoptTools:
return params
@staticmethod
def try_export_params(config: Config, strategy_name: str, params: Dict):
def try_export_params(config: Config, strategy_name: str, params: dict):
if params.get(FTHYPT_FILEVERSION, 1) >= 2 and not config.get("disableparamexport", False):
# Export parameters ...
fn = HyperoptTools.get_strategy_filename(config, strategy_name)
@@ -113,7 +114,7 @@ class HyperoptTools:
return any(s in config["spaces"] for s in [space, "all", "default"])
@staticmethod
def _read_results(results_file: Path, batch_size: int = 10) -> Iterator[List[Any]]:
def _read_results(results_file: Path, batch_size: int = 10) -> Iterator[list[Any]]:
"""
Stream hyperopt results from file
"""
@@ -143,7 +144,7 @@ class HyperoptTools:
return False
@staticmethod
def load_filtered_results(results_file: Path, config: Config) -> Tuple[List, int]:
def load_filtered_results(results_file: Path, config: Config) -> tuple[list, int]:
filteroptions = {
"only_best": config.get("hyperopt_list_best", False),
"only_profitable": config.get("hyperopt_list_profitable", False),
@@ -204,7 +205,7 @@ class HyperoptTools:
print(f"\n{header_str}:\n\n{explanation_str}\n")
if print_json:
result_dict: Dict = {}
result_dict: dict = {}
for s in [
"buy",
"sell",
@@ -256,7 +257,7 @@ class HyperoptTools:
@staticmethod
def _params_pretty_print(
params, space: str, header: str, non_optimized: Optional[Dict] = None
params, space: str, header: str, non_optimized: Optional[dict] = None
) -> None:
if space in params or (non_optimized and space in non_optimized):
space_params = HyperoptTools._space_params(params, space, 5)
@@ -298,7 +299,7 @@ class HyperoptTools:
print(result)
@staticmethod
def _space_params(params, space: str, r: Optional[int] = None) -> Dict:
def _space_params(params, space: str, r: Optional[int] = None) -> dict:
d = params.get(space)
if d:
# Round floats to `r` digits after the decimal point if requested
@@ -328,7 +329,7 @@ class HyperoptTools:
return bool(results["loss"] < current_best_loss)
@staticmethod
def format_results_explanation_string(results_metrics: Dict, stake_currency: str) -> str:
def format_results_explanation_string(results_metrics: dict, stake_currency: str) -> str:
"""
Return the formatted results explanation in a string
"""

View File

@@ -1,5 +1,5 @@
import logging
from typing import Any, Dict, List, Literal, Union
from typing import Any, Literal, Union
from freqtrade.constants import UNLIMITED_STAKE_AMOUNT, Config
from freqtrade.ft_types import BacktestResultType
@@ -10,7 +10,7 @@ from freqtrade.util import decimals_per_coin, fmt_coin, print_rich_table
logger = logging.getLogger(__name__)
def _get_line_floatfmt(stake_currency: str) -> List[str]:
def _get_line_floatfmt(stake_currency: str) -> list[str]:
"""
Generate floatformat (goes in line with _generate_result_line())
"""
@@ -18,8 +18,8 @@ def _get_line_floatfmt(stake_currency: str) -> List[str]:
def _get_line_header(
first_column: Union[str, List[str]], stake_currency: str, direction: str = "Trades"
) -> List[str]:
first_column: Union[str, list[str]], stake_currency: str, direction: str = "Trades"
) -> list[str]:
"""
Generate header lines (goes in line with _generate_result_line())
"""
@@ -45,7 +45,7 @@ def generate_wins_draws_losses(wins, draws, losses):
def text_table_bt_results(
pair_results: List[Dict[str, Any]], stake_currency: str, title: str
pair_results: list[dict[str, Any]], stake_currency: str, title: str
) -> None:
"""
Generates and returns a text table for the given backtest data and the results dataframe
@@ -73,7 +73,7 @@ def text_table_bt_results(
def text_table_tags(
tag_type: Literal["enter_tag", "exit_tag", "mix_tag"],
tag_results: List[Dict[str, Any]],
tag_results: list[dict[str, Any]],
stake_currency: str,
) -> None:
"""
@@ -123,7 +123,7 @@ def text_table_tags(
def text_table_periodic_breakdown(
days_breakdown_stats: List[Dict[str, Any]], stake_currency: str, period: str
days_breakdown_stats: list[dict[str, Any]], stake_currency: str, period: str
) -> None:
"""
Generate small table with Backtest results by days
@@ -191,7 +191,7 @@ def text_table_strategy(strategy_results, stake_currency: str, title: str):
print_rich_table(output, headers, summary=title)
def text_table_add_metrics(strat_results: Dict) -> None:
def text_table_add_metrics(strat_results: dict) -> None:
if len(strat_results["trades"]) > 0:
best_trade = max(strat_results["trades"], key=lambda x: x["profit_ratio"])
worst_trade = min(strat_results["trades"], key=lambda x: x["profit_ratio"])
@@ -411,7 +411,7 @@ def text_table_add_metrics(strat_results: Dict) -> None:
print(message)
def _show_tag_subresults(results: Dict[str, Any], stake_currency: str):
def _show_tag_subresults(results: dict[str, Any], stake_currency: str):
"""
Print tag subresults (enter_tag, exit_reason_summary, mix_tag_stats)
"""
@@ -426,7 +426,7 @@ def _show_tag_subresults(results: Dict[str, Any], stake_currency: str):
def show_backtest_result(
strategy: str, results: Dict[str, Any], stake_currency: str, backtest_breakdown: List[str]
strategy: str, results: dict[str, Any], stake_currency: str, backtest_breakdown: list[str]
):
"""
Print results for one strategy

View File

@@ -1,6 +1,6 @@
import logging
from pathlib import Path
from typing import Dict, Optional
from typing import Optional
from pandas import DataFrame
@@ -70,7 +70,7 @@ def store_backtest_stats(
def _store_backtest_analysis_data(
recordfilename: Path, data: Dict[str, Dict], dtappendix: str, name: str
recordfilename: Path, data: dict[str, dict], dtappendix: str, name: str
) -> Path:
"""
Stores backtest trade candles for analysis
@@ -91,9 +91,9 @@ def _store_backtest_analysis_data(
def store_backtest_analysis_results(
recordfilename: Path,
candles: Dict[str, Dict],
trades: Dict[str, Dict],
exited: Dict[str, Dict],
candles: dict[str, dict],
trades: dict[str, dict],
exited: dict[str, dict],
dtappendix: str,
) -> None:
_store_backtest_analysis_data(recordfilename, candles, dtappendix, "signals")

View File

@@ -1,7 +1,7 @@
import logging
from copy import deepcopy
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Literal, Tuple, Union
from typing import Any, Literal, Union
import numpy as np
from pandas import DataFrame, Series, concat, to_datetime
@@ -25,8 +25,8 @@ logger = logging.getLogger(__name__)
def generate_trade_signal_candles(
preprocessed_df: Dict[str, DataFrame], bt_results: Dict[str, Any], date_col: str
) -> Dict[str, DataFrame]:
preprocessed_df: dict[str, DataFrame], bt_results: dict[str, Any], date_col: str
) -> dict[str, DataFrame]:
signal_candles_only = {}
for pair in preprocessed_df.keys():
signal_candles_only_df = DataFrame()
@@ -48,8 +48,8 @@ def generate_trade_signal_candles(
def generate_rejected_signals(
preprocessed_df: Dict[str, DataFrame], rejected_dict: Dict[str, DataFrame]
) -> Dict[str, DataFrame]:
preprocessed_df: dict[str, DataFrame], rejected_dict: dict[str, DataFrame]
) -> dict[str, DataFrame]:
rejected_candles_only = {}
for pair, signals in rejected_dict.items():
rejected_signals_only_df = DataFrame()
@@ -69,8 +69,8 @@ def generate_rejected_signals(
def _generate_result_line(
result: DataFrame, starting_balance: int, first_column: Union[str, List[str]]
) -> Dict:
result: DataFrame, starting_balance: int, first_column: Union[str, list[str]]
) -> dict:
"""
Generate one result dict, with "first_column" as key.
"""
@@ -109,12 +109,12 @@ def _generate_result_line(
def generate_pair_metrics(
pairlist: List[str],
pairlist: list[str],
stake_currency: str,
starting_balance: int,
results: DataFrame,
skip_nan: bool = False,
) -> List[Dict]:
) -> list[dict]:
"""
Generates and returns a list for the given backtest data and the results dataframe
:param pairlist: Pairlist used
@@ -143,11 +143,11 @@ def generate_pair_metrics(
def generate_tag_metrics(
tag_type: Union[Literal["enter_tag", "exit_reason"], List[Literal["enter_tag", "exit_reason"]]],
tag_type: Union[Literal["enter_tag", "exit_reason"], list[Literal["enter_tag", "exit_reason"]]],
starting_balance: int,
results: DataFrame,
skip_nan: bool = False,
) -> List[Dict]:
) -> list[dict]:
"""
Generates and returns a list of metrics for the given tag trades and the results dataframe
:param starting_balance: Starting balance
@@ -177,7 +177,7 @@ def generate_tag_metrics(
return []
def generate_strategy_comparison(bt_stats: Dict) -> List[Dict]:
def generate_strategy_comparison(bt_stats: dict) -> list[dict]:
"""
Generate summary per strategy
:param bt_stats: Dict of <Strategyname: DataFrame> containing results for all strategies
@@ -208,8 +208,8 @@ def _get_resample_from_period(period: str) -> str:
def generate_periodic_breakdown_stats(
trade_list: Union[List, DataFrame], period: str
) -> List[Dict[str, Any]]:
trade_list: Union[list, DataFrame], period: str
) -> list[dict[str, Any]]:
results = trade_list if not isinstance(trade_list, list) else DataFrame.from_records(trade_list)
if len(results) == 0:
return []
@@ -237,14 +237,14 @@ def generate_periodic_breakdown_stats(
return stats
def generate_all_periodic_breakdown_stats(trade_list: List) -> Dict[str, List]:
def generate_all_periodic_breakdown_stats(trade_list: list) -> dict[str, list]:
result = {}
for period in BACKTEST_BREAKDOWNS:
result[period] = generate_periodic_breakdown_stats(trade_list, period)
return result
def calc_streak(dataframe: DataFrame) -> Tuple[int, int]:
def calc_streak(dataframe: DataFrame) -> tuple[int, int]:
"""
Calculate consecutive win and loss streaks
:param dataframe: Dataframe containing the trades dataframe, with profit_ratio column
@@ -261,7 +261,7 @@ def calc_streak(dataframe: DataFrame) -> Tuple[int, int]:
return cons_wins, cons_losses
def generate_trading_stats(results: DataFrame) -> Dict[str, Any]:
def generate_trading_stats(results: DataFrame) -> dict[str, Any]:
"""Generate overall trade statistics"""
if len(results) == 0:
return {
@@ -313,7 +313,7 @@ def generate_trading_stats(results: DataFrame) -> Dict[str, Any]:
}
def generate_daily_stats(results: DataFrame) -> Dict[str, Any]:
def generate_daily_stats(results: DataFrame) -> dict[str, Any]:
"""Generate daily statistics"""
if len(results) == 0:
return {
@@ -350,14 +350,14 @@ def generate_daily_stats(results: DataFrame) -> Dict[str, Any]:
def generate_strategy_stats(
pairlist: List[str],
pairlist: list[str],
strategy: str,
content: Dict[str, Any],
content: dict[str, Any],
min_date: datetime,
max_date: datetime,
market_change: float,
is_hyperopt: bool = False,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
:param pairlist: List of pairs to backtest
:param strategy: Strategy name
@@ -368,7 +368,7 @@ def generate_strategy_stats(
:param market_change: float indicating the market change
:return: Dictionary containing results per strategy and a strategy summary.
"""
results: Dict[str, DataFrame] = content["results"]
results: dict[str, DataFrame] = content["results"]
if not isinstance(results, DataFrame):
return {}
config = content["config"]
@@ -558,8 +558,8 @@ def generate_strategy_stats(
def generate_backtest_stats(
btdata: Dict[str, DataFrame],
all_results: Dict[str, Dict[str, Union[DataFrame, Dict]]],
btdata: dict[str, DataFrame],
all_results: dict[str, dict[str, Union[DataFrame, dict]]],
min_date: datetime,
max_date: datetime,
) -> BacktestResultType: