chore: update exchange config to modern typing syntax

This commit is contained in:
Matthias
2024-10-04 06:46:45 +02:00
parent d1b9990e4e
commit f369151e8e
14 changed files with 189 additions and 192 deletions

View File

@@ -3,7 +3,7 @@
import logging import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Tuple from typing import Optional
import ccxt import ccxt
@@ -46,14 +46,14 @@ class Binance(Exchange):
"ws_enabled": False, "ws_enabled": False,
} }
_supported_trading_mode_margin_pairs: List[Tuple[TradingMode, MarginMode]] = [ _supported_trading_mode_margin_pairs: list[tuple[TradingMode, MarginMode]] = [
# TradingMode.SPOT always supported and not required in this list # TradingMode.SPOT always supported and not required in this list
# (TradingMode.MARGIN, MarginMode.CROSS), # (TradingMode.MARGIN, MarginMode.CROSS),
# (TradingMode.FUTURES, MarginMode.CROSS), # (TradingMode.FUTURES, MarginMode.CROSS),
(TradingMode.FUTURES, MarginMode.ISOLATED) (TradingMode.FUTURES, MarginMode.ISOLATED)
] ]
def get_tickers(self, symbols: Optional[List[str]] = None, cached: bool = False) -> Tickers: def get_tickers(self, symbols: Optional[list[str]] = None, cached: bool = False) -> Tickers:
tickers = super().get_tickers(symbols=symbols, cached=cached) tickers = super().get_tickers(symbols=symbols, cached=cached)
if self.trading_mode == TradingMode.FUTURES: if self.trading_mode == TradingMode.FUTURES:
# Binance's future result has no bid/ask values. # Binance's future result has no bid/ask values.
@@ -145,8 +145,8 @@ class Binance(Exchange):
return open_date.minute == 0 and open_date.second < 15 return open_date.minute == 0 and open_date.second < 15
def fetch_funding_rates( def fetch_funding_rates(
self, symbols: Optional[List[str]] = None self, symbols: Optional[list[str]] = None
) -> Dict[str, Dict[str, float]]: ) -> dict[str, dict[str, float]]:
""" """
Fetch funding rates for the given symbols. Fetch funding rates for the given symbols.
:param symbols: List of symbols to fetch funding rates for :param symbols: List of symbols to fetch funding rates for
@@ -253,7 +253,7 @@ class Binance(Exchange):
"Freqtrade only supports isolated futures for leverage trading" "Freqtrade only supports isolated futures for leverage trading"
) )
def load_leverage_tiers(self) -> Dict[str, List[Dict]]: def load_leverage_tiers(self) -> dict[str, list[dict]]:
if self.trading_mode == TradingMode.FUTURES: if self.trading_mode == TradingMode.FUTURES:
if self._config["dry_run"]: if self._config["dry_run"]:
leverage_tiers_path = Path(__file__).parent / "binance_leverage_tiers.json" leverage_tiers_path = Path(__file__).parent / "binance_leverage_tiers.json"

View File

@@ -2,7 +2,7 @@
import logging import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Dict, List, Optional from typing import Optional
from freqtrade.exchange import Exchange from freqtrade.exchange import Exchange
@@ -17,8 +17,8 @@ class Bitpanda(Exchange):
""" """
def get_trades_for_order( def get_trades_for_order(
self, order_id: str, pair: str, since: datetime, params: Optional[Dict] = None self, order_id: str, pair: str, since: datetime, params: Optional[dict] = None
) -> List: ) -> list:
""" """
Fetch Orders using the "fetch_my_trades" endpoint and filter them by order-id. Fetch Orders using the "fetch_my_trades" endpoint and filter them by order-id.
The "since" argument passed in is coming from the database and is in UTC, The "since" argument passed in is coming from the database and is in UTC,

View File

