Merge remote-tracking branch 'upstream/develop' into feature/10348

This commit is contained in:
jainanuj94
2024-07-28 22:26:58 +05:30
81 changed files with 7613 additions and 2010 deletions

View File

@@ -22,6 +22,7 @@ from pandas import DataFrame, concat
from freqtrade.constants import (
DEFAULT_AMOUNT_RESERVE_PERCENT,
DEFAULT_TRADES_COLUMNS,
NON_OPEN_EXCHANGE_STATES,
BidAsk,
BuySell,
@@ -33,7 +34,13 @@ from freqtrade.constants import (
OBLiteral,
PairWithTimeframe,
)
from freqtrade.data.converter import clean_ohlcv_dataframe, ohlcv_to_dataframe, trades_dict_to_list
from freqtrade.data.converter import (
clean_ohlcv_dataframe,
ohlcv_to_dataframe,
trades_df_remove_duplicates,
trades_dict_to_list,
trades_list_to_df,
)
from freqtrade.enums import (
OPTIMIZE_MODES,
TRADE_MODES,
@@ -124,6 +131,7 @@ class Exchange:
"tickers_have_percentage": True,
"tickers_have_bid_ask": True, # bid / ask empty for fetch_tickers
"tickers_have_price": True,
"trades_limit": 1000, # Limit for 1 call to fetch_trades
"trades_pagination": "time", # Possible are "time" or "id"
"trades_pagination_arg": "since",
"trades_has_history": False,
@@ -195,6 +203,9 @@ class Exchange:
self._klines: Dict[PairWithTimeframe, DataFrame] = {}
self._expiring_candle_cache: Dict[Tuple[str, int], PeriodicCache] = {}
# Holds public_trades
self._trades: Dict[PairWithTimeframe, DataFrame] = {}
# Holds all open sell orders for dry_run
self._dry_run_open_orders: Dict[str, Any] = {}
@@ -223,6 +234,8 @@ class Exchange:
# Assign this directly for easy access
self._ohlcv_partial_candle = self._ft_has["ohlcv_partial_candle"]
self._max_trades_limit = self._ft_has["trades_limit"]
self._trades_pagination = self._ft_has["trades_pagination"]
self._trades_pagination_arg = self._ft_has["trades_pagination_arg"]
@@ -316,6 +329,7 @@ class Exchange:
self.validate_trading_mode_and_margin_mode(self.trading_mode, self.margin_mode)
self.validate_pricing(config["exit_pricing"])
self.validate_pricing(config["entry_pricing"])
self.validate_orderflow(config["exchange"])
def _init_ccxt(
self, exchange_config: Dict[str, Any], sync: bool, ccxt_kwargs: Dict[str, Any]
@@ -339,10 +353,14 @@ class Exchange:
raise OperationalException(f"Exchange {name} is not supported by ccxt")
ex_config = {
"apiKey": exchange_config.get("key"),
"apiKey": exchange_config.get("apiKey", exchange_config.get("key")),
"secret": exchange_config.get("secret"),
"password": exchange_config.get("password"),
"uid": exchange_config.get("uid", ""),
"accountId": exchange_config.get("accountId", ""),
# DEX attributes:
"walletAddress": exchange_config.get("walletAddress"),
"privateKey": exchange_config.get("privateKey"),
}
if ccxt_kwargs:
logger.info("Applying additional ccxt config: %s", ccxt_kwargs)
@@ -517,6 +535,15 @@ class Exchange:
else:
return DataFrame()
def trades(self, pair_interval: PairWithTimeframe, copy: bool = True) -> DataFrame:
if pair_interval in self._trades:
if copy:
return self._trades[pair_interval].copy()
else:
return self._trades[pair_interval]
else:
return DataFrame()
def get_contract_size(self, pair: str) -> Optional[float]:
if self.trading_mode == TradingMode.FUTURES:
market = self.markets.get(pair, {})
@@ -770,6 +797,14 @@ class Exchange:
f"Time in force policies are not supported for {self.name} yet."
)
def validate_orderflow(self, exchange: Dict) -> None:
if exchange.get("use_public_trades", False) and (
not self.exchange_has("fetchTrades") or not self._ft_has["trades_has_history"]
):
raise ConfigurationError(
f"Trade data not available for {self.name}. Can't use orderflow feature."
)
def validate_required_startup_candles(self, startup_candles: int, timeframe: str) -> int:
"""
Checks if required startup_candles is more than ohlcv_candle_limit().
@@ -2597,6 +2632,171 @@ class Exchange:
data = [[x["timestamp"], x["fundingRate"], 0, 0, 0, 0] for x in data]
return data
# fetch Trade data stuff
def needed_candle_for_trades_ms(self, timeframe: str, candle_type: CandleType) -> int:
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
tf_s = timeframe_to_seconds(timeframe)
candles_fetched = candle_limit * self.required_candle_call_count
max_candles = self._config["orderflow"]["max_candles"]
required_candles = min(max_candles, candles_fetched)
move_to = (
tf_s * candle_limit * required_candles
if required_candles > candle_limit
else (max_candles + 1) * tf_s
)
now = timeframe_to_next_date(timeframe)
return int((now - timedelta(seconds=move_to)).timestamp() * 1000)
def _process_trades_df(
self,
pair: str,
timeframe: str,
c_type: CandleType,
ticks: List[List],
cache: bool,
first_required_candle_date: int,
) -> DataFrame:
# keeping parsed dataframe in cache
trades_df = trades_list_to_df(ticks, True)
if cache:
if (pair, timeframe, c_type) in self._trades:
old = self._trades[(pair, timeframe, c_type)]
# Reassign so we return the updated, combined df
combined_df = concat([old, trades_df], axis=0)
logger.debug(f"Clean duplicated ticks from Trades data {pair}")
trades_df = DataFrame(
trades_df_remove_duplicates(combined_df), columns=combined_df.columns
)
# Age out old candles
trades_df = trades_df[first_required_candle_date < trades_df["timestamp"]]
trades_df = trades_df.reset_index(drop=True)
self._trades[(pair, timeframe, c_type)] = trades_df
return trades_df
def refresh_latest_trades(
self,
pair_list: ListPairsWithTimeframes,
*,
cache: bool = True,
) -> Dict[PairWithTimeframe, DataFrame]:
"""
Refresh in-memory TRADES asynchronously and set `_trades` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
Only used in the dataprovider.refresh() method.
:param pair_list: List of 3 element tuples containing (pair, timeframe, candle_type)
:param cache: Assign result to _trades. Useful for one-off downloads like for pairlists
:return: Dict of [{(pair, timeframe): Dataframe}]
"""
from freqtrade.data.history import get_datahandler
data_handler = get_datahandler(
self._config["datadir"], data_format=self._config["dataformat_trades"]
)
logger.debug("Refreshing TRADES data for %d pairs", len(pair_list))
since_ms = None
results_df = {}
for pair, timeframe, candle_type in set(pair_list):
new_ticks: List = []
all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])
first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type)
# refresh, if
# a. not in _trades
# b. no cache used
# c. need new data
is_in_cache = (pair, timeframe, candle_type) in self._trades
if (
not is_in_cache
or not cache
or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)
):
logger.debug(f"Refreshing TRADES data for {pair}")
# fetch trades since latest _trades and
# store together with existing trades
try:
until = None
from_id = None
if is_in_cache:
from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]["id"]
until = dt_ts() # now
else:
until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000
all_stored_ticks_df = data_handler.trades_load(
f"{pair}-cached", self.trading_mode
)
if not all_stored_ticks_df.empty:
if (
all_stored_ticks_df.iloc[-1]["timestamp"] > first_candle_ms
and all_stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms
):
# Use cache and populate further
last_cached_ms = all_stored_ticks_df.iloc[-1]["timestamp"]
from_id = all_stored_ticks_df.iloc[-1]["id"]
# only use cached if it's closer than first_candle_ms
since_ms = (
last_cached_ms
if last_cached_ms > first_candle_ms
else first_candle_ms
)
else:
# Skip cache, it's too old
all_stored_ticks_df = DataFrame(
columns=DEFAULT_TRADES_COLUMNS + ["date"]
)
# from_id overrules with exchange set to id paginate
[_, new_ticks] = self.get_historic_trades(
pair,
since=since_ms if since_ms else first_candle_ms,
until=until,
from_id=from_id,
)
except Exception:
logger.exception(f"Refreshing TRADES data for {pair} failed")
continue
if new_ticks:
all_stored_ticks_list = all_stored_ticks_df[
DEFAULT_TRADES_COLUMNS
].values.tolist()
all_stored_ticks_list.extend(new_ticks)
trades_df = self._process_trades_df(
pair,
timeframe,
candle_type,
all_stored_ticks_list,
cache,
first_required_candle_date=first_candle_ms,
)
results_df[(pair, timeframe, candle_type)] = trades_df
data_handler.trades_store(
f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode
)
else:
logger.error(f"No new ticks for {pair}")
return results_df
def _now_is_time_to_refresh_trades(
self, pair: str, timeframe: str, candle_type: CandleType
) -> bool: # Timeframe in seconds
trades = self.trades((pair, timeframe, candle_type), False)
pair_last_refreshed = int(trades.iloc[-1]["timestamp"])
full_candle = (
int(timeframe_to_next_date(timeframe, dt_from_ts(pair_last_refreshed)).timestamp())
* 1000
)
now = dt_ts()
return full_candle <= now
# Fetch historic trades
@retrier_async
@@ -2611,10 +2811,11 @@ class Exchange:
returns: List of dicts containing trades, the next iteration value (new "since" or trade_id)
"""
try:
trades_limit = self._max_trades_limit
# fetch trades asynchronously
if params:
logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
trades = await self._api_async.fetch_trades(pair, params=params, limit=1000)
trades = await self._api_async.fetch_trades(pair, params=params, limit=trades_limit)
else:
logger.debug(
"Fetching trades for pair %s, since %s %s...",
@@ -2622,7 +2823,7 @@ class Exchange:
since,
"(" + dt_from_ts(since).isoformat() + ") " if since is not None else "",
)
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
trades = await self._api_async.fetch_trades(pair, since=since, limit=trades_limit)
trades = self._trades_contracts_to_amount(trades)
pagination_value = self._get_trade_pagination_next_value(trades)
return trades_dict_to_list(trades), pagination_value
@@ -3417,13 +3618,12 @@ class Exchange:
def get_maintenance_ratio_and_amt(
self,
pair: str,
nominal_value: float,
notional_value: float,
) -> Tuple[float, Optional[float]]:
"""
Important: Must be fetching data from cached values as this is used by backtesting!
:param pair: Market symbol
:param nominal_value: The total trade amount in quote currency including leverage
maintenance amount only on Binance
:param notional_value: The total trade amount in quote currency
:return: (maintenance margin ratio, maintenance amount)
"""
@@ -3440,7 +3640,7 @@ class Exchange:
pair_tiers = self._leverage_tiers[pair]
for tier in reversed(pair_tiers):
if nominal_value >= tier["minNotional"]:
if notional_value >= tier["minNotional"]:
return (tier["maintenanceMarginRate"], tier["maintAmt"])
raise ExchangeError("nominal value can not be lower than 0")
@@ -3448,4 +3648,3 @@ class Exchange:
# describes the min amt for a tier, and the lowest tier will always go down to 0
else:
raise ExchangeError(f"Cannot get maintenance ratio using {self.name}")
raise ExchangeError(f"Cannot get maintenance ratio using {self.name}")