Merge pull request #9706 from freqtrade/fix/kraken_datadl

improve data-download when using `--dl-trades`
This commit is contained in:
Matthias
2024-01-22 10:14:12 +01:00
committed by GitHub
5 changed files with 133 additions and 80 deletions

View File

@@ -2216,13 +2216,13 @@ class Exchange:
@retrier_async
async def _async_fetch_trades(self, pair: str,
since: Optional[int] = None,
params: Optional[dict] = None) -> List[List]:
params: Optional[dict] = None) -> Tuple[List[List], Any]:
"""
Asyncronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange.
:param pair: Pair to fetch trade data for
:param since: Since as integer timestamp in milliseconds
returns: List of dicts containing trades
returns: List of dicts containing trades, the next iteration value (new "since" or trade_id)
"""
try:
# fetch trades asynchronously
@@ -2237,7 +2237,8 @@ class Exchange:
)
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
trades = self._trades_contracts_to_amount(trades)
return trades_dict_to_list(trades)
pagination_value = self._get_trade_pagination_next_value(trades)
return trades_dict_to_list(trades), pagination_value
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical trade data.'
@@ -2250,6 +2251,25 @@ class Exchange:
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch trade data. Msg: {e}') from e
def _valid_trade_pagination_id(self, pair: str, from_id: str) -> bool:
"""
Verify trade-pagination id is valid.
Workaround for odd Kraken issue where ID is sometimes wrong.
"""
return True
def _get_trade_pagination_next_value(self, trades: List[Dict]):
"""
Extract pagination id for the next "from_id" value
Applies only to fetch_trade_history by id.
"""
if not trades:
return None
if self._trades_pagination == 'id':
return trades[-1].get('id')
else:
return trades[-1].get('timestamp')
async def _async_get_trade_history_id(self, pair: str,
until: int,
since: Optional[int] = None,
@@ -2265,33 +2285,35 @@ class Exchange:
"""
trades: List[List] = []
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
has_overlap = self._ft_has.get('trades_pagination_overlap', True)
# Skip last trade by default since its the key for the next call
x = slice(None, -1) if has_overlap else slice(None)
if not from_id:
if not from_id or not self._valid_trade_pagination_id(pair, from_id):
# Fetch first elements using timebased method to get an ID to paginate on
# Depending on the Exchange, this can introduce a drift at the start of the interval
# of up to an hour.
# e.g. Binance returns the "last 1000" candles within a 1h time interval
# - so we will miss the first trades.
t = await self._async_fetch_trades(pair, since=since)
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
from_id = t[-1][1]
trades.extend(t[:-1])
t, from_id = await self._async_fetch_trades(pair, since=since)
trades.extend(t[x])
while True:
try:
t = await self._async_fetch_trades(pair,
params={self._trades_pagination_arg: from_id})
t, from_id_next = await self._async_fetch_trades(
pair, params={self._trades_pagination_arg: from_id})
if t:
# Skip last id since its the key for the next call
trades.extend(t[:-1])
if from_id == t[-1][1] or t[-1][0] > until:
trades.extend(t[x])
if from_id == from_id_next or t[-1][0] > until:
logger.debug(f"Stopping because from_id did not change. "
f"Reached {t[-1][0]} > {until}")
# Reached the end of the defined-download period - add last trade as well.
trades.extend(t[-1:])
if has_overlap:
trades.extend(t[-1:])
break
from_id = t[-1][1]
from_id = from_id_next
else:
logger.debug("Stopping as no more trades were returned.")
break
@@ -2317,19 +2339,19 @@ class Exchange:
# DEFAULT_TRADES_COLUMNS: 1 -> id
while True:
try:
t = await self._async_fetch_trades(pair, since=since)
t, since_next = await self._async_fetch_trades(pair, since=since)
if t:
# No more trades to download available at the exchange,
# So we repeatedly get the same trade over and over again.
if since == t[-1][0] and len(t) == 1:
if since == since_next and len(t) == 1:
logger.debug("Stopping because no more trades are available.")
break
since = t[-1][0]
since = since_next
trades.extend(t)
# Reached the end of the defined-download period
if until and t[-1][0] > until:
if until and since_next > until:
logger.debug(
f"Stopping because until was reached. {t[-1][0]} > {until}")
f"Stopping because until was reached. {since_next} > {until}")
break
else:
logger.debug("Stopping as no more trades were returned.")

View File

@@ -30,6 +30,7 @@ class Kraken(Exchange):
"ohlcv_has_history": False,
"trades_pagination": "id",
"trades_pagination_arg": "since",
"trades_pagination_overlap": False,
"mark_ohlcv_timeframe": "4h",
}
@@ -157,18 +158,30 @@ class Kraken(Exchange):
return fees if is_short else -fees
def _trades_contracts_to_amount(self, trades: List) -> List:
def _get_trade_pagination_next_value(self, trades: List[Dict]):
"""
Fix "last" id issue for kraken data downloads
This whole override can probably be removed once the following
issue is closed in ccxt: https://github.com/ccxt/ccxt/issues/15827
Extract pagination id for the next "from_id" value
Applies only to fetch_trade_history by id.
"""
super()._trades_contracts_to_amount(trades)
if (
len(trades) > 0
and isinstance(trades[-1].get('info'), list)
and len(trades[-1].get('info', [])) > 7
):
if len(trades) > 0:
if (
isinstance(trades[-1].get('info'), list)
and len(trades[-1].get('info', [])) > 7
):
# Trade response's "last" value.
return trades[-1].get('info', [])[-1]
# Fall back to timestamp if info is somehow empty.
return trades[-1].get('timestamp')
return None
trades[-1]['id'] = trades[-1].get('info', [])[-1]
return trades
def _valid_trade_pagination_id(self, pair: str, from_id: str) -> bool:
"""
Verify trade-pagination id is valid.
Workaround for odd Kraken issue where ID is sometimes wrong.
"""
# Regular id's are in timestamp format 1705443695120072285
# If the id is smaller than 19 characters, it's not a valid timestamp.
if len(from_id) >= 19:
return True
logger.debug(f"{pair} - trade-pagination id is not valid. Fallback to timestamp.")
return False

View File

@@ -2386,14 +2386,7 @@ def trades_history_df(trades_history):
@pytest.fixture(scope="function")
def fetch_trades_result():
return [{'info': {'a': 126181329,
'p': '0.01962700',
'q': '0.04000000',
'f': 138604155,
'l': 138604155,
'T': 1565798399463,
'm': False,
'M': True},
return [{'info': ['0.01962700', '0.04000000', '1565798399.4631551', 'b', 'm', '', '126181329'],
'timestamp': 1565798399463,
'datetime': '2019-08-14T15:59:59.463Z',
'symbol': 'ETH/BTC',
@@ -2406,14 +2399,7 @@ def fetch_trades_result():
'amount': 0.04,
'cost': 0.00078508,
'fee': None},
{'info': {'a': 126181330,
'p': '0.01962700',
'q': '0.24400000',
'f': 138604156,
'l': 138604156,
'T': 1565798399629,
'm': False,
'M': True},
{'info': ['0.01962700', '0.24400000', '1565798399.6291551', 'b', 'm', '', '126181330'],
'timestamp': 1565798399629,
'datetime': '2019-08-14T15:59:59.629Z',
'symbol': 'ETH/BTC',
@@ -2426,14 +2412,7 @@ def fetch_trades_result():
'amount': 0.244,
'cost': 0.004788987999999999,
'fee': None},
{'info': {'a': 126181331,
'p': '0.01962600',
'q': '0.01100000',
'f': 138604157,
'l': 138604157,
'T': 1565798399752,
'm': True,
'M': True},
{'info': ['0.01962600', '0.01100000', '1565798399.7521551', 's', 'm', '', '126181331'],
'timestamp': 1565798399752,
'datetime': '2019-08-14T15:59:59.752Z',
'symbol': 'ETH/BTC',
@@ -2446,14 +2425,7 @@ def fetch_trades_result():
'amount': 0.011,
'cost': 0.00021588599999999999,
'fee': None},
{'info': {'a': 126181332,
'p': '0.01962600',
'q': '0.01100000',
'f': 138604158,
'l': 138604158,
'T': 1565798399862,
'm': True,
'M': True},
{'info': ['0.01962600', '0.01100000', '1565798399.8621551', 's', 'm', '', '126181332'],
'timestamp': 1565798399862,
'datetime': '2019-08-14T15:59:59.862Z',
'symbol': 'ETH/BTC',
@@ -2466,14 +2438,8 @@ def fetch_trades_result():
'amount': 0.011,
'cost': 0.00021588599999999999,
'fee': None},
{'info': {'a': 126181333,
'p': '0.01952600',
'q': '0.01200000',
'f': 138604158,
'l': 138604158,
'T': 1565798399872,
'm': True,
'M': True},
{'info': ['0.01952600', '0.01200000', '1565798399.8721551', 's', 'm', '', '126181333',
1565798399872512133],
'timestamp': 1565798399872,
'datetime': '2019-08-14T15:59:59.872Z',
'symbol': 'ETH/BTC',

View File

@@ -2844,10 +2844,17 @@ async def test__async_fetch_trades(default_conf, mocker, caplog, exchange_name,
exchange._api_async.fetch_trades = get_mock_coro(fetch_trades_result)
pair = 'ETH/BTC'
res = await exchange._async_fetch_trades(pair, since=None, params=None)
res, pagid = await exchange._async_fetch_trades(pair, since=None, params=None)
assert isinstance(res, list)
assert isinstance(res[0], list)
assert isinstance(res[1], list)
if exchange._trades_pagination == 'id':
if exchange_name == 'kraken':
assert pagid == 1565798399872512133
else:
assert pagid == '126181333'
else:
assert pagid == 1565798399872
assert exchange._api_async.fetch_trades.call_count == 1
assert exchange._api_async.fetch_trades.call_args[0][0] == pair
@@ -2856,11 +2863,20 @@ async def test__async_fetch_trades(default_conf, mocker, caplog, exchange_name,
assert log_has_re(f"Fetching trades for pair {pair}, since .*", caplog)
caplog.clear()
exchange._api_async.fetch_trades.reset_mock()
res = await exchange._async_fetch_trades(pair, since=None, params={'from': '123'})
res, pagid = await exchange._async_fetch_trades(pair, since=None, params={'from': '123'})
assert exchange._api_async.fetch_trades.call_count == 1
assert exchange._api_async.fetch_trades.call_args[0][0] == pair
assert exchange._api_async.fetch_trades.call_args[1]['limit'] == 1000
assert exchange._api_async.fetch_trades.call_args[1]['params'] == {'from': '123'}
if exchange._trades_pagination == 'id':
if exchange_name == 'kraken':
assert pagid == 1565798399872512133
else:
assert pagid == '126181333'
else:
assert pagid == 1565798399872
assert log_has_re(f"Fetching trades for pair {pair}, params: .*", caplog)
exchange.close()
@@ -2915,8 +2931,9 @@ async def test__async_fetch_trades_contract_size(default_conf, mocker, caplog, e
)
pair = 'ETH/USDT:USDT'
res = await exchange._async_fetch_trades(pair, since=None, params=None)
res, pagid = await exchange._async_fetch_trades(pair, since=None, params=None)
assert res[0][5] == 300
assert pagid is not None
exchange.close()
@@ -2926,13 +2943,17 @@ async def test__async_get_trade_history_id(default_conf, mocker, exchange_name,
fetch_trades_result):
exchange = get_patched_exchange(mocker, default_conf, id=exchange_name)
if exchange._trades_pagination != 'id':
exchange.close()
pytest.skip("Exchange does not support pagination by trade id")
pagination_arg = exchange._trades_pagination_arg
async def mock_get_trade_hist(pair, *args, **kwargs):
if 'since' in kwargs:
# Return first 3
return fetch_trades_result[:-2]
elif kwargs.get('params', {}).get(pagination_arg) == fetch_trades_result[-3]['id']:
elif kwargs.get('params', {}).get(pagination_arg) in (
fetch_trades_result[-3]['id'], 1565798399752):
# Return 2
return fetch_trades_result[-3:-1]
else:
@@ -2948,7 +2969,8 @@ async def test__async_get_trade_history_id(default_conf, mocker, exchange_name,
assert isinstance(ret, tuple)
assert ret[0] == pair
assert isinstance(ret[1], list)
assert len(ret[1]) == len(fetch_trades_result)
if exchange_name != 'kraken':
assert len(ret[1]) == len(fetch_trades_result)
assert exchange._api_async.fetch_trades.call_count == 3
fetch_trades_cal = exchange._api_async.fetch_trades.call_args_list
# first call (using since, not fromId)
@@ -2961,6 +2983,21 @@ async def test__async_get_trade_history_id(default_conf, mocker, exchange_name,
assert exchange._ft_has['trades_pagination_arg'] in fetch_trades_cal[1][1]['params']
@pytest.mark.parametrize('trade_id, expected', [
('1234', True),
('170544369512007228', True),
('1705443695120072285', True),
('170544369512007228555', True),
])
@pytest.mark.parametrize("exchange_name", EXCHANGES)
def test__valid_trade_pagination_id(mocker, default_conf_usdt, exchange_name, trade_id, expected):
if exchange_name == 'kraken':
pytest.skip("Kraken has a different pagination id format, and an explicit test.")
exchange = get_patched_exchange(mocker, default_conf_usdt, id=exchange_name)
assert exchange._valid_trade_pagination_id('XRP/USDT', trade_id) == expected
@pytest.mark.asyncio
@pytest.mark.parametrize("exchange_name", EXCHANGES)
async def test__async_get_trade_history_time(default_conf, mocker, caplog, exchange_name,
@@ -2976,6 +3013,9 @@ async def test__async_get_trade_history_time(default_conf, mocker, caplog, excha
caplog.set_level(logging.DEBUG)
exchange = get_patched_exchange(mocker, default_conf, id=exchange_name)
if exchange._trades_pagination != 'time':
exchange.close()
pytest.skip("Exchange does not support pagination by timestamp")
# Monkey-patch async function
exchange._api_async.fetch_trades = MagicMock(side_effect=mock_get_trade_hist)
pair = 'ETH/BTC'
@@ -3008,9 +3048,9 @@ async def test__async_get_trade_history_time_empty(default_conf, mocker, caplog,
async def mock_get_trade_hist(pair, *args, **kwargs):
if kwargs['since'] == trades_history[0][0]:
return trades_history[:-1]
return trades_history[:-1], trades_history[:-1][-1][0]
else:
return []
return [], None
caplog.set_level(logging.DEBUG)
exchange = get_patched_exchange(mocker, default_conf, id=exchange_name)
@@ -5312,3 +5352,4 @@ def test_price_to_precision_with_default_conf(default_conf, mocker):
patched_ex = get_patched_exchange(mocker, conf)
prec_price = patched_ex.price_to_precision("XRP/USDT", 1.0000000101)
assert prec_price == 1.00000001
assert prec_price == 1.00000001

View File

@@ -271,3 +271,14 @@ def test_stoploss_adjust_kraken(mocker, default_conf, sl1, sl2, sl3, side):
# diff. order type ...
order['type'] = 'limit'
assert exchange.stoploss_adjust(sl3, order, side=side)
@pytest.mark.parametrize('trade_id, expected', [
('1234', False),
('170544369512007228', False),
('1705443695120072285', True),
('170544369512007228555', True),
])
def test__valid_trade_pagination_id_kraken(mocker, default_conf_usdt, trade_id, expected):
exchange = get_patched_exchange(mocker, default_conf_usdt, id='kraken')
assert exchange._valid_trade_pagination_id('XRP/USDT', trade_id) == expected