@@ -2,7 +2,7 @@
import logging import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Optional
import ccxt import ccxt
@@ -53,14 +53,14 @@ class Bybit(Exchange):
}, },
} }
_supported_trading_mode_margin_pairs: List[Tuple[TradingMode, MarginMode]] = [ _supported_trading_mode_margin_pairs: list[tuple[TradingMode, MarginMode]] = [
# TradingMode.SPOT always supported and not required in this list # TradingMode.SPOT always supported and not required in this list
# (TradingMode.FUTURES, MarginMode.CROSS), # (TradingMode.FUTURES, MarginMode.CROSS),
(TradingMode.FUTURES, MarginMode.ISOLATED) (TradingMode.FUTURES, MarginMode.ISOLATED)
] ]
@property @property
def _ccxt_config(self) -> Dict: def _ccxt_config(self) -> dict:
# Parameters to add directly to ccxt sync/async initialization. # Parameters to add directly to ccxt sync/async initialization.
# ccxt defaults to swap mode. # ccxt defaults to swap mode.
config = {} config = {}
@@ -69,7 +69,7 @@ class Bybit(Exchange):
config.update(super()._ccxt_config) config.update(super()._ccxt_config)
return config return config
def market_is_future(self, market: Dict[str, Any]) -> bool: def market_is_future(self, market: dict[str, Any]) -> bool:
main = super().market_is_future(market) main = super().market_is_future(market)
# For ByBit, we'll only support USDT markets for now. # For ByBit, we'll only support USDT markets for now.
return main and market["settle"] == "USDT" return main and market["settle"] == "USDT"
@@ -126,7 +126,7 @@ class Bybit(Exchange):
leverage: float, leverage: float,
reduceOnly: bool, reduceOnly: bool,
time_in_force: str = "GTC", time_in_force: str = "GTC",
) -> Dict: ) -> dict:
params = super()._get_params( params = super()._get_params(
side=side, side=side,
ordertype=ordertype, ordertype=ordertype,
@@ -220,7 +220,7 @@ class Bybit(Exchange):
logger.warning(f"Could not update funding fees for {pair}.") logger.warning(f"Could not update funding fees for {pair}.")
return 0.0 return 0.0
def fetch_orders(self, pair: str, since: datetime, params: Optional[Dict] = None) -> List[Dict]: def fetch_orders(self, pair: str, since: datetime, params: Optional[dict] = None) -> list[dict]:
""" """
Fetch all orders for a pair "since" Fetch all orders for a pair "since"
:param pair: Pair for the query :param pair: Pair for the query
@@ -237,7 +237,7 @@ class Bybit(Exchange):
return orders return orders
def fetch_order(self, order_id: str, pair: str, params: Optional[Dict] = None) -> Dict: def fetch_order(self, order_id: str, pair: str, params: Optional[dict] = None) -> dict:
if self.exchange_has("fetchOrder"): if self.exchange_has("fetchOrder"):
# Set acknowledged to True to avoid ccxt exception # Set acknowledged to True to avoid ccxt exception
params = {"acknowledged": True} params = {"acknowledged": True}
@@ -255,7 +255,7 @@ class Bybit(Exchange):
return order return order
@retrier @retrier
def get_leverage_tiers(self) -> Dict[str, List[Dict]]: def get_leverage_tiers(self) -> dict[str, list[dict]]:
""" """
Cache leverage tiers for 1 day, since they are not expected to change often, and Cache leverage tiers for 1 day, since they are not expected to change often, and
bybit requires pagination to fetch all tiers. bybit requires pagination to fetch all tiers.

View File

@@ -2,7 +2,7 @@ import asyncio
import logging import logging
import time import time
from functools import wraps from functools import wraps
from typing import Any, Callable, Dict, List, Optional, TypeVar, cast, overload from typing import Any, Callable, Optional, TypeVar, cast, overload
from freqtrade.constants import ExchangeConfig from freqtrade.constants import ExchangeConfig
from freqtrade.exceptions import DDosProtection, RetryableOrderError, TemporaryError from freqtrade.exceptions import DDosProtection, RetryableOrderError, TemporaryError
@@ -62,7 +62,7 @@ SUPPORTED_EXCHANGES = [
] ]
# either the main, or replacement methods (array) is required # either the main, or replacement methods (array) is required
EXCHANGE_HAS_REQUIRED: Dict[str, List[str]] = { EXCHANGE_HAS_REQUIRED: dict[str, list[str]] = {
# Required / private # Required / private
"fetchOrder": ["fetchOpenOrder", "fetchClosedOrder"], "fetchOrder": ["fetchOpenOrder", "fetchClosedOrder"],
"fetchL2OrderBook": ["fetchTicker"], "fetchL2OrderBook": ["fetchTicker"],

View File

@@ -7,11 +7,12 @@ import asyncio
import inspect import inspect
import logging import logging
import signal import signal
from collections.abc import Coroutine
from copy import deepcopy from copy import deepcopy
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from math import floor, isnan from math import floor, isnan
from threading import Lock from threading import Lock
from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple, Union from typing import Any, Literal, Optional, Union
import ccxt import ccxt
import ccxt.pro as ccxt_pro import ccxt.pro as ccxt_pro
@@ -114,10 +115,10 @@ logger = logging.getLogger(__name__)
class Exchange: class Exchange:
# Parameters to add directly to buy/sell calls (like agreeing to trading agreement) # Parameters to add directly to buy/sell calls (like agreeing to trading agreement)
_params: Dict = {} _params: dict = {}
# Additional parameters - added to the ccxt object # Additional parameters - added to the ccxt object
_ccxt_params: Dict = {} _ccxt_params: dict = {}
# Dict to specify which options each exchange implements # Dict to specify which options each exchange implements
# This defines defaults, which can be selectively overridden by subclasses using _ft_has # This defines defaults, which can be selectively overridden by subclasses using _ft_has
@@ -160,7 +161,7 @@ class Exchange:
_ft_has: FtHas = {} _ft_has: FtHas = {}
_ft_has_futures: FtHas = {} _ft_has_futures: FtHas = {}
_supported_trading_mode_margin_pairs: List[Tuple[TradingMode, MarginMode]] = [ _supported_trading_mode_margin_pairs: list[tuple[TradingMode, MarginMode]] = [
# TradingMode.SPOT always supported and not required in this list # TradingMode.SPOT always supported and not required in this list
] ]
@@ -181,9 +182,9 @@ class Exchange:
self._api_async: ccxt_pro.Exchange self._api_async: ccxt_pro.Exchange
self._ws_async: ccxt_pro.Exchange = None self._ws_async: ccxt_pro.Exchange = None
self._exchange_ws: Optional[ExchangeWS] = None self._exchange_ws: Optional[ExchangeWS] = None
self._markets: Dict = {} self._markets: dict = {}
self._trading_fees: Dict[str, Any] = {} self._trading_fees: dict[str, Any] = {}
self._leverage_tiers: Dict[str, List[Dict]] = {} self._leverage_tiers: dict[str, list[dict]] = {}
# Lock event loop. This is necessary to avoid race-conditions when using force* commands # Lock event loop. This is necessary to avoid race-conditions when using force* commands
# Due to funding fee fetching. # Due to funding fee fetching.
self._loop_lock = Lock() self._loop_lock = Lock()
@@ -193,7 +194,7 @@ class Exchange:
self._config.update(config) self._config.update(config)
# Holds last candle refreshed time of each pair # Holds last candle refreshed time of each pair
self._pairs_last_refresh_time: Dict[PairWithTimeframe, int] = {} self._pairs_last_refresh_time: dict[PairWithTimeframe, int] = {}
# Timestamp of last markets refresh # Timestamp of last markets refresh
self._last_markets_refresh: int = 0 self._last_markets_refresh: int = 0
@@ -208,19 +209,19 @@ class Exchange:
self._entry_rate_cache: TTLCache = TTLCache(maxsize=100, ttl=300) self._entry_rate_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
# Holds candles # Holds candles
self._klines: Dict[PairWithTimeframe, DataFrame] = {} self._klines: dict[PairWithTimeframe, DataFrame] = {}
self._expiring_candle_cache: Dict[Tuple[str, int], PeriodicCache] = {} self._expiring_candle_cache: dict[tuple[str, int], PeriodicCache] = {}
# Holds public_trades # Holds public_trades
self._trades: Dict[PairWithTimeframe, DataFrame] = {} self._trades: dict[PairWithTimeframe, DataFrame] = {}
# Holds all open sell orders for dry_run # Holds all open sell orders for dry_run
self._dry_run_open_orders: Dict[str, Any] = {} self._dry_run_open_orders: dict[str, Any] = {}
if config["dry_run"]: if config["dry_run"]:
logger.info("Instance is running with dry_run enabled") logger.info("Instance is running with dry_run enabled")
logger.info(f"Using CCXT {ccxt.__version__}") logger.info(f"Using CCXT {ccxt.__version__}")
exchange_conf: Dict[str, Any] = exchange_config if exchange_config else config["exchange"] exchange_conf: dict[str, Any] = exchange_config if exchange_config else config["exchange"]
remove_exchange_credentials(exchange_conf, config.get("dry_run", False)) remove_exchange_credentials(exchange_conf, config.get("dry_run", False))
self.log_responses = exchange_conf.get("log_responses", False) self.log_responses = exchange_conf.get("log_responses", False)
@@ -339,7 +340,7 @@ class Exchange:
self.validate_freqai(config) self.validate_freqai(config)
def _init_ccxt( def _init_ccxt(
self, exchange_config: Dict[str, Any], sync: bool, ccxt_kwargs: Dict[str, Any] self, exchange_config: dict[str, Any], sync: bool, ccxt_kwargs: dict[str, Any]
) -> ccxt.Exchange: ) -> ccxt.Exchange:
""" """
Initialize ccxt with given config and return valid ccxt instance. Initialize ccxt with given config and return valid ccxt instance.
@@ -390,7 +391,7 @@ class Exchange:
return api return api
@property @property
def _ccxt_config(self) -> Dict: def _ccxt_config(self) -> dict:
# Parameters to add directly to ccxt sync/async initialization. # Parameters to add directly to ccxt sync/async initialization.
if self.trading_mode == TradingMode.MARGIN: if self.trading_mode == TradingMode.MARGIN:
return {"options": {"defaultType": "margin"}} return {"options": {"defaultType": "margin"}}
@@ -410,11 +411,11 @@ class Exchange:
return self._api.id return self._api.id
@property @property
def timeframes(self) -> List[str]: def timeframes(self) -> list[str]:
return list((self._api.timeframes or {}).keys()) return list((self._api.timeframes or {}).keys())
@property @property
def markets(self) -> Dict[str, Any]: def markets(self) -> dict[str, Any]:
"""exchange ccxt markets""" """exchange ccxt markets"""
if not self._markets: if not self._markets:
logger.info("Markets were not loaded. Loading them now..") logger.info("Markets were not loaded. Loading them now..")
@@ -471,14 +472,14 @@ class Exchange:
def get_markets( def get_markets(
self, self,
base_currencies: Optional[List[str]] = None, base_currencies: Optional[list[str]] = None,
quote_currencies: Optional[List[str]] = None, quote_currencies: Optional[list[str]] = None,
spot_only: bool = False, spot_only: bool = False,
margin_only: bool = False, margin_only: bool = False,
futures_only: bool = False, futures_only: bool = False,
tradable_only: bool = True, tradable_only: bool = True,
active_only: bool = False, active_only: bool = False,
) -> Dict[str, Any]: ) -> dict[str, Any]:
""" """
Return exchange ccxt markets, filtered out by base currency and quote currency Return exchange ccxt markets, filtered out by base currency and quote currency
if this was requested in parameters. if this was requested in parameters.
@@ -503,7 +504,7 @@ class Exchange:
markets = {k: v for k, v in markets.items() if market_is_active(v)} markets = {k: v for k, v in markets.items() if market_is_active(v)}
return markets return markets
def get_quote_currencies(self) -> List[str]: def get_quote_currencies(self) -> list[str]:
""" """
Return a list of supported quote currencies Return a list of supported quote currencies
""" """
@@ -518,19 +519,19 @@ class Exchange:
"""Return a pair's base currency (base/quote:settlement)""" """Return a pair's base currency (base/quote:settlement)"""
return self.markets.get(pair, {}).get("base", "") return self.markets.get(pair, {}).get("base", "")
def market_is_future(self, market: Dict[str, Any]) -> bool: def market_is_future(self, market: dict[str, Any]) -> bool:
return ( return (
market.get(self._ft_has["ccxt_futures_name"], False) is True market.get(self._ft_has["ccxt_futures_name"], False) is True
and market.get("linear", False) is True and market.get("linear", False) is True
) )
def market_is_spot(self, market: Dict[str, Any]) -> bool: def market_is_spot(self, market: dict[str, Any]) -> bool:
return market.get("spot", False) is True return market.get("spot", False) is True
def market_is_margin(self, market: Dict[str, Any]) -> bool: def market_is_margin(self, market: dict[str, Any]) -> bool:
return market.get("margin", False) is True return market.get("margin", False) is True
def market_is_tradable(self, market: Dict[str, Any]) -> bool: def market_is_tradable(self, market: dict[str, Any]) -> bool:
""" """
Check if the market symbol is tradable by Freqtrade. Check if the market symbol is tradable by Freqtrade.
Ensures that Configured mode aligns to Ensures that Configured mode aligns to
@@ -578,7 +579,7 @@ class Exchange:
else: else:
return 1 return 1
def _trades_contracts_to_amount(self, trades: List) -> List: def _trades_contracts_to_amount(self, trades: list) -> list:
if len(trades) > 0 and "symbol" in trades[0]: if len(trades) > 0 and "symbol" in trades[0]:
contract_size = self.get_contract_size(trades[0]["symbol"]) contract_size = self.get_contract_size(trades[0]["symbol"])
if contract_size != 1: if contract_size != 1:
@@ -586,7 +587,7 @@ class Exchange:
trade["amount"] = trade["amount"] * contract_size trade["amount"] = trade["amount"] * contract_size
return trades return trades
def _order_contracts_to_amount(self, order: Dict) -> Dict: def _order_contracts_to_amount(self, order: dict) -> dict:
if "symbol" in order and order["symbol"] is not None: if "symbol" in order and order["symbol"] is not None:
contract_size = self.get_contract_size(order["symbol"]) contract_size = self.get_contract_size(order["symbol"])
if contract_size != 1: if contract_size != 1:
@@ -620,7 +621,7 @@ class Exchange:
if self._exchange_ws: if self._exchange_ws:
self._exchange_ws.reset_connections() self._exchange_ws.reset_connections()
async def _api_reload_markets(self, reload: bool = False) -> Dict[str, Any]: async def _api_reload_markets(self, reload: bool = False) -> dict[str, Any]:
try: try:
return await self._api_async.load_markets(reload=reload, params={}) return await self._api_async.load_markets(reload=reload, params={})
except ccxt.DDoSProtection as e: except ccxt.DDoSProtection as e:
@@ -632,7 +633,7 @@ class Exchange:
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise TemporaryError(e) from e raise TemporaryError(e) from e
def _load_async_markets(self, reload: bool = False) -> Dict[str, Any]: def _load_async_markets(self, reload: bool = False) -> dict[str, Any]:
try: try:
markets = self.loop.run_until_complete(self._api_reload_markets(reload=reload)) markets = self.loop.run_until_complete(self._api_reload_markets(reload=reload))
@@ -734,7 +735,7 @@ class Exchange:
): ):
raise ConfigurationError("Timeframes < 1m are currently not supported by Freqtrade.") raise ConfigurationError("Timeframes < 1m are currently not supported by Freqtrade.")
def validate_ordertypes(self, order_types: Dict) -> None: def validate_ordertypes(self, order_types: dict) -> None:
""" """
Checks if order-types configured in strategy/config are supported Checks if order-types configured in strategy/config are supported
""" """
@@ -743,7 +744,7 @@ class Exchange:
raise ConfigurationError(f"Exchange {self.name} does not support market orders.") raise ConfigurationError(f"Exchange {self.name} does not support market orders.")
self.validate_stop_ordertypes(order_types) self.validate_stop_ordertypes(order_types)
def validate_stop_ordertypes(self, order_types: Dict) -> None: def validate_stop_ordertypes(self, order_types: dict) -> None:
""" """
Validate stoploss order types Validate stoploss order types
""" """
@@ -762,7 +763,7 @@ class Exchange:
f"On exchange stoploss price type is not supported for {self.name}." f"On exchange stoploss price type is not supported for {self.name}."
) )
def validate_pricing(self, pricing: Dict) -> None: def validate_pricing(self, pricing: dict) -> None:
if pricing.get("use_order_book", False) and not self.exchange_has("fetchL2OrderBook"): if pricing.get("use_order_book", False) and not self.exchange_has("fetchL2OrderBook"):
raise ConfigurationError(f"Orderbook not available for {self.name}.") raise ConfigurationError(f"Orderbook not available for {self.name}.")
if not pricing.get("use_order_book", False) and ( if not pricing.get("use_order_book", False) and (
@@ -770,7 +771,7 @@ class Exchange:
): ):
raise ConfigurationError(f"Ticker pricing not available for {self.name}.") raise ConfigurationError(f"Ticker pricing not available for {self.name}.")
def validate_order_time_in_force(self, order_time_in_force: Dict) -> None: def validate_order_time_in_force(self, order_time_in_force: dict) -> None:
""" """
Checks if order time in force configured in strategy/config are supported Checks if order time in force configured in strategy/config are supported
""" """
@@ -782,7 +783,7 @@ class Exchange:
f"Time in force policies are not supported for {self.name} yet." f"Time in force policies are not supported for {self.name} yet."
) )
def validate_orderflow(self, exchange: Dict) -> None: def validate_orderflow(self, exchange: dict) -> None:
if exchange.get("use_public_trades", False) and ( if exchange.get("use_public_trades", False) and (
not self.exchange_has("fetchTrades") or not self._ft_has["trades_has_history"] not self.exchange_has("fetchTrades") or not self._ft_has["trades_has_history"]
): ):
@@ -1000,16 +1001,16 @@ class Exchange:
amount: float, amount: float,
rate: float, rate: float,
leverage: float, leverage: float,
params: Optional[Dict] = None, params: Optional[dict] = None,
stop_loss: bool = False, stop_loss: bool = False,
) -> Dict[str, Any]: ) -> dict[str, Any]:
now = dt_now() now = dt_now()
order_id = f"dry_run_{side}_{pair}_{now.timestamp()}" order_id = f"dry_run_{side}_{pair}_{now.timestamp()}"
# Rounding here must respect to contract sizes # Rounding here must respect to contract sizes
_amount = self._contracts_to_amount( _amount = self._contracts_to_amount(
pair, self.amount_to_precision(pair, self._amount_to_contracts(pair, amount)) pair, self.amount_to_precision(pair, self._amount_to_contracts(pair, amount))
) )
dry_order: Dict[str, Any] = { dry_order: dict[str, Any] = {
"id": order_id, "id": order_id,
"symbol": pair, "symbol": pair,
"price": rate, "price": rate,
@@ -1071,9 +1072,9 @@ class Exchange:
def add_dry_order_fee( def add_dry_order_fee(
self, self,
pair: str, pair: str,
dry_order: Dict[str, Any], dry_order: dict[str, Any],
taker_or_maker: MakerTaker, taker_or_maker: MakerTaker,
) -> Dict[str, Any]: ) -> dict[str, Any]:
fee = self.get_fee(pair, taker_or_maker=taker_or_maker) fee = self.get_fee(pair, taker_or_maker=taker_or_maker)
dry_order.update( dry_order.update(
{ {
@@ -1157,8 +1158,8 @@ class Exchange:
return False return False
def check_dry_limit_order_filled( def check_dry_limit_order_filled(
self, order: Dict[str, Any], immediate: bool = False, orderbook: Optional[OrderBook] = None self, order: dict[str, Any], immediate: bool = False, orderbook: Optional[OrderBook] = None
) -> Dict[str, Any]: ) -> dict[str, Any]:
""" """
Check dry-run limit order fill and update fee (if it filled). Check dry-run limit order fill and update fee (if it filled).
""" """
@@ -1185,7 +1186,7 @@ class Exchange:
return order return order
def fetch_dry_run_order(self, order_id) -> Dict[str, Any]: def fetch_dry_run_order(self, order_id) -> dict[str, Any]:
""" """
Return dry-run order Return dry-run order
Only call if running in dry-run mode. Only call if running in dry-run mode.
@@ -1221,7 +1222,7 @@ class Exchange:
leverage: float, leverage: float,
reduceOnly: bool, reduceOnly: bool,
time_in_force: str = "GTC", time_in_force: str = "GTC",
) -> Dict: ) -> dict:
params = self._params.copy() params = self._params.copy()
if time_in_force != "GTC" and ordertype != "market": if time_in_force != "GTC" and ordertype != "market":
params.update({"timeInForce": time_in_force.upper()}) params.update({"timeInForce": time_in_force.upper()})
@@ -1247,7 +1248,7 @@ class Exchange:
leverage: float, leverage: float,
reduceOnly: bool = False, reduceOnly: bool = False,
time_in_force: str = "GTC", time_in_force: str = "GTC",
) -> Dict: ) -> dict:
if self._config["dry_run"]: if self._config["dry_run"]:
dry_order = self.create_dry_run_order( dry_order = self.create_dry_run_order(
pair, ordertype, side, amount, self.price_to_precision(pair, rate), leverage pair, ordertype, side, amount, self.price_to_precision(pair, rate), leverage
@@ -1305,7 +1306,7 @@ class Exchange:
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(e) from e raise OperationalException(e) from e
def stoploss_adjust(self, stop_loss: float, order: Dict, side: str) -> bool: def stoploss_adjust(self, stop_loss: float, order: dict, side: str) -> bool:
""" """
Verify stop_loss against stoploss-order value (limit or price) Verify stop_loss against stoploss-order value (limit or price)
Returns True if adjustment is necessary. Returns True if adjustment is necessary.
@@ -1318,8 +1319,8 @@ class Exchange:
or (side == "buy" and stop_loss < float(order[price_param])) or (side == "buy" and stop_loss < float(order[price_param]))
) )
def _get_stop_order_type(self, user_order_type) -> Tuple[str, str]: def _get_stop_order_type(self, user_order_type) -> tuple[str, str]:
available_order_Types: Dict[str, str] = self._ft_has["stoploss_order_types"] available_order_Types: dict[str, str] = self._ft_has["stoploss_order_types"]
if user_order_type in available_order_Types.keys(): if user_order_type in available_order_Types.keys():
ordertype = available_order_Types[user_order_type] ordertype = available_order_Types[user_order_type]
@@ -1329,7 +1330,7 @@ class Exchange:
user_order_type = list(available_order_Types.keys())[0] user_order_type = list(available_order_Types.keys())[0]
return ordertype, user_order_type return ordertype, user_order_type
def _get_stop_limit_rate(self, stop_price: float, order_types: Dict, side: str) -> float: def _get_stop_limit_rate(self, stop_price: float, order_types: dict, side: str) -> float:
# Limit price threshold: As limit price should always be below stop-price # Limit price threshold: As limit price should always be below stop-price
limit_price_pct = order_types.get("stoploss_on_exchange_limit_ratio", 0.99) limit_price_pct = order_types.get("stoploss_on_exchange_limit_ratio", 0.99)
if side == "sell": if side == "sell":
@@ -1351,7 +1352,7 @@ class Exchange:
) )
return limit_rate return limit_rate
def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> Dict: def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> dict:
params = self._params.copy() params = self._params.copy()
# Verify if stopPrice works for your exchange, else configure stop_price_param # Verify if stopPrice works for your exchange, else configure stop_price_param
params.update({self._ft_has["stop_price_param"]: stop_price}) params.update({self._ft_has["stop_price_param"]: stop_price})
@@ -1363,10 +1364,10 @@ class Exchange:
pair: str, pair: str,
amount: float, amount: float,
stop_price: float, stop_price: float,
order_types: Dict, order_types: dict,
side: BuySell, side: BuySell,
leverage: float, leverage: float,
) -> Dict: ) -> dict:
""" """
creates a stoploss order. creates a stoploss order.
requires `_ft_has['stoploss_order_types']` to be set as a dict mapping limit and market requires `_ft_has['stoploss_order_types']` to be set as a dict mapping limit and market
@@ -1459,7 +1460,7 @@ class Exchange:
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(e) from e raise OperationalException(e) from e
def fetch_order_emulated(self, order_id: str, pair: str, params: Dict) -> Dict: def fetch_order_emulated(self, order_id: str, pair: str, params: dict) -> dict:
""" """
Emulated fetch_order if the exchange doesn't support fetch_order, but requires separate Emulated fetch_order if the exchange doesn't support fetch_order, but requires separate
calls for open and closed orders. calls for open and closed orders.
@@ -1493,7 +1494,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@retrier(retries=API_FETCH_ORDER_RETRY_COUNT) @retrier(retries=API_FETCH_ORDER_RETRY_COUNT)
def fetch_order(self, order_id: str, pair: str, params: Optional[Dict] = None) -> Dict: def fetch_order(self, order_id: str, pair: str, params: Optional[dict] = None) -> dict:
if self._config["dry_run"]: if self._config["dry_run"]:
return self.fetch_dry_run_order(order_id) return self.fetch_dry_run_order(order_id)
if params is None: if params is None:
@@ -1522,12 +1523,12 @@ class Exchange:
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(e) from e raise OperationalException(e) from e
def fetch_stoploss_order(self, order_id: str, pair: str, params: Optional[Dict] = None) -> Dict: def fetch_stoploss_order(self, order_id: str, pair: str, params: Optional[dict] = None) -> dict:
return self.fetch_order(order_id, pair, params) return self.fetch_order(order_id, pair, params)
def fetch_order_or_stoploss_order( def fetch_order_or_stoploss_order(
self, order_id: str, pair: str, stoploss_order: bool = False self, order_id: str, pair: str, stoploss_order: bool = False
) -> Dict: ) -> dict:
""" """
Simple wrapper calling either fetch_order or fetch_stoploss_order depending on Simple wrapper calling either fetch_order or fetch_stoploss_order depending on
the stoploss_order parameter the stoploss_order parameter
@@ -1539,7 +1540,7 @@ class Exchange:
return self.fetch_stoploss_order(order_id, pair) return self.fetch_stoploss_order(order_id, pair)
return self.fetch_order(order_id, pair) return self.fetch_order(order_id, pair)
def check_order_canceled_empty(self, order: Dict) -> bool: def check_order_canceled_empty(self, order: dict) -> bool:
""" """
Verify if an order has been cancelled without being partially filled Verify if an order has been cancelled without being partially filled
:param order: Order dict as returned from fetch_order() :param order: Order dict as returned from fetch_order()
@@ -1548,7 +1549,7 @@ class Exchange:
return order.get("status") in NON_OPEN_EXCHANGE_STATES and order.get("filled") == 0.0 return order.get("status") in NON_OPEN_EXCHANGE_STATES and order.get("filled") == 0.0
@retrier @retrier
def cancel_order(self, order_id: str, pair: str, params: Optional[Dict] = None) -> Dict: def cancel_order(self, order_id: str, pair: str, params: Optional[dict] = None) -> dict:
if self._config["dry_run"]: if self._config["dry_run"]:
try: try:
order = self.fetch_dry_run_order(order_id) order = self.fetch_dry_run_order(order_id)
@@ -1577,8 +1578,8 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
def cancel_stoploss_order( def cancel_stoploss_order(
self, order_id: str, pair: str, params: Optional[Dict] = None self, order_id: str, pair: str, params: Optional[dict] = None
) -> Dict: ) -> dict:
return self.cancel_order(order_id, pair, params) return self.cancel_order(order_id, pair, params)
def is_cancel_order_result_suitable(self, corder) -> bool: def is_cancel_order_result_suitable(self, corder) -> bool:
@@ -1588,7 +1589,7 @@ class Exchange:
required = ("fee", "status", "amount") required = ("fee", "status", "amount")
return all(corder.get(k, None) is not None for k in required) return all(corder.get(k, None) is not None for k in required)
def cancel_order_with_result(self, order_id: str, pair: str, amount: float) -> Dict: def cancel_order_with_result(self, order_id: str, pair: str, amount: float) -> dict:
""" """
Cancel order returning a result. Cancel order returning a result.
Creates a fake result if cancel order returns a non-usable result Creates a fake result if cancel order returns a non-usable result
@@ -1619,7 +1620,7 @@ class Exchange:
return order return order
def cancel_stoploss_order_with_result(self, order_id: str, pair: str, amount: float) -> Dict: def cancel_stoploss_order_with_result(self, order_id: str, pair: str, amount: float) -> dict:
""" """
Cancel stoploss order returning a result. Cancel stoploss order returning a result.
Creates a fake result if cancel order returns a non-usable result Creates a fake result if cancel order returns a non-usable result
@@ -1661,7 +1662,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@retrier @retrier
def fetch_positions(self, pair: Optional[str] = None) -> List[CcxtPosition]: def fetch_positions(self, pair: Optional[str] = None) -> list[CcxtPosition]:
""" """
Fetch positions from the exchange. Fetch positions from the exchange.
If no pair is given, all positions are returned. If no pair is given, all positions are returned.
@@ -1673,7 +1674,7 @@ class Exchange:
symbols = [] symbols = []
if pair: if pair:
symbols.append(pair) symbols.append(pair)
positions: List[CcxtPosition] = self._api.fetch_positions(symbols) positions: list[CcxtPosition] = self._api.fetch_positions(symbols)
self._log_exchange_response("fetch_positions", positions) self._log_exchange_response("fetch_positions", positions)
return positions return positions
except ccxt.DDoSProtection as e: except ccxt.DDoSProtection as e:
@@ -1685,7 +1686,7 @@ class Exchange:
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(e) from e raise OperationalException(e) from e
def _fetch_orders_emulate(self, pair: str, since_ms: int) -> List[Dict]: def _fetch_orders_emulate(self, pair: str, since_ms: int) -> list[dict]:
orders = [] orders = []
if self.exchange_has("fetchClosedOrders"): if self.exchange_has("fetchClosedOrders"):
orders = self._api.fetch_closed_orders(pair, since=since_ms) orders = self._api.fetch_closed_orders(pair, since=since_ms)
@@ -1695,7 +1696,7 @@ class Exchange:
return orders return orders
@retrier(retries=0) @retrier(retries=0)
def fetch_orders(self, pair: str, since: datetime, params: Optional[Dict] = None) -> List[Dict]: def fetch_orders(self, pair: str, since: datetime, params: Optional[dict] = None) -> list[dict]:
""" """
Fetch all orders for a pair "since" Fetch all orders for a pair "since"
:param pair: Pair for the query :param pair: Pair for the query
@@ -1711,7 +1712,7 @@ class Exchange:
if not params: if not params:
params = {} params = {}
try: try:
orders: List[Dict] = self._api.fetch_orders(pair, since=since_ms, params=params) orders: list[dict] = self._api.fetch_orders(pair, since=since_ms, params=params)
except ccxt.NotSupported: except ccxt.NotSupported:
# Some exchanges don't support fetchOrders # Some exchanges don't support fetchOrders
# attempt to fetch open and closed orders separately # attempt to fetch open and closed orders separately
@@ -1731,7 +1732,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@retrier @retrier
def fetch_trading_fees(self) -> Dict[str, Any]: def fetch_trading_fees(self) -> dict[str, Any]:
""" """
Fetch user account trading fees Fetch user account trading fees
Can be cached, should not update often. Can be cached, should not update often.
@@ -1743,7 +1744,7 @@ class Exchange:
): ):
return {} return {}
try: try:
trading_fees: Dict[str, Any] = self._api.fetch_trading_fees() trading_fees: dict[str, Any] = self._api.fetch_trading_fees()
self._log_exchange_response("fetch_trading_fees", trading_fees) self._log_exchange_response("fetch_trading_fees", trading_fees)
return trading_fees return trading_fees
except ccxt.DDoSProtection as e: except ccxt.DDoSProtection as e:
@@ -1756,7 +1757,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@retrier @retrier
def fetch_bids_asks(self, symbols: Optional[List[str]] = None, cached: bool = False) -> Dict: def fetch_bids_asks(self, symbols: Optional[list[str]] = None, cached: bool = False) -> dict:
""" """
:param symbols: List of symbols to fetch :param symbols: List of symbols to fetch
:param cached: Allow cached result :param cached: Allow cached result
@@ -1789,7 +1790,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@retrier @retrier
def get_tickers(self, symbols: Optional[List[str]] = None, cached: bool = False) -> Tickers: def get_tickers(self, symbols: Optional[list[str]] = None, cached: bool = False) -> Tickers:
""" """
:param cached: Allow cached result :param cached: Allow cached result
:return: fetch_tickers result :return: fetch_tickers result
@@ -1849,7 +1850,7 @@ class Exchange:
@staticmethod @staticmethod
def get_next_limit_in_list( def get_next_limit_in_list(
limit: int, limit_range: Optional[List[int]], range_required: bool = True limit: int, limit_range: Optional[list[int]], range_required: bool = True
): ):
""" """
Get next greater value in the list. Get next greater value in the list.
@@ -1890,7 +1891,7 @@ class Exchange:
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(e) from e raise OperationalException(e) from e
def _get_price_side(self, side: str, is_short: bool, conf_strategy: Dict) -> BidAsk: def _get_price_side(self, side: str, is_short: bool, conf_strategy: dict) -> BidAsk:
price_side = conf_strategy["price_side"] price_side = conf_strategy["price_side"]
if price_side in ("same", "other"): if price_side in ("same", "other"):
@@ -1962,7 +1963,7 @@ class Exchange:
return rate return rate
def _get_rate_from_ticker( def _get_rate_from_ticker(
self, side: EntryExit, ticker: Ticker, conf_strategy: Dict[str, Any], price_side: BidAsk self, side: EntryExit, ticker: Ticker, conf_strategy: dict[str, Any], price_side: BidAsk
) -> Optional[float]: ) -> Optional[float]:
""" """
Get rate from ticker. Get rate from ticker.
@@ -2008,7 +2009,7 @@ class Exchange:
) )
return rate return rate
def get_rates(self, pair: str, refresh: bool, is_short: bool) -> Tuple[float, float]: def get_rates(self, pair: str, refresh: bool, is_short: bool) -> tuple[float, float]:
entry_rate = None entry_rate = None
exit_rate = None exit_rate = None
if not refresh: if not refresh:
@@ -2042,8 +2043,8 @@ class Exchange:
@retrier @retrier
def get_trades_for_order( def get_trades_for_order(
self, order_id: str, pair: str, since: datetime, params: Optional[Dict] = None self, order_id: str, pair: str, since: datetime, params: Optional[dict] = None
) -> List: ) -> list:
""" """
Fetch Orders using the "fetch_my_trades" endpoint and filter them by order-id. Fetch Orders using the "fetch_my_trades" endpoint and filter them by order-id.
The "since" argument passed in is coming from the database and is in UTC, The "since" argument passed in is coming from the database and is in UTC,
@@ -2089,7 +2090,7 @@ class Exchange:
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(e) from e raise OperationalException(e) from e
def get_order_id_conditional(self, order: Dict[str, Any]) -> str: def get_order_id_conditional(self, order: dict[str, Any]) -> str:
return order["id"] return order["id"]
@retrier @retrier
@@ -2138,7 +2139,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@staticmethod @staticmethod
def order_has_fee(order: Dict) -> bool: def order_has_fee(order: dict) -> bool:
""" """
Verifies if the passed in order dict has the needed keys to extract fees, Verifies if the passed in order dict has the needed keys to extract fees,
and that these keys (currency, cost) are not empty. and that these keys (currency, cost) are not empty.
@@ -2156,7 +2157,7 @@ class Exchange:
) )
def calculate_fee_rate( def calculate_fee_rate(
self, fee: Dict, symbol: str, cost: float, amount: float self, fee: dict, symbol: str, cost: float, amount: float
) -> Optional[float]: ) -> Optional[float]:
""" """
Calculate fee rate if it's not given by the exchange. Calculate fee rate if it's not given by the exchange.
@@ -2196,8 +2197,8 @@ class Exchange:
return round((fee_cost * fee_to_quote_rate) / cost, 8) return round((fee_cost * fee_to_quote_rate) / cost, 8)
def extract_cost_curr_rate( def extract_cost_curr_rate(
self, fee: Dict, symbol: str, cost: float, amount: float self, fee: dict, symbol: str, cost: float, amount: float
) -> Tuple[float, str, Optional[float]]: ) -> tuple[float, str, Optional[float]]:
""" """
Extract tuple of cost, currency, rate. Extract tuple of cost, currency, rate.
Requires order_has_fee to run first! Requires order_has_fee to run first!
@@ -2277,7 +2278,7 @@ class Exchange:
for since in range(since_ms, until_ms or dt_ts(), one_call) for since in range(since_ms, until_ms or dt_ts(), one_call)
] ]
data: List = [] data: list = []
# Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling # Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100): for input_coro in chunks(input_coroutines, 100):
results = await asyncio.gather(*input_coro, return_exceptions=True) results = await asyncio.gather(*input_coro, return_exceptions=True)
@@ -2371,11 +2372,11 @@ class Exchange:
def _build_ohlcv_dl_jobs( def _build_ohlcv_dl_jobs(
self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int], cache: bool self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int], cache: bool
) -> Tuple[List[Coroutine], List[PairWithTimeframe]]: ) -> tuple[list[Coroutine], list[PairWithTimeframe]]:
""" """
Build Coroutines to execute as part of refresh_latest_ohlcv Build Coroutines to execute as part of refresh_latest_ohlcv
""" """
input_coroutines: List[Coroutine[Any, Any, OHLCVResponse]] = [] input_coroutines: list[Coroutine[Any, Any, OHLCVResponse]] = []
cached_pairs = [] cached_pairs = []
for pair, timeframe, candle_type in set(pair_list): for pair, timeframe, candle_type in set(pair_list):
if timeframe not in self.timeframes and candle_type in ( if timeframe not in self.timeframes and candle_type in (
@@ -2411,7 +2412,7 @@ class Exchange:
pair: str, pair: str,
timeframe: str, timeframe: str,
c_type: CandleType, c_type: CandleType,
ticks: List[List], ticks: list[list],
cache: bool, cache: bool,
drop_incomplete: bool, drop_incomplete: bool,
) -> DataFrame: ) -> DataFrame:
@@ -2450,7 +2451,7 @@ class Exchange:
since_ms: Optional[int] = None, since_ms: Optional[int] = None,
cache: bool = True, cache: bool = True,
drop_incomplete: Optional[bool] = None, drop_incomplete: Optional[bool] = None,
) -> Dict[PairWithTimeframe, DataFrame]: ) -> dict[PairWithTimeframe, DataFrame]:
""" """
Refresh in-memory OHLCV asynchronously and set `_klines` with the result Refresh in-memory OHLCV asynchronously and set `_klines` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel). Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
@@ -2499,8 +2500,8 @@ class Exchange:
return results_df return results_df
def refresh_ohlcv_with_cache( def refresh_ohlcv_with_cache(
self, pairs: List[PairWithTimeframe], since_ms: int self, pairs: list[PairWithTimeframe], since_ms: int
) -> Dict[PairWithTimeframe, DataFrame]: ) -> dict[PairWithTimeframe, DataFrame]:
""" """
Refresh ohlcv data for all pairs in needed_pairs if necessary. Refresh ohlcv data for all pairs in needed_pairs if necessary.
Caches data with expiring per timeframe. Caches data with expiring per timeframe.
@@ -2618,7 +2619,7 @@ class Exchange:
timeframe: str, timeframe: str,
limit: int, limit: int,
since_ms: Optional[int] = None, since_ms: Optional[int] = None,
) -> List[List]: ) -> list[list]:
""" """
Fetch funding rate history - used to selectively override this by subclasses. Fetch funding rate history - used to selectively override this by subclasses.
""" """
@@ -2652,7 +2653,7 @@ class Exchange:
pair: str, pair: str,
timeframe: str, timeframe: str,
c_type: CandleType, c_type: CandleType,
ticks: List[List], ticks: list[list],
cache: bool, cache: bool,
first_required_candle_date: int, first_required_candle_date: int,
) -> DataFrame: ) -> DataFrame:
@@ -2676,13 +2677,13 @@ class Exchange:
async def _build_trades_dl_jobs( async def _build_trades_dl_jobs(
self, pairwt: PairWithTimeframe, data_handler, cache: bool self, pairwt: PairWithTimeframe, data_handler, cache: bool
) -> Tuple[PairWithTimeframe, Optional[DataFrame]]: ) -> tuple[PairWithTimeframe, Optional[DataFrame]]:
""" """
Build coroutines to refresh trades for (they're then called through async.gather) Build coroutines to refresh trades for (they're then called through async.gather)
""" """
pair, timeframe, candle_type = pairwt pair, timeframe, candle_type = pairwt
since_ms = None since_ms = None
new_ticks: List = [] new_ticks: list = []
all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"]) all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])
first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type) first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type)
# refresh, if # refresh, if
@@ -2767,7 +2768,7 @@ class Exchange:
pair_list: ListPairsWithTimeframes, pair_list: ListPairsWithTimeframes,
*, *,
cache: bool = True, cache: bool = True,
) -> Dict[PairWithTimeframe, DataFrame]: ) -> dict[PairWithTimeframe, DataFrame]:
""" """
Refresh in-memory TRADES asynchronously and set `_trades` with the result Refresh in-memory TRADES asynchronously and set `_trades` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel). Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
@@ -2821,7 +2822,7 @@ class Exchange:
@retrier_async @retrier_async
async def _async_fetch_trades( async def _async_fetch_trades(
self, pair: str, since: Optional[int] = None, params: Optional[dict] = None self, pair: str, since: Optional[int] = None, params: Optional[dict] = None
) -> Tuple[List[List], Any]: ) -> tuple[list[list], Any]:
""" """
Asynchronously gets trade history using fetch_trades. Asynchronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange. Handles exchange errors, does one call to the exchange.
@@ -2867,7 +2868,7 @@ class Exchange:
""" """
return True return True
def _get_trade_pagination_next_value(self, trades: List[Dict]): def _get_trade_pagination_next_value(self, trades: list[dict]):
""" """
Extract pagination id for the next "from_id" value Extract pagination id for the next "from_id" value
Applies only to fetch_trade_history by id. Applies only to fetch_trade_history by id.
@@ -2881,7 +2882,7 @@ class Exchange:
async def _async_get_trade_history_id( async def _async_get_trade_history_id(
self, pair: str, until: int, since: Optional[int] = None, from_id: Optional[str] = None self, pair: str, until: int, since: Optional[int] = None, from_id: Optional[str] = None
) -> Tuple[str, List[List]]: ) -> tuple[str, list[list]]:
""" """
Asynchronously gets trade history using fetch_trades Asynchronously gets trade history using fetch_trades
use this when exchange uses id-based iteration (check `self._trades_pagination`) use this when exchange uses id-based iteration (check `self._trades_pagination`)
@@ -2892,7 +2893,7 @@ class Exchange:
returns tuple: (pair, trades-list) returns tuple: (pair, trades-list)
""" """
trades: List[List] = [] trades: list[list] = []
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp # DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id # DEFAULT_TRADES_COLUMNS: 1 -> id
has_overlap = self._ft_has.get("trades_pagination_overlap", True) has_overlap = self._ft_has.get("trades_pagination_overlap", True)
@@ -2936,7 +2937,7 @@ class Exchange:
async def _async_get_trade_history_time( async def _async_get_trade_history_time(
self, pair: str, until: int, since: Optional[int] = None self, pair: str, until: int, since: Optional[int] = None
) -> Tuple[str, List[List]]: ) -> tuple[str, list[list]]:
""" """
Asynchronously gets trade history using fetch_trades, Asynchronously gets trade history using fetch_trades,
when the exchange uses time-based iteration (check `self._trades_pagination`) when the exchange uses time-based iteration (check `self._trades_pagination`)
@@ -2946,7 +2947,7 @@ class Exchange:
returns tuple: (pair, trades-list) returns tuple: (pair, trades-list)
""" """
trades: List[List] = [] trades: list[list] = []
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp # DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id # DEFAULT_TRADES_COLUMNS: 1 -> id
while True: while True:
@@ -2979,7 +2980,7 @@ class Exchange:
since: Optional[int] = None, since: Optional[int] = None,
until: Optional[int] = None, until: Optional[int] = None,
from_id: Optional[str] = None, from_id: Optional[str] = None,
) -> Tuple[str, List[List]]: ) -> tuple[str, list[list]]:
""" """
Async wrapper handling downloading trades using either time or id based methods. Async wrapper handling downloading trades using either time or id based methods.
""" """
@@ -3010,7 +3011,7 @@ class Exchange:
since: Optional[int] = None, since: Optional[int] = None,
until: Optional[int] = None, until: Optional[int] = None,
from_id: Optional[str] = None, from_id: Optional[str] = None,
) -> Tuple[str, List]: ) -> tuple[str, list]:
""" """
Get trade history data using asyncio. Get trade history data using asyncio.
Handles all async work and returns the list of candles. Handles all async work and returns the list of candles.
@@ -3070,7 +3071,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@retrier @retrier
def get_leverage_tiers(self) -> Dict[str, List[Dict]]: def get_leverage_tiers(self) -> dict[str, list[dict]]:
try: try:
return self._api.fetch_leverage_tiers() return self._api.fetch_leverage_tiers()
except ccxt.DDoSProtection as e: except ccxt.DDoSProtection as e:
@@ -3083,7 +3084,7 @@ class Exchange:
raise OperationalException(e) from e raise OperationalException(e) from e
@retrier_async @retrier_async
async def get_market_leverage_tiers(self, symbol: str) -> Tuple[str, List[Dict]]: async def get_market_leverage_tiers(self, symbol: str) -> tuple[str, list[dict]]:
"""Leverage tiers per symbol""" """Leverage tiers per symbol"""
try: try:
tier = await self._api_async.fetch_market_leverage_tiers(symbol) tier = await self._api_async.fetch_market_leverage_tiers(symbol)
@@ -3098,7 +3099,7 @@ class Exchange:
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(e) from e raise OperationalException(e) from e
def load_leverage_tiers(self) -> Dict[str, List[Dict]]: def load_leverage_tiers(self) -> dict[str, list[dict]]:
if self.trading_mode == TradingMode.FUTURES: if self.trading_mode == TradingMode.FUTURES:
if self.exchange_has("fetchLeverageTiers"): if self.exchange_has("fetchLeverageTiers"):
# Fetch all leverage tiers at once # Fetch all leverage tiers at once
@@ -3117,7 +3118,7 @@ class Exchange:
) )
] ]
tiers: Dict[str, List[Dict]] = {} tiers: dict[str, list[dict]] = {}
tiers_cached = self.load_cached_leverage_tiers(self._config["stake_currency"]) tiers_cached = self.load_cached_leverage_tiers(self._config["stake_currency"])
if tiers_cached: if tiers_cached:
@@ -3158,7 +3159,7 @@ class Exchange:
return tiers return tiers
return {} return {}
def cache_leverage_tiers(self, tiers: Dict[str, List[Dict]], stake_currency: str) -> None: def cache_leverage_tiers(self, tiers: dict[str, list[dict]], stake_currency: str) -> None:
filename = self._config["datadir"] / "futures" / f"leverage_tiers_{stake_currency}.json" filename = self._config["datadir"] / "futures" / f"leverage_tiers_{stake_currency}.json"
if not filename.parent.is_dir(): if not filename.parent.is_dir():
filename.parent.mkdir(parents=True) filename.parent.mkdir(parents=True)
@@ -3170,7 +3171,7 @@ class Exchange:
def load_cached_leverage_tiers( def load_cached_leverage_tiers(
self, stake_currency: str, cache_time: Optional[timedelta] = None self, stake_currency: str, cache_time: Optional[timedelta] = None
) -> Optional[Dict[str, List[Dict]]]: ) -> Optional[dict[str, list[dict]]]:
""" """
Load cached leverage tiers from disk Load cached leverage tiers from disk
:param cache_time: The maximum age of the cache before it is considered outdated :param cache_time: The maximum age of the cache before it is considered outdated
@@ -3205,7 +3206,7 @@ class Exchange:
pair_tiers.append(self.parse_leverage_tier(tier)) pair_tiers.append(self.parse_leverage_tier(tier))
self._leverage_tiers[pair] = pair_tiers self._leverage_tiers[pair] = pair_tiers
def parse_leverage_tier(self, tier) -> Dict: def parse_leverage_tier(self, tier) -> dict:
info = tier.get("info", {}) info = tier.get("info", {})
return { return {
"minNotional": tier["minNotional"], "minNotional": tier["minNotional"],
@@ -3345,7 +3346,7 @@ class Exchange:
pair: str, pair: str,
margin_mode: MarginMode, margin_mode: MarginMode,
accept_fail: bool = False, accept_fail: bool = False,
params: Optional[Dict] = None, params: Optional[dict] = None,
): ):
""" """
Set's the margin mode on the exchange to cross or isolated for a specific pair Set's the margin mode on the exchange to cross or isolated for a specific pair
@@ -3632,7 +3633,7 @@ class Exchange:
self, self,
pair: str, pair: str,
notional_value: float, notional_value: float,
) -> Tuple[float, Optional[float]]: ) -> tuple[float, Optional[float]]:
""" """
Important: Must be fetching data from cached values as this is used by backtesting! Important: Must be fetching data from cached values as this is used by backtesting!
:param pair: Market symbol :param pair: Market symbol

