Merge pull request #11289 from freqtrade/feat/binance_trades_fast

Binance: Download trades "fast" from binance.vision
This commit is contained in:
Matthias
2025-03-02 14:43:47 +01:00
committed by GitHub
7 changed files with 479 additions and 1 deletion
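
In short, this change makes freqtrade source Binance public trades from the daily aggTrades archives on https://data.binance.vision and fall back to the paginated REST API only for whatever the archives do not cover. A minimal usage sketch of the new helper, based solely on the signatures added in this diff (the markets mapping and the asyncio wrapper are illustrative, not part of the commit):

import asyncio

from freqtrade.enums import CandleType
from freqtrade.exchange.binance_public_data import download_archive_trades
from freqtrade.util.datetime_helpers import dt_ts, dt_utc


async def main():
    # Only the exchange id is read from the markets mapping
    markets = {"BTC/USDT": {"id": "BTCUSDT"}}
    pair, trades = await download_archive_trades(
        CandleType.SPOT,
        "BTC/USDT",
        since_ms=dt_ts(dt_utc(2024, 1, 1)),
        until_ms=dt_ts(dt_utc(2024, 1, 3)),
        markets=markets,
    )
    # Each entry follows DEFAULT_TRADES_COLUMNS; an empty list means the
    # caller should fall back to the regular REST download
    print(pair, len(trades))


asyncio.run(main())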

View File

@@ -11,7 +11,11 @@ from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.enums import CandleType, MarginMode, PriceType, TradingMode
from freqtrade.exceptions import DDosProtection, OperationalException, TemporaryError
from freqtrade.exchange import Exchange
from freqtrade.exchange.binance_public_data import (
    concat_safe,
    download_archive_ohlcv,
    download_archive_trades,
)
from freqtrade.exchange.common import retrier
from freqtrade.exchange.exchange_types import FtHas, Tickers
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_msecs
@@ -379,3 +383,48 @@ class Binance(Exchange):
        if not t:
            return [], "0"
        return t, from_id

    async def _async_get_trade_history_id(
        self, pair: str, until: int, since: int, from_id: str | None = None
    ) -> tuple[str, list[list]]:
        logger.info(f"Fetching trades from Binance, {from_id=}, {since=}, {until=}")
        if not self._config["exchange"].get("only_from_ccxt", False):
            if from_id is None or not since:
                # Determine the pair's listing date from its earliest trades
                trades = await self._api_async.fetch_trades(
                    pair,
                    params={
                        self._trades_pagination_arg: "0",
                    },
                    limit=5,
                )
                listing_date: int = trades[0]["timestamp"]
                since = max(since, listing_date)

            _, res = await download_archive_trades(
                CandleType.SPOT,
                pair,
                since_ms=since,
                until_ms=until,
                markets=self.markets,
            )
            if not res:
                end_time = since
                end_id = from_id
            else:
                end_time = res[-1][0]
                end_id = res[-1][1]

            if end_time and end_time >= until:
                return pair, res
            else:
                # Fetch the remainder not covered by the archives via the paginated REST API
                _, res2 = await super()._async_get_trade_history_id(
                    pair, until=until, since=end_time, from_id=end_id
                )
                res.extend(res2)
                return pair, res
        return await super()._async_get_trade_history_id(
            pair, until=until, since=since, from_id=from_id
        )
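
The fast path above is opt-out: it only runs when the exchange configuration does not set only_from_ccxt. A hedged configuration sketch, expressed as the Python dict the tests below manipulate (the surrounding keys are the usual freqtrade exchange settings and are shown for context only):

# Illustrative only: force the classic paginated REST download and skip
# download_archive_trades entirely.
config = {
    "exchange": {
        "name": "binance",
        "use_public_trades": True,
        "only_from_ccxt": True,
    },
}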

View File

@@ -1,5 +1,6 @@
""" """
Fetch daily-archived OHLCV data from https://data.binance.vision/ Fetch daily-archived OHLCV data from https://data.binance.vision/
Documentation can be found in https://github.com/binance/binance-public-data
""" """
import asyncio import asyncio
@@ -10,9 +11,11 @@ from io import BytesIO
from typing import Any

import aiohttp
import numpy as np
import pandas as pd
from pandas import DataFrame

from freqtrade.constants import DEFAULT_TRADES_COLUMNS
from freqtrade.enums import CandleType
from freqtrade.misc import chunks
from freqtrade.util.datetime_helpers import dt_from_ts, dt_now
@@ -212,6 +215,20 @@ def binance_vision_ohlcv_zip_url(
    return url

def binance_vision_trades_zip_url(symbol: str, candle_type: CandleType, date: date) -> str:
    """
    example urls:
    https://data.binance.vision/data/spot/daily/aggTrades/BTCUSDT/BTCUSDT-aggTrades-2023-10-27.zip
    https://data.binance.vision/data/futures/um/daily/aggTrades/BTCUSDT/BTCUSDT-aggTrades-2023-10-27.zip
    """
    asset_type_url_segment = candle_type_to_url_segment(candle_type)
    url = (
        f"https://data.binance.vision/data/{asset_type_url_segment}/daily/aggTrades/{symbol}"
        f"/{symbol}-aggTrades-{date.strftime('%Y-%m-%d')}.zip"
    )
    return url

async def get_daily_ohlcv(
    symbol: str,
    timeframe: str,
@@ -280,3 +297,203 @@ async def get_daily_ohlcv(
            if isinstance(e, Http404) or retry > retry_count:
                logger.debug(f"Failed to get data from {url}: {e}")
                raise

async def download_archive_trades(
    candle_type: CandleType,
    pair: str,
    *,
    since_ms: int,
    until_ms: int | None,
    markets: dict[str, Any],
    stop_on_404: bool = True,
) -> tuple[str, list[list]]:
    try:
        symbol = markets[pair]["id"]
        last_available_date = dt_now() - timedelta(days=2)
        start = dt_from_ts(since_ms)
        end = dt_from_ts(until_ms) if until_ms else dt_now()
        end = min(end, last_available_date)
        if start >= end:
            return pair, []
        result_list = await _download_archive_trades(
            symbol, pair, candle_type, start, end, stop_on_404
        )
        return pair, result_list
    except Exception as e:
        logger.warning(
            "An exception occurred during fast trades download from Binance, falling back to "
            "the slower REST API; this can take a lot more time.",
            exc_info=e,
        )
        return pair, []

def parse_trades_from_zip(csvf):
    # Spot archives ship without a header row, futures archives include one.
    # https://github.com/binance/binance-public-data/issues/283
    first_byte = csvf.read(1)[0]
    if chr(first_byte).isdigit():
        # spot
        header = None
        names = [
            "id",
            "price",
            "amount",
            "first_trade_id",
            "last_trade_id",
            "timestamp",
            "is_buyer_maker",
            "is_best_match",
        ]
    else:
        # futures
        header = 0
        names = [
            "id",
            "price",
            "amount",
            "first_trade_id",
            "last_trade_id",
            "timestamp",
            "is_buyer_maker",
        ]
    csvf.seek(0)
    df = pd.read_csv(
        csvf,
        names=names,
        header=header,
    )
    df.loc[:, "cost"] = df["price"] * df["amount"]
    # Side is reversed intentionally, based on ccxt's parseTrade logic
    df.loc[:, "side"] = np.where(df["is_buyer_maker"], "sell", "buy")
    df.loc[:, "type"] = None
    # Normalize microsecond timestamps to milliseconds
    df.loc[:, "timestamp"] = np.where(
        df["timestamp"] > 10000000000000,
        df["timestamp"] // 1000,
        df["timestamp"],
    )
    return df.loc[:, DEFAULT_TRADES_COLUMNS].to_records(index=False).tolist()
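
The timestamp normalization above is a magnitude check: millisecond epochs are on the order of 1e12 while microsecond epochs are on the order of 1e15, so anything above 1e13 is treated as microseconds and divided by 1000. A standalone illustration of that heuristic (the sample values are made up):

import numpy as np
import pandas as pd

# One millisecond epoch and one microsecond epoch for the same instant
df = pd.DataFrame({"timestamp": [1698364800000, 1698364800000000]})
df.loc[:, "timestamp"] = np.where(
    df["timestamp"] > 10000000000000,  # ~1e13 separates ms from us epochs
    df["timestamp"] // 1000,
    df["timestamp"],
)
print(df["timestamp"].tolist())  # [1698364800000, 1698364800000]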

async def get_daily_trades(
    symbol: str,
    candle_type: CandleType,
    date: date,
    session: aiohttp.ClientSession,
    retry_count: int = 3,
    retry_delay: float = 0.0,
) -> list[list]:
    """
    Get daily aggregated trades from https://data.binance.vision
    See https://github.com/binance/binance-public-data
    :symbol: binance symbol name, e.g. BTCUSDT
    :candle_type: SPOT or FUTURES
    :date: the returned trades cover the entire day of `date` in UTC
    :session: an aiohttp.ClientSession instance
    :retry_count: number of times to retry before re-raising the exception
    :retry_delay: the time to wait before every retry
    :return: a list of trades in DEFAULT_TRADES_COLUMNS format
    """
    url = binance_vision_trades_zip_url(symbol, candle_type, date)
    logger.debug(f"download trades data from binance: {url}")
    retry = 0
    while True:
        if retry > 0:
            sleep_secs = retry * retry_delay
            logger.debug(
                f"[{retry}/{retry_count}] retry to download {url} after {sleep_secs} seconds"
            )
            await asyncio.sleep(sleep_secs)
        try:
            async with session.get(url) as resp:
                if resp.status == 200:
                    content = await resp.read()
                    logger.debug(f"Successfully downloaded {url}")
                    with zipfile.ZipFile(BytesIO(content)) as zipf:
                        with zipf.open(zipf.namelist()[0]) as csvf:
                            return parse_trades_from_zip(csvf)
                elif resp.status == 404:
                    logger.debug(f"Failed to download {url}")
                    raise Http404(f"404: {url}", date, url)
                else:
                    raise BadHttpStatus(f"{resp.status} - {resp.reason}")
        except Exception as e:
            logger.info("download daily trades raised: %s", e)
            retry += 1
            if isinstance(e, Http404) or retry > retry_count:
                logger.debug(f"Failed to get data from {url}: {e}")
                raise

async def _download_archive_trades(
    symbol: str,
    pair: str,
    candle_type: CandleType,
    start: date,
    end: date,
    stop_on_404: bool,
) -> list[list]:
    # Flat list of all trades collected so far, in DEFAULT_TRADES_COLUMNS order
    results: list[list] = []
    # the current day being processed, starting at 1
    current_day = 0

    connector = aiohttp.TCPConnector(limit=100)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
        # the HTTP connections are throttled by the TCPConnector
        for dates in chunks(list(date_range(start, end)), 30):
            tasks = [
                asyncio.create_task(get_daily_trades(symbol, candle_type, date, session))
                for date in dates
            ]
            for task in tasks:
                current_day += 1
                try:
                    result = await task
                except Http404 as e:
                    if stop_on_404:
                        logger.debug(f"Failed to download {e.url} due to 404.")
                        # A 404 error on the first day indicates missing data
                        # on https://data.binance.vision, so we log a warning and advice.
                        # https://github.com/freqtrade/freqtrade/blob/acc53065e5fa7ab5197073276306dc9dc3adbfa3/tests/exchange_online/test_binance_compare_ohlcv.py#L7
                        if current_day == 1:
                            logger.warning(
                                f"Fast download is unavailable due to missing data: "
                                f"{e.url}. Falling back to the slower REST API, "
                                "which may take more time."
                            )
                            if pair in ["BTC/USDT:USDT", "ETH/USDT:USDT", "BCH/USDT:USDT"]:
                                logger.warning(
                                    f"To avoid the delay, you can first download {pair} using "
                                    "`--timerange <start date>-20200101`, and then download the "
                                    "remaining data with `--timerange 20200101-<end date>`."
                                )
                        else:
                            logger.warning(
                                f"Binance fast download for {pair} stopped at {e.date} due to "
                                f"missing data: {e.url}, falling back to the REST API for the "
                                "remaining data; this can take more time."
                            )
                        await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
                        return results
                except BaseException as e:
                    logger.warning(f"An exception was raised: {e}")
                    # Return the data collected so far; do not allow a gap within the data
                    await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
                    return results
                else:
                    # Happy case
                    results.extend(result)
    return results
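
Concurrency in the downloader above is bounded twice: the day list is split into waves of 30 tasks via chunks, and aiohttp.TCPConnector(limit=100) caps simultaneous connections. A tiny sketch of the batching helper, assuming chunks yields successive fixed-size slices as its use above implies:

from freqtrade.misc import chunks

days = list(range(1, 8))  # stand-in for seven daily archive dates
for batch in chunks(days, 3):
    # each batch would become one wave of get_daily_trades tasks
    print(batch)
# [1, 2, 3]
# [4, 5, 6]
# [7]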

View File

@@ -6,6 +6,7 @@ import ccxt
import pandas as pd
import pytest

from freqtrade.data.converter.trade_converter import trades_dict_to_list
from freqtrade.enums import CandleType, MarginMode, TradingMode
from freqtrade.exceptions import DependencyException, InvalidOrderException, OperationalException
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_seconds
@@ -1002,6 +1003,7 @@ def test_get_maintenance_ratio_and_amt_binance(
async def test__async_get_trade_history_id_binance(default_conf_usdt, mocker, fetch_trades_result):
    default_conf_usdt["exchange"]["only_from_ccxt"] = True
    exchange = get_patched_exchange(mocker, default_conf_usdt, exchange="binance")

    async def mock_get_trade_hist(pair, *args, **kwargs):
@@ -1056,3 +1058,53 @@ async def test__async_get_trade_history_id_binance(default_conf_usdt, mocker, fe
    # Clean up event loop to avoid warnings
    exchange.close()

async def test__async_get_trade_history_id_binance_fast(
    default_conf_usdt, mocker, fetch_trades_result
):
    default_conf_usdt["exchange"]["only_from_ccxt"] = False
    exchange = get_patched_exchange(mocker, default_conf_usdt, exchange="binance")

    async def mock_get_trade_hist(pair, *args, **kwargs):
        if "since" in kwargs:
            pass
            # older than initial call
            # if kwargs["since"] < 1565798399752:
            #     return []
            # else:
            #     # Don't expect to get here
            #     raise ValueError("Unexpected call")
            #     # return fetch_trades_result[:-2]
        elif kwargs.get("params", {}).get(exchange._trades_pagination_arg) == "0":
            # Return first 3
            return fetch_trades_result[:-2]
        # elif kwargs.get("params", {}).get(exchange._trades_pagination_arg) in (
        #     fetch_trades_result[-3]["id"],
        #     1565798399752,
        # ):
        #     # Return 2
        #     return fetch_trades_result[-3:-1]
        # else:
        #     # Return last 2
        #     return fetch_trades_result[-2:]

    pair = "ETH/BTC"
    mocker.patch(
        "freqtrade.exchange.binance.download_archive_trades",
        return_value=(pair, trades_dict_to_list(fetch_trades_result[-2:])),
    )
    exchange._api_async.fetch_trades = MagicMock(side_effect=mock_get_trade_hist)
    ret = await exchange._async_get_trade_history(
        pair,
        since=fetch_trades_result[0]["timestamp"],
        until=fetch_trades_result[-1]["timestamp"] - 1,
    )
    assert ret[0] == pair
    assert isinstance(ret[1], list)
    # Clean up event loop to avoid warnings
    exchange.close()

View File

@@ -14,11 +14,15 @@ from freqtrade.enums import CandleType
from freqtrade.exchange.binance_public_data import (
    BadHttpStatus,
    Http404,
    binance_vision_trades_zip_url,
    binance_vision_zip_name,
    download_archive_ohlcv,
    download_archive_trades,
    get_daily_ohlcv,
    get_daily_trades,
)
from freqtrade.util.datetime_helpers import dt_ts, dt_utc
from ft_client.test_client.test_rest_client import log_has_re


@pytest.fixture(scope="module")
@@ -337,3 +341,156 @@ async def test_get_daily_ohlcv(mocker, testdatadir):
        with pytest.raises(zipfile.BadZipFile):
            df = await get_daily_ohlcv(symbol, timeframe, CandleType.SPOT, date, session)
        assert get.call_count == 4  # 1 + 3 default retries

async def test_download_archive_trades(mocker, caplog):
    pair = "BTC/USDT"
    since_ms = dt_ts(dt_utc(2020, 1, 1))
    until_ms = dt_ts(dt_utc(2020, 1, 2))
    markets = {"BTC/USDT": {"id": "BTCUSDT"}, "BTC/USDT:USDT": {"id": "BTCUSDT"}}

    mocker.patch("freqtrade.exchange.binance_public_data.get_daily_trades", return_value=[[2, 3]])
    pair1, res = await download_archive_trades(
        CandleType.SPOT, pair, since_ms=since_ms, until_ms=until_ms, markets=markets
    )
    assert pair1 == pair
    assert res == [[2, 3], [2, 3]]

    mocker.patch(
        "freqtrade.exchange.binance_public_data.get_daily_trades",
        side_effect=Http404("xxx", dt_utc(2020, 1, 1), "http://example.com/something"),
    )
    pair1, res = await download_archive_trades(
        CandleType.SPOT, pair, since_ms=since_ms, until_ms=until_ms, markets=markets
    )
    assert pair1 == pair
    assert res == []
    # exit on day 1
    assert log_has_re("Fast download is unavailable", caplog)

    # Test fail on day 2
    caplog.clear()
    mocker.patch(
        "freqtrade.exchange.binance_public_data.get_daily_trades",
        side_effect=[
            [[2, 3]],
            [[2, 3]],
            Http404("xxx", dt_utc(2020, 1, 2), "http://example.com/something"),
            [[2, 3]],
        ],
    )
    # Download 3 days
    until_ms = dt_ts(dt_utc(2020, 1, 3))
    pair1, res = await download_archive_trades(
        CandleType.SPOT, pair, since_ms=since_ms, until_ms=until_ms, markets=markets
    )
    assert pair1 == pair
    assert res == [[2, 3], [2, 3]]
    assert log_has_re(r"Binance fast download .*stopped", caplog)

async def test_download_archive_trades_exception(mocker, caplog):
    pair = "BTC/USDT"
    since_ms = dt_ts(dt_utc(2020, 1, 1))
    until_ms = dt_ts(dt_utc(2020, 1, 2))
    markets = {"BTC/USDT": {"id": "BTCUSDT"}, "BTC/USDT:USDT": {"id": "BTCUSDT"}}

    mocker.patch(
        "freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get", side_effect=RuntimeError
    )
    pair1, res = await download_archive_trades(
        CandleType.SPOT, pair, since_ms=since_ms, until_ms=until_ms, markets=markets
    )
    assert pair1 == pair
    assert res == []

    mocker.patch(
        "freqtrade.exchange.binance_public_data._download_archive_trades", side_effect=RuntimeError
    )
    pair1, res = await download_archive_trades(
        CandleType.SPOT, pair, since_ms=since_ms, until_ms=until_ms, markets=markets
    )
    assert pair1 == pair
    assert res == []
    assert log_has_re("An exception occurred during fast trades download", caplog)

async def test_binance_vision_trades_zip_url():
    url = binance_vision_trades_zip_url("BTCUSDT", CandleType.SPOT, dt_utc(2023, 10, 27))
    assert (
        url == "https://data.binance.vision/data/spot/daily/aggTrades/"
        "BTCUSDT/BTCUSDT-aggTrades-2023-10-27.zip"
    )
    url = binance_vision_trades_zip_url("BTCUSDT", CandleType.FUTURES, dt_utc(2023, 10, 28))
    assert (
        url == "https://data.binance.vision/data/futures/um/daily/aggTrades/"
        "BTCUSDT/BTCUSDT-aggTrades-2023-10-28.zip"
    )

async def test_get_daily_trades(mocker, testdatadir):
    symbol = "PEPEUSDT"
    symbol_futures = "APEUSDT"
    date = dt_utc(2024, 10, 28).date()
    first_date = 1729987202368
    last_date = 1730073596350

    async with aiohttp.ClientSession() as session:
        spot_path = (
            testdatadir / "binance/binance_public_data/spot-PEPEUSDT-aggTrades-2024-10-27.zip"
        )
        get = mocker.patch(
            "freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
            return_value=MockResponse(spot_path.read_bytes(), 200),
        )
        res = await get_daily_trades(symbol, CandleType.SPOT, date, session)
        assert get.call_count == 1
        assert res[0][0] == first_date
        assert res[-1][0] == last_date

        futures_path = (
            testdatadir / "binance/binance_public_data/futures-APEUSDT-aggTrades-2024-10-18.zip"
        )
        get = mocker.patch(
            "freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
            return_value=MockResponse(futures_path.read_bytes(), 200),
        )
        res_fut = await get_daily_trades(symbol_futures, CandleType.FUTURES, date, session)
        assert get.call_count == 1
        assert res_fut[0][0] == 1729209603958
        assert res_fut[-1][0] == 1729295981272

        get = mocker.patch(
            "freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
            return_value=MockResponse(b"", 404),
        )
        with pytest.raises(Http404):
            await get_daily_trades(symbol, CandleType.SPOT, date, session, retry_delay=0)
        assert get.call_count == 1

        get = mocker.patch(
            "freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
            return_value=MockResponse(b"", 500),
        )
        mocker.patch("asyncio.sleep")
        with pytest.raises(BadHttpStatus):
            await get_daily_trades(symbol, CandleType.SPOT, date, session)
        assert get.call_count == 4  # 1 + 3 default retries

        get = mocker.patch(
            "freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
            return_value=MockResponse(b"nop", 200),
        )
        with pytest.raises(zipfile.BadZipFile):
            await get_daily_trades(symbol, CandleType.SPOT, date, session)
        assert get.call_count == 4  # 1 + 3 default retries

View File

@@ -2373,6 +2373,8 @@ def test_refresh_latest_trades(
    caplog.set_level(logging.DEBUG)
    use_trades_conf = default_conf
    use_trades_conf["exchange"]["use_public_trades"] = True
    use_trades_conf["exchange"]["only_from_ccxt"] = True
    use_trades_conf["datadir"] = tmp_path
    use_trades_conf["orderflow"] = {"max_candles": 1500}

    exchange = get_patched_exchange(mocker, use_trades_conf)
@@ -3365,6 +3367,7 @@ async def test__async_fetch_trades_contract_size(
async def test__async_get_trade_history_id(
    default_conf, mocker, exchange_name, fetch_trades_result
):
    default_conf["exchange"]["only_from_ccxt"] = True
    exchange = get_patched_exchange(mocker, default_conf, exchange=exchange_name)
    if exchange._trades_pagination != "id":
        exchange.close()