Merge remote-tracking branch 'upstream/develop' into feature/fetch-public-trades

This commit is contained in:
Joe Schr
2024-02-01 11:46:38 +01:00
145 changed files with 2773 additions and 1641 deletions

View File

@@ -81,6 +81,7 @@ class Exchange:
"l2_limit_range_required": True, # Allow Empty L2 limit (kucoin)
"mark_ohlcv_price": "mark",
"mark_ohlcv_timeframe": "8h",
"funding_fee_timeframe": "8h",
"ccxt_futures_name": "swap",
"needs_trading_fees": False, # use fetch_trading_fees to cache fees
"order_props_in_contracts": ['amount', 'filled', 'remaining'],
@@ -123,11 +124,12 @@ class Exchange:
# Cache for 10 minutes ...
self._cache_lock = Lock()
self._fetch_tickers_cache: TTLCache = TTLCache(maxsize=2, ttl=60 * 10)
# Cache values for 1800 to avoid frequent polling of the exchange for prices
# Cache values for 300 to avoid frequent polling of the exchange for prices
# Caching only applies to RPC methods, so prices for open trades are still
# refreshed once every iteration.
self._exit_rate_cache: TTLCache = TTLCache(maxsize=100, ttl=1800)
self._entry_rate_cache: TTLCache = TTLCache(maxsize=100, ttl=1800)
# Shouldn't be too high either, as it'll freeze UI updates in case of open orders.
self._exit_rate_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
self._entry_rate_cache: TTLCache = TTLCache(maxsize=100, ttl=300)
# Holds candles
self._klines: Dict[PairWithTimeframe, DataFrame] = {}
@@ -328,10 +330,11 @@ class Exchange:
"""
pass
def _log_exchange_response(self, endpoint, response) -> None:
def _log_exchange_response(self, endpoint: str, response, *, add_info=None) -> None:
""" Log exchange responses """
if self.log_responses:
logger.info(f"API {endpoint}: {response}")
add_info_str = "" if add_info is None else f" {add_info}: "
logger.info(f"API {endpoint}: {add_info_str}{response}")
def ohlcv_candle_limit(
self, timeframe: str, candle_type: CandleType, since_ms: Optional[int] = None) -> int:
@@ -339,6 +342,7 @@ class Exchange:
Exchange ohlcv candle limit
Uses ohlcv_candle_limit_per_timeframe if the exchange has different limits
per timeframe (e.g. bittrex), otherwise falls back to ohlcv_candle_limit
TODO: this is most likely no longer needed since only bittrex needed this.
:param timeframe: Timeframe to check
:param candle_type: Candle-type
:param since_ms: Starting timestamp
@@ -1418,7 +1422,7 @@ class Exchange:
order = self.fetch_stoploss_order(order_id, pair)
except InvalidOrderException:
logger.warning(f"Could not fetch cancelled stoploss order {order_id}.")
order = {'fee': {}, 'status': 'canceled', 'amount': amount, 'info': {}}
order = {'id': order_id, 'fee': {}, 'status': 'canceled', 'amount': amount, 'info': {}}
return order
@@ -2564,13 +2568,13 @@ class Exchange:
@retrier_async
async def _async_fetch_trades(self, pair: str,
since: Optional[int] = None,
params: Optional[dict] = None) -> List[List]:
params: Optional[dict] = None) -> Tuple[List[List], Any]:
"""
Asynchronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange.
:param pair: Pair to fetch trade data for
:param since: Since as integer timestamp in milliseconds
returns: List of dicts containing trades
returns: List of dicts containing trades, the next iteration value (new "since" or trade_id)
"""
try:
candle_limit = self.trades_candle_limit("1m", candle_type=CandleType.FUTURES, since_ms=since)
@@ -2586,10 +2590,8 @@ class Exchange:
)
trades = await self._api_async.fetch_trades(pair, since=since, limit=candle_limit)
trades = self._trades_contracts_to_amount(trades)
if trades:
logger.debug("Fetched trades for pair %s, datetime: %s (%d).", pair, trades[0]['datetime'], trades[0]['timestamp'] )
return trades_dict_to_list(trades)
pagination_value = self._get_trade_pagination_next_value(trades)
return trades_dict_to_list(trades), pagination_value
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical trade data.'
@@ -2602,6 +2604,25 @@ class Exchange:
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch trade data. Msg: {e}') from e
def _valid_trade_pagination_id(self, pair: str, from_id: str) -> bool:
"""
Verify trade-pagination id is valid.
Workaround for odd Kraken issue where ID is sometimes wrong.
"""
return True
def _get_trade_pagination_next_value(self, trades: List[Dict]):
"""
Extract pagination id for the next "from_id" value
Applies only to fetch_trade_history by id.
"""
if not trades:
return None
if self._trades_pagination == 'id':
return trades[-1].get('id')
else:
return trades[-1].get('timestamp')
async def _async_get_trade_history_id(self, pair: str,
until: int,
since: Optional[int] = None,
@@ -2618,39 +2639,37 @@ class Exchange:
"""
trades: List[List] = []
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
has_overlap = self._ft_has.get('trades_pagination_overlap', True)
# Skip last trade by default since its the key for the next call
x = slice(None, -1) if has_overlap else slice(None)
if not until and not stop_on_from_id:
raise "stop_on_from_id must be set if until is not set"
if not from_id:
if not from_id or not self._valid_trade_pagination_id(pair, from_id):
# Fetch first elements using timebased method to get an ID to paginate on
# Depending on the Exchange, this can introduce a drift at the start of the interval
# of up to an hour.
# e.g. Binance returns the "last 1000" candles within a 1h time interval
# - so we will miss the first trades.
trade = await self._async_fetch_trades(pair, since=since)
if trade:
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
from_id = trade[-1][1]
trades.extend(trade[:-1])
else:
return (pair, trades)
t, from_id = await self._async_fetch_trades(pair, since=since)
trades.extend(t[x])
while True:
try:
t = await self._async_fetch_trades(pair,
params={self._trades_pagination_arg: from_id})
t, from_id_next = await self._async_fetch_trades(
pair, params={self._trades_pagination_arg: from_id})
if t:
# Skip last id since its the key for the next call
trades.extend(t[:-1])
if from_id == t[-1][1] or t[-1][0] > until:
trades.extend(t[x])
if from_id == from_id_next or t[-1][0] > until:
logger.debug(f"Stopping because from_id did not change. "
f"Reached {t[-1][0]} > {until}")
# Reached the end of the defined-download period - add last trade as well.
trades.extend(t[-1:])
if has_overlap:
trades.extend(t[-1:])
break
from_id = t[-1][1]
from_id = from_id_next
else:
logger.debug("Stopping as no more trades were returned.")
break
@@ -2676,19 +2695,19 @@ class Exchange:
# DEFAULT_TRADES_COLUMNS: 1 -> id
while True:
try:
t = await self._async_fetch_trades(pair, since=since)
t, since_next = await self._async_fetch_trades(pair, since=since)
if t:
# No more trades to download available at the exchange,
# So we repeatedly get the same trade over and over again.
if since == t[-1][0] and len(t) == 1:
if since == since_next and len(t) == 1:
logger.debug("Stopping because no more trades are available.")
break
since = t[-1][0]
since = since_next
trades.extend(t)
# Reached the end of the defined-download period
if until and t[-1][0] > until:
if until and since_next > until:
logger.debug(
f"Stopping because until was reached. {t[-1][0]} > {until}")
f"Stopping because until was reached. {since_next} > {until}")
break
else:
logger.debug("Stopping as no more trades were returned.")
@@ -2806,6 +2825,8 @@ class Exchange:
symbol=pair,
since=since
)
self._log_exchange_response('funding_history', funding_history,
add_info=f"pair: {pair}, since: {since}")
return sum(fee['amount'] for fee in funding_history)
except ccxt.DDoSProtection as e:
raise DDosProtection(e) from e
@@ -3122,17 +3143,16 @@ class Exchange:
# Only really relevant for trades very close to the full hour
open_date = timeframe_to_prev_date('1h', open_date)
timeframe = self._ft_has['mark_ohlcv_timeframe']
timeframe_ff = self._ft_has.get('funding_fee_timeframe',
self._ft_has['mark_ohlcv_timeframe'])
timeframe_ff = self._ft_has['funding_fee_timeframe']
mark_price_type = CandleType.from_string(self._ft_has["mark_ohlcv_price"])
if not close_date:
close_date = datetime.now(timezone.utc)
since_ms = int(timeframe_to_prev_date(timeframe, open_date).timestamp()) * 1000
mark_comb: PairWithTimeframe = (
pair, timeframe, CandleType.from_string(self._ft_has["mark_ohlcv_price"]))
mark_comb: PairWithTimeframe = (pair, timeframe, mark_price_type)
funding_comb: PairWithTimeframe = (pair, timeframe_ff, CandleType.FUNDING_RATE)
candle_histories = self.refresh_latest_ohlcv(
[mark_comb, funding_comb],
since_ms=since_ms,