View File

@@ -1,11 +1,11 @@
from typing import Dict, List, Optional, Tuple, TypedDict from typing import Optional, TypedDict
from freqtrade.enums import CandleType from freqtrade.enums import CandleType
class FtHas(TypedDict, total=False): class FtHas(TypedDict, total=False):
order_time_in_force: List[str] order_time_in_force: list[str]
exchange_has_overrides: Dict[str, bool] exchange_has_overrides: dict[str, bool]
marketOrderRequiresPrice: bool marketOrderRequiresPrice: bool
# Stoploss on exchange # Stoploss on exchange
@@ -13,16 +13,16 @@ class FtHas(TypedDict, total=False):
stop_price_param: str stop_price_param: str
stop_price_prop: str stop_price_prop: str
stop_price_type_field: str stop_price_type_field: str
stop_price_type_value_mapping: Dict stop_price_type_value_mapping: dict
stoploss_order_types: Dict[str, str] stoploss_order_types: dict[str, str]
# ohlcv # ohlcv
ohlcv_params: Dict ohlcv_params: dict
ohlcv_candle_limit: int ohlcv_candle_limit: int
ohlcv_has_history: bool ohlcv_has_history: bool
ohlcv_partial_candle: bool ohlcv_partial_candle: bool
ohlcv_require_since: bool ohlcv_require_since: bool
ohlcv_volume_currency: str ohlcv_volume_currency: str
ohlcv_candle_limit_per_timeframe: Dict[str, int] ohlcv_candle_limit_per_timeframe: dict[str, int]
# Tickers # Tickers
tickers_have_quoteVolume: bool tickers_have_quoteVolume: bool
tickers_have_percentage: bool tickers_have_percentage: bool
@@ -35,7 +35,7 @@ class FtHas(TypedDict, total=False):
trades_has_history: bool trades_has_history: bool
trades_pagination_overlap: bool trades_pagination_overlap: bool
# Orderbook # Orderbook
l2_limit_range: Optional[List[int]] l2_limit_range: Optional[list[int]]
l2_limit_range_required: bool l2_limit_range_required: bool
# Futures # Futures
ccxt_futures_name: str # usually swap ccxt_futures_name: str # usually swap
@@ -44,7 +44,7 @@ class FtHas(TypedDict, total=False):
funding_fee_timeframe: str funding_fee_timeframe: str
floor_leverage: bool floor_leverage: bool
needs_trading_fees: bool needs_trading_fees: bool
order_props_in_contracts: List[str] order_props_in_contracts: list[str]
# Websocket control # Websocket control
ws_enabled: bool ws_enabled: bool
@@ -63,13 +63,13 @@ class Ticker(TypedDict):
# Several more - only listing required. # Several more - only listing required.
Tickers = Dict[str, Ticker] Tickers = dict[str, Ticker]
class OrderBook(TypedDict): class OrderBook(TypedDict):
symbol: str symbol: str
bids: List[Tuple[float, float]] bids: list[tuple[float, float]]
asks: List[Tuple[float, float]] asks: list[tuple[float, float]]
timestamp: Optional[int] timestamp: Optional[int]
datetime: Optional[str] datetime: Optional[str]
nonce: Optional[int] nonce: Optional[int]
@@ -81,7 +81,7 @@ class CcxtBalance(TypedDict):
total: float total: float
CcxtBalances = Dict[str, CcxtBalance] CcxtBalances = dict[str, CcxtBalance]
class CcxtPosition(TypedDict): class CcxtPosition(TypedDict):
@@ -95,4 +95,4 @@ class CcxtPosition(TypedDict):
# pair, timeframe, candleType, OHLCV, drop last?, # pair, timeframe, candleType, OHLCV, drop last?,
OHLCVResponse = Tuple[str, str, CandleType, List, bool] OHLCVResponse = tuple[str, str, CandleType, list, bool]

