Move extracting trade pagination id to fetch_trades

This commit is contained in:
Matthias
2024-01-21 15:22:03 +01:00
parent b56c663bea
commit c167575098

View File

@@ -2216,13 +2216,13 @@ class Exchange:
@retrier_async @retrier_async
async def _async_fetch_trades(self, pair: str, async def _async_fetch_trades(self, pair: str,
since: Optional[int] = None, since: Optional[int] = None,
params: Optional[dict] = None) -> List[List]: params: Optional[dict] = None) -> Tuple[List[List], Any]:
""" """
Asyncronously gets trade history using fetch_trades. Asyncronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange. Handles exchange errors, does one call to the exchange.
:param pair: Pair to fetch trade data for :param pair: Pair to fetch trade data for
:param since: Since as integer timestamp in milliseconds :param since: Since as integer timestamp in milliseconds
returns: List of dicts containing trades returns: List of dicts containing trades, the next iteration value (new "since" or trade_id)
""" """
try: try:
# fetch trades asynchronously # fetch trades asynchronously
@@ -2237,7 +2237,8 @@ class Exchange:
) )
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000) trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
trades = self._trades_contracts_to_amount(trades) trades = self._trades_contracts_to_amount(trades)
return trades_dict_to_list(trades) pagination_value = self._get_trade_pagination_next_value(trades)
return trades_dict_to_list(trades), pagination_value
except ccxt.NotSupported as e: except ccxt.NotSupported as e:
raise OperationalException( raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical trade data.' f'Exchange {self._api.name} does not support fetching historical trade data.'
@@ -2257,6 +2258,18 @@ class Exchange:
""" """
return True return True
def _get_trade_pagination_next_value(self, trades: List[Dict]):
"""
Extract pagination id for the next "from_id" value
Applies only to fetch_trade_history by id.
"""
if not trades:
return None
if self._trades_pagination == 'id':
return trades[-1].get('id')
else:
return trades[-1].get('timestamp')
async def _async_get_trade_history_id(self, pair: str, async def _async_get_trade_history_id(self, pair: str,
until: int, until: int,
since: Optional[int] = None, since: Optional[int] = None,
@@ -2272,6 +2285,8 @@ class Exchange:
""" """
trades: List[List] = [] trades: List[List] = []
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
if not from_id or not self._valid_trade_pagination_id(pair, from_id): if not from_id or not self._valid_trade_pagination_id(pair, from_id):
# Fetch first elements using timebased method to get an ID to paginate on # Fetch first elements using timebased method to get an ID to paginate on
@@ -2279,26 +2294,23 @@ class Exchange:
# of up to an hour. # of up to an hour.
# e.g. Binance returns the "last 1000" candles within a 1h time interval # e.g. Binance returns the "last 1000" candles within a 1h time interval
# - so we will miss the first trades. # - so we will miss the first trades.
t = await self._async_fetch_trades(pair, since=since) t, from_id = await self._async_fetch_trades(pair, since=since)
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
from_id = t[-1][1]
trades.extend(t[:-1]) trades.extend(t[:-1])
while True: while True:
try: try:
t = await self._async_fetch_trades(pair, t, from_id_next = await self._async_fetch_trades(
params={self._trades_pagination_arg: from_id}) pair, params={self._trades_pagination_arg: from_id})
if t: if t:
# Skip last id since its the key for the next call # Skip last id since its the key for the next call
trades.extend(t[:-1]) trades.extend(t[:-1])
if from_id == t[-1][1] or t[-1][0] > until: if from_id == from_id_next or t[-1][0] > until:
logger.debug(f"Stopping because from_id did not change. " logger.debug(f"Stopping because from_id did not change. "
f"Reached {t[-1][0]} > {until}") f"Reached {t[-1][0]} > {until}")
# Reached the end of the defined-download period - add last trade as well. # Reached the end of the defined-download period - add last trade as well.
trades.extend(t[-1:]) trades.extend(t[-1:])
break break
from_id = t[-1][1] from_id = from_id_next
else: else:
logger.debug("Stopping as no more trades were returned.") logger.debug("Stopping as no more trades were returned.")
break break
@@ -2324,19 +2336,19 @@ class Exchange:
# DEFAULT_TRADES_COLUMNS: 1 -> id # DEFAULT_TRADES_COLUMNS: 1 -> id
while True: while True:
try: try:
t = await self._async_fetch_trades(pair, since=since) t, since_next = await self._async_fetch_trades(pair, since=since)
if t: if t:
# No more trades to download available at the exchange, # No more trades to download available at the exchange,
# So we repeatedly get the same trade over and over again. # So we repeatedly get the same trade over and over again.
if since == t[-1][0] and len(t) == 1: if since == since_next and len(t) == 1:
logger.debug("Stopping because no more trades are available.") logger.debug("Stopping because no more trades are available.")
break break
since = t[-1][0] since = since_next
trades.extend(t) trades.extend(t)
# Reached the end of the defined-download period # Reached the end of the defined-download period
if until and t[-1][0] > until: if until and since_next > until:
logger.debug( logger.debug(
f"Stopping because until was reached. {t[-1][0]} > {until}") f"Stopping because until was reached. {since_next} > {until}")
break break
else: else:
logger.debug("Stopping as no more trades were returned.") logger.debug("Stopping as no more trades were returned.")