View File

@@ -5,7 +5,7 @@ Exchange support utils
import inspect import inspect
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from math import ceil, floor from math import ceil, floor
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Optional
import ccxt import ccxt
from ccxt import ( from ccxt import (
@@ -39,14 +39,14 @@ def is_exchange_known_ccxt(
return exchange_name in ccxt_exchanges(ccxt_module) return exchange_name in ccxt_exchanges(ccxt_module)
def ccxt_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[str]: def ccxt_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> list[str]:
""" """
Return the list of all exchanges known to ccxt Return the list of all exchanges known to ccxt
""" """
return ccxt_module.exchanges if ccxt_module is not None else ccxt.exchanges return ccxt_module.exchanges if ccxt_module is not None else ccxt.exchanges
def available_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[str]: def available_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> list[str]:
""" """
Return exchanges available to the bot, i.e. non-bad exchanges in the ccxt list Return exchanges available to the bot, i.e. non-bad exchanges in the ccxt list
""" """
@@ -54,7 +54,7 @@ def available_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[st
return [x for x in exchanges if validate_exchange(x)[0]] return [x for x in exchanges if validate_exchange(x)[0]]
def validate_exchange(exchange: str) -> Tuple[bool, str, Optional[ccxt.Exchange]]: def validate_exchange(exchange: str) -> tuple[bool, str, Optional[ccxt.Exchange]]:
""" """
returns: can_use, reason, exchange_object returns: can_use, reason, exchange_object
with Reason including both missing and missing_opt with Reason including both missing and missing_opt
@@ -91,7 +91,7 @@ def validate_exchange(exchange: str) -> Tuple[bool, str, Optional[ccxt.Exchange]
def _build_exchange_list_entry( def _build_exchange_list_entry(
exchange_name: str, exchangeClasses: Dict[str, Any] exchange_name: str, exchangeClasses: dict[str, Any]
) -> ValidExchangesType: ) -> ValidExchangesType:
valid, comment, ex_mod = validate_exchange(exchange_name) valid, comment, ex_mod = validate_exchange(exchange_name)
result: ValidExchangesType = { result: ValidExchangesType = {
@@ -121,7 +121,7 @@ def _build_exchange_list_entry(
return result return result
def list_available_exchanges(all_exchanges: bool) -> List[ValidExchangesType]: def list_available_exchanges(all_exchanges: bool) -> list[ValidExchangesType]:
""" """
:return: List of tuples with exchangename, valid, reason. :return: List of tuples with exchangename, valid, reason.
""" """
@@ -130,7 +130,7 @@ def list_available_exchanges(all_exchanges: bool) -> List[ValidExchangesType]:
subclassed = {e["name"].lower(): e for e in ExchangeResolver.search_all_objects({}, False)} subclassed = {e["name"].lower(): e for e in ExchangeResolver.search_all_objects({}, False)}
exchanges_valid: List[ValidExchangesType] = [ exchanges_valid: list[ValidExchangesType] = [
_build_exchange_list_entry(e, subclassed) for e in exchanges _build_exchange_list_entry(e, subclassed) for e in exchanges
] ]
@@ -155,7 +155,7 @@ def date_minus_candles(
return new_date return new_date
def market_is_active(market: Dict) -> bool: def market_is_active(market: dict) -> bool:
""" """
Return True if the market is active. Return True if the market is active.
""" """

View File

@@ -4,7 +4,6 @@ import time
from copy import deepcopy from copy import deepcopy
from functools import partial from functools import partial
from threading import Thread from threading import Thread
from typing import Dict, Set
import ccxt import ccxt
@@ -22,12 +21,12 @@ class ExchangeWS:
def __init__(self, config: Config, ccxt_object: ccxt.Exchange) -> None: def __init__(self, config: Config, ccxt_object: ccxt.Exchange) -> None:
self.config = config self.config = config
self.ccxt_object = ccxt_object self.ccxt_object = ccxt_object
self._background_tasks: Set[asyncio.Task] = set() self._background_tasks: set[asyncio.Task] = set()
self._klines_watching: Set[PairWithTimeframe] = set() self._klines_watching: set[PairWithTimeframe] = set()
self._klines_scheduled: Set[PairWithTimeframe] = set() self._klines_scheduled: set[PairWithTimeframe] = set()
self.klines_last_refresh: Dict[PairWithTimeframe, float] = {} self.klines_last_refresh: dict[PairWithTimeframe, float] = {}
self.klines_last_request: Dict[PairWithTimeframe, float] = {} self.klines_last_request: dict[PairWithTimeframe, float] = {}
self._thread = Thread(name="ccxt_ws", target=self._start_forever) self._thread = Thread(name="ccxt_ws", target=self._start_forever)
self._thread.start() self._thread.start()
self.__cleanup_called = False self.__cleanup_called = False

View File

@@ -2,7 +2,7 @@
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Optional
from freqtrade.constants import BuySell from freqtrade.constants import BuySell
from freqtrade.enums import MarginMode, PriceType, TradingMode from freqtrade.enums import MarginMode, PriceType, TradingMode
@@ -46,7 +46,7 @@ class Gate(Exchange):
}, },
} }
_supported_trading_mode_margin_pairs: List[Tuple[TradingMode, MarginMode]] = [ _supported_trading_mode_margin_pairs: list[tuple[TradingMode, MarginMode]] = [
# TradingMode.SPOT always supported and not required in this list # TradingMode.SPOT always supported and not required in this list
# (TradingMode.MARGIN, MarginMode.CROSS), # (TradingMode.MARGIN, MarginMode.CROSS),
# (TradingMode.FUTURES, MarginMode.CROSS), # (TradingMode.FUTURES, MarginMode.CROSS),
@@ -60,7 +60,7 @@ class Gate(Exchange):
leverage: float, leverage: float,
reduceOnly: bool, reduceOnly: bool,
time_in_force: str = "GTC", time_in_force: str = "GTC",
) -> Dict: ) -> dict:
params = super()._get_params( params = super()._get_params(
side=side, side=side,
ordertype=ordertype, ordertype=ordertype,
@@ -74,8 +74,8 @@ class Gate(Exchange):
return params return params
def get_trades_for_order( def get_trades_for_order(
self, order_id: str, pair: str, since: datetime, params: Optional[Dict] = None self, order_id: str, pair: str, since: datetime, params: Optional[dict] = None
) -> List: ) -> list:
trades = super().get_trades_for_order(order_id, pair, since, params) trades = super().get_trades_for_order(order_id, pair, since, params)
if self.trading_mode == TradingMode.FUTURES: if self.trading_mode == TradingMode.FUTURES:
@@ -99,10 +99,10 @@ class Gate(Exchange):
} }
return trades return trades
def get_order_id_conditional(self, order: Dict[str, Any]) -> str: def get_order_id_conditional(self, order: dict[str, Any]) -> str:
return safe_value_fallback2(order, order, "id_stop", "id") return safe_value_fallback2(order, order, "id_stop", "id")
def fetch_stoploss_order(self, order_id: str, pair: str, params: Optional[Dict] = None) -> Dict: def fetch_stoploss_order(self, order_id: str, pair: str, params: Optional[dict] = None) -> dict:
order = self.fetch_order(order_id=order_id, pair=pair, params={"stop": True}) order = self.fetch_order(order_id=order_id, pair=pair, params={"stop": True})
if order.get("status", "open") == "closed": if order.get("status", "open") == "closed":
# Places a real order - which we need to fetch explicitly. # Places a real order - which we need to fetch explicitly.
@@ -120,6 +120,6 @@ class Gate(Exchange):
return order return order
def cancel_stoploss_order( def cancel_stoploss_order(
self, order_id: str, pair: str, params: Optional[Dict] = None self, order_id: str, pair: str, params: Optional[dict] = None
) -> Dict: ) -> dict:
return self.cancel_order(order_id=order_id, pair=pair, params={"stop": True}) return self.cancel_order(order_id=order_id, pair=pair, params={"stop": True})

View File

@@ -1,7 +1,6 @@
"""HTX exchange subclass""" """HTX exchange subclass"""
import logging import logging
from typing import Dict
from freqtrade.constants import BuySell from freqtrade.constants import BuySell
from freqtrade.exchange import Exchange from freqtrade.exchange import Exchange
@@ -32,7 +31,7 @@ class Htx(Exchange):
"trades_has_history": False, # Endpoint doesn't have a "since" parameter "trades_has_history": False, # Endpoint doesn't have a "since" parameter
} }
def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> Dict: def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> dict:
params = self._params.copy() params = self._params.copy()
params.update( params.update(
{ {

View File

@@ -1,7 +1,6 @@
"""Hyperliquid exchange subclass""" """Hyperliquid exchange subclass"""
import logging import logging
from typing import Dict
from freqtrade.enums import TradingMode from freqtrade.enums import TradingMode
from freqtrade.exchange import Exchange from freqtrade.exchange import Exchange
@@ -26,7 +25,7 @@ class Hyperliquid(Exchange):
} }
@property @property
def _ccxt_config(self) -> Dict: def _ccxt_config(self) -> dict:
# Parameters to add directly to ccxt sync/async initialization. # Parameters to add directly to ccxt sync/async initialization.
# ccxt defaults to swap mode. # ccxt defaults to swap mode.
config = {} config = {}

View File

@@ -2,7 +2,7 @@
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Optional
import ccxt import ccxt
from pandas import DataFrame from pandas import DataFrame
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
class Kraken(Exchange): class Kraken(Exchange):
_params: Dict = {"trading_agreement": "agree"} _params: dict = {"trading_agreement": "agree"}
_ft_has: FtHas = { _ft_has: FtHas = {
"stoploss_on_exchange": True, "stoploss_on_exchange": True,
"stop_price_param": "stopLossPrice", "stop_price_param": "stopLossPrice",
@@ -35,13 +35,13 @@ class Kraken(Exchange):
"mark_ohlcv_timeframe": "4h", "mark_ohlcv_timeframe": "4h",
} }
_supported_trading_mode_margin_pairs: List[Tuple[TradingMode, MarginMode]] = [ _supported_trading_mode_margin_pairs: list[tuple[TradingMode, MarginMode]] = [
# TradingMode.SPOT always supported and not required in this list # TradingMode.SPOT always supported and not required in this list
# (TradingMode.MARGIN, MarginMode.CROSS), # (TradingMode.MARGIN, MarginMode.CROSS),
# (TradingMode.FUTURES, MarginMode.CROSS) # (TradingMode.FUTURES, MarginMode.CROSS)
] ]
def market_is_tradable(self, market: Dict[str, Any]) -> bool: def market_is_tradable(self, market: dict[str, Any]) -> bool:
""" """
Check if the market symbol is tradable by Freqtrade. Check if the market symbol is tradable by Freqtrade.
Default checks + check if pair is darkpool pair. Default checks + check if pair is darkpool pair.
@@ -50,7 +50,7 @@ class Kraken(Exchange):
return parent_check and market.get("darkpool", False) is False return parent_check and market.get("darkpool", False) is False
def get_tickers(self, symbols: Optional[List[str]] = None, cached: bool = False) -> Tickers: def get_tickers(self, symbols: Optional[list[str]] = None, cached: bool = False) -> Tickers:
# Only fetch tickers for current stake currency # Only fetch tickers for current stake currency
# Otherwise the request for kraken becomes too large. # Otherwise the request for kraken becomes too large.
symbols = list(self.get_markets(quote_currencies=[self._config["stake_currency"]])) symbols = list(self.get_markets(quote_currencies=[self._config["stake_currency"]]))
@@ -115,7 +115,7 @@ class Kraken(Exchange):
leverage: float, leverage: float,
reduceOnly: bool, reduceOnly: bool,
time_in_force: str = "GTC", time_in_force: str = "GTC",
) -> Dict: ) -> dict:
params = super()._get_params( params = super()._get_params(
side=side, side=side,
ordertype=ordertype, ordertype=ordertype,
@@ -165,7 +165,7 @@ class Kraken(Exchange):
return fees if is_short else -fees return fees if is_short else -fees
def _get_trade_pagination_next_value(self, trades: List[Dict]): def _get_trade_pagination_next_value(self, trades: list[dict]):
""" """
Extract pagination id for the next "from_id" value Extract pagination id for the next "from_id" value
Applies only to fetch_trade_history by id. Applies only to fetch_trade_history by id.

View File

@@ -1,7 +1,6 @@
"""Kucoin exchange subclass.""" """Kucoin exchange subclass."""
import logging import logging
from typing import Dict
from freqtrade.constants import BuySell from freqtrade.constants import BuySell
from freqtrade.exchange import Exchange from freqtrade.exchange import Exchange
@@ -32,7 +31,7 @@ class Kucoin(Exchange):
"ohlcv_candle_limit": 1500, "ohlcv_candle_limit": 1500,
} }
def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> Dict: def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> dict:
params = self._params.copy() params = self._params.copy()
params.update({"stopPrice": stop_price, "stop": "loss"}) params.update({"stopPrice": stop_price, "stop": "loss"})
return params return params
@@ -48,7 +47,7 @@ class Kucoin(Exchange):
leverage: float, leverage: float,
reduceOnly: bool = False, reduceOnly: bool = False,
time_in_force: str = "GTC", time_in_force: str = "GTC",
) -> Dict: ) -> dict:
res = super().create_order( res = super().create_order(
pair=pair, pair=pair,
ordertype=ordertype, ordertype=ordertype,

View File

@@ -1,6 +1,6 @@
import logging import logging
from datetime import timedelta from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Optional
import ccxt import ccxt
@@ -48,7 +48,7 @@ class Okx(Exchange):
"ws_enabled": True, "ws_enabled": True,
} }
_supported_trading_mode_margin_pairs: List[Tuple[TradingMode, MarginMode]] = [ _supported_trading_mode_margin_pairs: list[tuple[TradingMode, MarginMode]] = [
# TradingMode.SPOT always supported and not required in this list # TradingMode.SPOT always supported and not required in this list
# (TradingMode.MARGIN, MarginMode.CROSS), # (TradingMode.MARGIN, MarginMode.CROSS),
# (TradingMode.FUTURES, MarginMode.CROSS), # (TradingMode.FUTURES, MarginMode.CROSS),
@@ -57,7 +57,7 @@ class Okx(Exchange):
net_only = True net_only = True
_ccxt_params: Dict = {"options": {"brokerId": "ffb5405ad327SUDE"}} _ccxt_params: dict = {"options": {"brokerId": "ffb5405ad327SUDE"}}
def ohlcv_candle_limit( def ohlcv_candle_limit(
self, timeframe: str, candle_type: CandleType, since_ms: Optional[int] = None self, timeframe: str, candle_type: CandleType, since_ms: Optional[int] = None
@@ -119,7 +119,7 @@ class Okx(Exchange):
leverage: float, leverage: float,
reduceOnly: bool, reduceOnly: bool,
time_in_force: str = "GTC", time_in_force: str = "GTC",
) -> Dict: ) -> dict:
params = super()._get_params( params = super()._get_params(
side=side, side=side,
ordertype=ordertype, ordertype=ordertype,
@@ -184,14 +184,14 @@ class Okx(Exchange):
pair_tiers = self._leverage_tiers[pair] pair_tiers = self._leverage_tiers[pair]
return pair_tiers[-1]["maxNotional"] / leverage return pair_tiers[-1]["maxNotional"] / leverage
def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> Dict: def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> dict:
params = super()._get_stop_params(side, ordertype, stop_price) params = super()._get_stop_params(side, ordertype, stop_price)
if self.trading_mode == TradingMode.FUTURES and self.margin_mode: if self.trading_mode == TradingMode.FUTURES and self.margin_mode:
params["tdMode"] = self.margin_mode.value params["tdMode"] = self.margin_mode.value
params["posSide"] = self._get_posSide(side, True) params["posSide"] = self._get_posSide(side, True)
return params return params
def _convert_stop_order(self, pair: str, order_id: str, order: Dict) -> Dict: def _convert_stop_order(self, pair: str, order_id: str, order: dict) -> dict:
if ( if (
order.get("status", "open") == "closed" order.get("status", "open") == "closed"
and (real_order_id := order.get("info", {}).get("ordId")) is not None and (real_order_id := order.get("info", {}).get("ordId")) is not None
@@ -209,7 +209,7 @@ class Okx(Exchange):
return order return order
@retrier(retries=API_RETRY_COUNT) @retrier(retries=API_RETRY_COUNT)
def fetch_stoploss_order(self, order_id: str, pair: str, params: Optional[Dict] = None) -> Dict: def fetch_stoploss_order(self, order_id: str, pair: str, params: Optional[dict] = None) -> dict:
if self._config["dry_run"]: if self._config["dry_run"]:
return self.fetch_dry_run_order(order_id) return self.fetch_dry_run_order(order_id)
@@ -231,7 +231,7 @@ class Okx(Exchange):
return self._fetch_stop_order_fallback(order_id, pair) return self._fetch_stop_order_fallback(order_id, pair)
def _fetch_stop_order_fallback(self, order_id: str, pair: str) -> Dict: def _fetch_stop_order_fallback(self, order_id: str, pair: str) -> dict:
params2 = {"stop": True, "ordType": "conditional"} params2 = {"stop": True, "ordType": "conditional"}
for method in ( for method in (
self._api.fetch_open_orders, self._api.fetch_open_orders,
@@ -256,14 +256,14 @@ class Okx(Exchange):
raise OperationalException(e) from e raise OperationalException(e) from e
raise RetryableOrderError(f"StoplossOrder not found (pair: {pair} id: {order_id}).") raise RetryableOrderError(f"StoplossOrder not found (pair: {pair} id: {order_id}).")
def get_order_id_conditional(self, order: Dict[str, Any]) -> str: def get_order_id_conditional(self, order: dict[str, Any]) -> str:
if order.get("type", "") == "stop": if order.get("type", "") == "stop":
return safe_value_fallback2(order, order, "id_stop", "id") return safe_value_fallback2(order, order, "id_stop", "id")
return order["id"] return order["id"]
def cancel_stoploss_order( def cancel_stoploss_order(
self, order_id: str, pair: str, params: Optional[Dict] = None self, order_id: str, pair: str, params: Optional[dict] = None
) -> Dict: ) -> dict:
params1 = {"stop": True} params1 = {"stop": True}
# 'ordType': 'conditional' # 'ordType': 'conditional'
# #
@@ -273,7 +273,7 @@ class Okx(Exchange):
params=params1, params=params1,
) )
def _fetch_orders_emulate(self, pair: str, since_ms: int) -> List[Dict]: def _fetch_orders_emulate(self, pair: str, since_ms: int) -> list[dict]:
orders = [] orders = []
orders = self._api.fetch_closed_orders(pair, since=since_ms) orders = self._api.fetch_closed_orders(pair, since=since_ms)