Merge pull request #11289 from freqtrade/feat/binance_trades_fast

Binance: Download trades "fast" from binance.vision
This commit is contained in:
Matthias
2025-03-02 14:43:47 +01:00
committed by GitHub
7 changed files with 479 additions and 1 deletions

View File

@@ -11,7 +11,11 @@ from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.enums import CandleType, MarginMode, PriceType, TradingMode
from freqtrade.exceptions import DDosProtection, OperationalException, TemporaryError
from freqtrade.exchange import Exchange
from freqtrade.exchange.binance_public_data import concat_safe, download_archive_ohlcv
from freqtrade.exchange.binance_public_data import (
concat_safe,
download_archive_ohlcv,
download_archive_trades,
)
from freqtrade.exchange.common import retrier
from freqtrade.exchange.exchange_types import FtHas, Tickers
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_msecs
@@ -379,3 +383,48 @@ class Binance(Exchange):
if not t:
return [], "0"
return t, from_id
async def _async_get_trade_history_id(
    self, pair: str, until: int, since: int, from_id: str | None = None
) -> tuple[str, list[list]]:
    """
    Fetch trade history for ``pair``, preferring the daily archive download.

    Unless ``exchange.only_from_ccxt`` is set in the configuration, trades are
    first requested through ``download_archive_trades``. Whatever range the
    archive could not cover (up to ``until``) is then fetched through the
    parent class' id-based REST pagination and appended.

    :param pair: Pair to fetch trades for
    :param until: Upper timestamp bound in milliseconds
    :param since: Lower timestamp bound in milliseconds
    :param from_id: Trade id to continue from; ``None`` when starting fresh
    :return: Tuple of (pair, list of trades)
    """
    logger.info(f"Fetching trades from Binance, {from_id=}, {since=}, {until=}")

    if self._config["exchange"].get("only_from_ccxt", False):
        # Archive download disabled - use the regular implementation only.
        return await super()._async_get_trade_history_id(
            pair, until=until, since=since, from_id=from_id
        )

    if from_id is None or not since:
        # Probe the very first trades of the pair so the archive download
        # does not start before any data exists.
        trades = await self._api_async.fetch_trades(
            pair,
            params={
                self._trades_pagination_arg: "0",
            },
            limit=5,
        )
        # An empty answer leaves `since` untouched instead of raising IndexError.
        if trades:
            listing_date: int = trades[0]["timestamp"]
            since = max(since, listing_date)

    # Archives are published per market type; requesting the spot archive for a
    # futures pair would silently return trades from the wrong market.
    archive_candle_type = (
        CandleType.FUTURES if self.trading_mode == TradingMode.FUTURES else CandleType.SPOT
    )
    _, res = await download_archive_trades(
        archive_candle_type,
        pair,
        since_ms=since,
        until_ms=until,
        markets=self.markets,
    )

    if res:
        # Continue after the last archived trade (index 0: timestamp, index 1: id).
        end_time = res[-1][0]
        end_id = res[-1][1]
    else:
        # Nothing came from the archive - continue from the original position.
        end_time = since
        end_id = from_id

    if end_time and end_time >= until:
        return pair, res

    _, res2 = await super()._async_get_trade_history_id(
        pair, until=until, since=end_time, from_id=end_id
    )
    res.extend(res2)
    return pair, res

View File

@@ -1,5 +1,6 @@
"""
Fetch daily-archived OHLCV data from https://data.binance.vision/
Documentation can be found in https://github.com/binance/binance-public-data
"""
import asyncio
@@ -10,9 +11,11 @@ from io import BytesIO
from typing import Any
import aiohttp
import numpy as np
import pandas as pd
from pandas import DataFrame
from freqtrade.constants import DEFAULT_TRADES_COLUMNS
from freqtrade.enums import CandleType
from freqtrade.misc import chunks
from freqtrade.util.datetime_helpers import dt_from_ts, dt_now
@@ -212,6 +215,20 @@ def binance_vision_ohlcv_zip_url(
return url
def binance_vision_trades_zip_url(symbol: str, candle_type: CandleType, date: date) -> str:
    """
    Build the URL of the daily aggTrades zip archive for one symbol and day.

    example urls:
    https://data.binance.vision/data/spot/daily/aggTrades/BTCUSDT/BTCUSDT-aggTrades-2023-10-27.zip
    https://data.binance.vision/data/futures/um/daily/aggTrades/BTCUSDT/BTCUSDT-aggTrades-2023-10-27.zip

    :param symbol: binance symbol name, e.g. BTCUSDT
    :param candle_type: candle type, translated to a URL segment by
        `candle_type_to_url_segment`
    :param date: day of the archive
    :return: URL of the zip file
    """
    segment = candle_type_to_url_segment(candle_type)
    day = date.strftime("%Y-%m-%d")
    folder = f"https://data.binance.vision/data/{segment}/daily/aggTrades/{symbol}"
    return f"{folder}/{symbol}-aggTrades-{day}.zip"
async def get_daily_ohlcv(
symbol: str,
timeframe: str,
@@ -280,3 +297,203 @@ async def get_daily_ohlcv(
if isinstance(e, Http404) or retry > retry_count:
logger.debug(f"Failed to get data from {url}: {e}")
raise
async def download_archive_trades(
    candle_type: CandleType,
    pair: str,
    *,
    since_ms: int,
    until_ms: int | None,
    markets: dict[str, Any],
    stop_on_404: bool = True,
) -> tuple[str, list[list]]:
    """
    Download trades for ``pair`` from the daily archives.

    The requested end is capped at "now minus two days"; if the start is not
    before that capped end, nothing is downloaded. Any exception is logged and
    turned into an empty result, so this function does not raise for
    ``Exception`` subclasses.

    :param candle_type: candle type selecting which archive to use
    :param pair: pair name, used as key into ``markets``
    :param since_ms: start timestamp in milliseconds
    :param until_ms: end timestamp in milliseconds, or a falsy value for "now"
    :param markets: markets dictionary; ``markets[pair]["id"]`` is the archive symbol
    :param stop_on_404: forwarded to `_download_archive_trades`
    :return: tuple of (pair, list of trades) - the list is empty on failure
    """
    try:
        archive_symbol = markets[pair]["id"]

        newest_archive = dt_now() - timedelta(days=2)
        range_start = dt_from_ts(since_ms)
        requested_end = dt_now() if not until_ms else dt_from_ts(until_ms)
        range_end = min(requested_end, newest_archive)

        if range_start < range_end:
            trades = await _download_archive_trades(
                archive_symbol, pair, candle_type, range_start, range_end, stop_on_404
            )
            return pair, trades
        return pair, []

    except Exception as exc:
        logger.warning(
            "An exception occurred during fast trades download from Binance, falling back to "
            "the slower REST API, this can take a lot more time.",
            exc_info=exc,
        )
        return pair, []
def parse_trades_from_zip(csvf):
    """
    Parse an aggTrades CSV file object into a list of trade records.

    Files starting with a digit are read as header-less with 8 columns (spot);
    all other files are read with their first row treated as a header and
    7 columns (futures).
    See https://github.com/binance/binance-public-data/issues/283

    :param csvf: seekable binary file object containing the CSV data
    :return: trade records with the columns of DEFAULT_TRADES_COLUMNS;
        an empty list if the file contains no data at all
    """
    first_chunk = csvf.read(1)
    if not first_chunk:
        # Empty file: there is no first byte to inspect and nothing to parse.
        return []

    names = [
        "id",
        "price",
        "amount",
        "first_trade_id",
        "last_trade_id",
        "timestamp",
        "is_buyer_maker",
    ]
    if chr(first_chunk[0]).isdigit():
        # spot - no header row, one additional trailing column
        header = None
        names.append("is_best_match")
    else:
        # futures - first row is a header which is replaced by `names`
        header = 0

    # Rewind so the byte consumed for format detection is parsed as well.
    csvf.seek(0)

    df = pd.read_csv(
        csvf,
        names=names,
        header=header,
    )
    df.loc[:, "cost"] = df["price"] * df["amount"]
    # Side is reversed intentionally
    # based on ccxt parseTrade logic.
    df.loc[:, "side"] = np.where(df["is_buyer_maker"], "sell", "buy")
    df.loc[:, "type"] = None
    # Convert timestamp to ms: values above the threshold are divided by 1000
    # (presumably microsecond timestamps - TODO confirm against the data source).
    ms_upper_bound = 10000000000000
    df.loc[:, "timestamp"] = np.where(
        df["timestamp"] > ms_upper_bound,
        df["timestamp"] // 1000,
        df["timestamp"],
    )
    return df.loc[:, DEFAULT_TRADES_COLUMNS].to_records(index=False).tolist()
async def get_daily_trades(
    symbol: str,
    candle_type: CandleType,
    date: date,
    session: aiohttp.ClientSession,
    retry_count: int = 3,
    retry_delay: float = 0.0,
) -> list[list]:
    """
    Download and parse the daily trades archive from https://data.binance.vision
    See https://github.com/binance/binance-public-data

    A 404 response raises `Http404` immediately, without retrying. Every other
    failure is retried until `retry_count` retries are used up, then re-raised.

    :param symbol: binance symbol name, e.g. BTCUSDT
    :param candle_type: SPOT or FUTURES
    :param date: day of the archive to download
    :param session: an aiohttp.ClientSession instance
    :param retry_count: times to retry before raising the last exception
    :param retry_delay: base delay; the n-th retry waits `n * retry_delay` seconds
    :return: a list containing trades in DEFAULT_TRADES_COLUMNS format
    """
    url = binance_vision_trades_zip_url(symbol, candle_type, date)
    logger.debug(f"download trades data from binance: {url}")

    retry = 0
    while True:
        if retry > 0:
            # Linear back-off between attempts.
            sleep_secs = retry * retry_delay
            logger.debug(
                f"[{retry}/{retry_count}] retry to download {url} after {sleep_secs} seconds"
            )
            await asyncio.sleep(sleep_secs)
        try:
            async with session.get(url) as resp:
                if resp.status == 404:
                    logger.debug(f"Failed to download {url}")
                    raise Http404(f"404: {url}", date, url)
                if resp.status != 200:
                    raise BadHttpStatus(f"{resp.status} - {resp.reason}")

                content = await resp.read()
                logger.debug(f"Successfully downloaded {url}")
                # Only the first member of the archive is parsed.
                with (
                    zipfile.ZipFile(BytesIO(content)) as zipf,
                    zipf.open(zipf.namelist()[0]) as csvf,
                ):
                    return parse_trades_from_zip(csvf)
        except Exception as e:
            logger.info("download Daily_trades raised: %s", e)
            retry += 1
            out_of_retries = retry > retry_count
            if isinstance(e, Http404) or out_of_retries:
                logger.debug(f"Failed to get data from {url}: {e}")
                raise
async def _download_archive_trades(
    symbol: str,
    pair: str,
    candle_type: CandleType,
    start: date,
    end: date,
    stop_on_404: bool,
) -> list[list]:
    """
    Download daily trade archives for every day produced by `date_range(start, end)`.

    Days are requested in batches of 30 tasks and awaited in order. The download
    stops at the first day that fails, so the returned trades never contain a gap,
    unless the failure is a 404 and `stop_on_404` is False, in which case that
    day is skipped.

    :param symbol: exchange symbol used in the archive URL
    :param pair: pair name, only used for log messages
    :param candle_type: candle type selecting which archive to use
    :param start: first day to download
    :param end: end of the range passed to `date_range`
    :param stop_on_404: stop (instead of skipping the day) when a day returns 404
    :return: flat list of the trades of all successfully downloaded days
    """
    # Trades of all days downloaded so far, in request order.
    collected: list[list] = []
    # Number of the day currently being awaited; the first awaited day is 1.
    day_number = 0

    throttle = aiohttp.TCPConnector(limit=100)
    async with aiohttp.ClientSession(connector=throttle, trust_env=True) as session:
        # the HTTP connections has been throttled by TCPConnector
        for batch in chunks(list(date_range(start, end)), 30):
            pending = [
                asyncio.create_task(get_daily_trades(symbol, candle_type, day, session))
                for day in batch
            ]
            for position, job in enumerate(pending):
                day_number += 1
                try:
                    day_trades = await job
                except Http404 as e:
                    if not stop_on_404:
                        # Missing day tolerated - carry on with the next task.
                        continue

                    logger.debug(f"Failed to download {e.url} due to 404.")
                    # A 404 error on the first day indicates missing data
                    # on https://data.binance.vision, we provide the warning and the advice.
                    # https://github.com/freqtrade/freqtrade/blob/acc53065e5fa7ab5197073276306dc9dc3adbfa3/tests/exchange_online/test_binance_compare_ohlcv.py#L7
                    if day_number == 1:
                        logger.warning(
                            f"Fast download is unavailable due to missing data: "
                            f"{e.url}. Falling back to the slower REST API, "
                            "which may take more time."
                        )
                        if pair in ["BTC/USDT:USDT", "ETH/USDT:USDT", "BCH/USDT:USDT"]:
                            logger.warning(
                                f"To avoid the delay, you can first download {pair} using "
                                "`--timerange <start date>-20200101`, and then download the "
                                "remaining data with `--timerange 20200101-<end date>`."
                            )
                    else:
                        logger.warning(
                            f"Binance fast download for {pair} stopped at {e.date} due to "
                            f"missing data: {e.url}, falling back to rest API for the "
                            "remaining data, this can take more time."
                        )
                    # Drop the tasks of this batch that have not been awaited yet.
                    await cancel_and_await_tasks(pending[position + 1 :])
                    return collected
                except BaseException as e:
                    logger.warning(f"An exception raised: : {e}")
                    # Directly return the existing data, do not allow the gap within the data
                    await cancel_and_await_tasks(pending[position + 1 :])
                    return collected
                else:
                    # Happy case
                    collected.extend(day_trades)
    return collected

View File

@@ -6,6 +6,7 @@ import ccxt
import pandas as pd
import pytest
from freqtrade.data.converter.trade_converter import trades_dict_to_list
from freqtrade.enums import CandleType, MarginMode, TradingMode
from freqtrade.exceptions import DependencyException, InvalidOrderException, OperationalException
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_seconds
@@ -1002,6 +1003,7 @@ def test_get_maintenance_ratio_and_amt_binance(
async def test__async_get_trade_history_id_binance(default_conf_usdt, mocker, fetch_trades_result):
default_conf_usdt["exchange"]["only_from_ccxt"] = True
exchange = get_patched_exchange(mocker, default_conf_usdt, exchange="binance")
async def mock_get_trade_hist(pair, *args, **kwargs):
@@ -1056,3 +1058,53 @@ async def test__async_get_trade_history_id_binance(default_conf_usdt, mocker, fe
# Clean up event loop to avoid warnings
exchange.close()
async def test__async_get_trade_history_id_binance_fast(
    default_conf_usdt, mocker, fetch_trades_result
):
    """Trade history download with the archive ("fast") path enabled."""
    default_conf_usdt["exchange"]["only_from_ccxt"] = False
    exchange = get_patched_exchange(mocker, default_conf_usdt, exchange="binance")

    async def mock_get_trade_hist(pair, *args, **kwargs):
        if "since" in kwargs:
            # Time-based requests get no data.
            return None
        pagination_start = kwargs.get("params", {}).get(exchange._trades_pagination_arg)
        if pagination_start == "0":
            # Request from the very first id: everything except the last 2 trades
            return fetch_trades_result[:-2]
        return None

    pair = "ETH/BTC"
    # The archive download is replaced by a stub returning the last 2 trades.
    archived_trades = trades_dict_to_list(fetch_trades_result[-2:])
    mocker.patch(
        "freqtrade.exchange.binance.download_archive_trades",
        return_value=(pair, archived_trades),
    )
    exchange._api_async.fetch_trades = MagicMock(side_effect=mock_get_trade_hist)

    first_timestamp = fetch_trades_result[0]["timestamp"]
    last_timestamp = fetch_trades_result[-1]["timestamp"]
    ret = await exchange._async_get_trade_history(
        pair,
        since=first_timestamp,
        until=last_timestamp - 1,
    )

    assert ret[0] == pair
    assert isinstance(ret[1], list)

    # Clean up event loop to avoid warnings
    exchange.close()

View File

@@ -14,11 +14,15 @@ from freqtrade.enums import CandleType
from freqtrade.exchange.binance_public_data import (
BadHttpStatus,
Http404,
binance_vision_trades_zip_url,
binance_vision_zip_name,
download_archive_ohlcv,
download_archive_trades,
get_daily_ohlcv,
get_daily_trades,
)
from freqtrade.util.datetime_helpers import dt_ts, dt_utc
from ft_client.test_client.test_rest_client import log_has_re
@pytest.fixture(scope="module")
@@ -337,3 +341,156 @@ async def test_get_daily_ohlcv(mocker, testdatadir):
with pytest.raises(zipfile.BadZipFile):
df = await get_daily_ohlcv(symbol, timeframe, CandleType.SPOT, date, session)
assert get.call_count == 4 # 1 + 3 default retries
async def test_download_archive_trades(mocker, caplog):
    """Archive trades download: success, 404 on the first day, 404 on a later day."""
    daily_target = "freqtrade.exchange.binance_public_data.get_daily_trades"
    pair = "BTC/USDT"
    markets = {"BTC/USDT": {"id": "BTCUSDT"}, "BTC/USDT:USDT": {"id": "BTCUSDT"}}
    since_ms = dt_ts(dt_utc(2020, 1, 1))

    async def run_download(until_ms):
        return await download_archive_trades(
            CandleType.SPOT, pair, since_ms=since_ms, until_ms=until_ms, markets=markets
        )

    # Every day succeeds - the daily results are concatenated
    mocker.patch(daily_target, return_value=[[2, 3]])
    pair1, res = await run_download(dt_ts(dt_utc(2020, 1, 2)))
    assert pair1 == pair
    assert res == [[2, 3], [2, 3]]

    # Every day answers 404 - exit on day 1
    mocker.patch(
        daily_target,
        side_effect=Http404("xxx", dt_utc(2020, 1, 1), "http://example.com/something"),
    )
    pair1, res = await run_download(dt_ts(dt_utc(2020, 1, 2)))
    assert pair1 == pair
    assert res == []
    assert log_has_re("Fast download is unavailable", caplog)

    # Two good days followed by a 404 on the third mocked call
    caplog.clear()
    day_results = [
        [[2, 3]],
        [[2, 3]],
        Http404("xxx", dt_utc(2020, 1, 2), "http://example.com/something"),
        [[2, 3]],
    ]
    mocker.patch(daily_target, side_effect=day_results)
    # Download 3 days
    pair1, res = await run_download(dt_ts(dt_utc(2020, 1, 3)))
    assert pair1 == pair
    assert res == [[2, 3], [2, 3]]
    assert log_has_re(r"Binance fast download .*stopped", caplog)
async def test_download_archive_trades_exception(mocker, caplog):
    """Unexpected errors during the archive download must yield an empty result."""
    pair = "BTC/USDT"
    since_ms = dt_ts(dt_utc(2020, 1, 1))
    until_ms = dt_ts(dt_utc(2020, 1, 2))
    markets = {"BTC/USDT": {"id": "BTCUSDT"}, "BTC/USDT:USDT": {"id": "BTCUSDT"}}

    # Failure at the HTTP layer
    mocker.patch(
        "freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get", side_effect=RuntimeError
    )
    pair1, res = await download_archive_trades(
        CandleType.SPOT, pair, since_ms=since_ms, until_ms=until_ms, markets=markets
    )
    assert pair1 == pair
    assert res == []

    # Failure of the internal download helper itself
    mocker.patch(
        "freqtrade.exchange.binance_public_data._download_archive_trades", side_effect=RuntimeError
    )
    # Capture the result of THIS call - otherwise the assertions below would
    # only re-check the values returned by the first call.
    pair1, res = await download_archive_trades(
        CandleType.SPOT, pair, since_ms=since_ms, until_ms=until_ms, markets=markets
    )
    assert pair1 == pair
    assert res == []
    assert log_has_re("An exception occurred during fast trades download", caplog)
async def test_binance_vision_trades_zip_url():
    """The aggTrades archive URL is built correctly for spot and futures."""
    expected_spot = (
        "https://data.binance.vision/data/spot/daily/aggTrades/"
        "BTCUSDT/BTCUSDT-aggTrades-2023-10-27.zip"
    )
    spot_url = binance_vision_trades_zip_url("BTCUSDT", CandleType.SPOT, dt_utc(2023, 10, 27))
    assert spot_url == expected_spot

    expected_futures = (
        "https://data.binance.vision/data/futures/um/daily/aggTrades/"
        "BTCUSDT/BTCUSDT-aggTrades-2023-10-28.zip"
    )
    futures_url = binance_vision_trades_zip_url(
        "BTCUSDT", CandleType.FUTURES, dt_utc(2023, 10, 28)
    )
    assert futures_url == expected_futures
async def test_get_daily_trades(mocker, testdatadir):
    """Daily trades download: spot and futures archives, 404, bad status, broken zip."""
    symbol = "PEPEUSDT"
    symbol_futures = "APEUSDT"
    date = dt_utc(2024, 10, 28).date()
    # Timestamps (ms) of the first and last trade in the spot fixture
    first_date = 1729987202368
    last_date = 1730073596350
    archive_dir = testdatadir / "binance/binance_public_data"

    def respond_with(body: bytes, status: int):
        # Replace the HTTP GET with a canned response and return the mock.
        return mocker.patch(
            "freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
            return_value=MockResponse(body, status),
        )

    async with aiohttp.ClientSession() as session:
        # Spot archive
        spot_path = archive_dir / "spot-PEPEUSDT-aggTrades-2024-10-27.zip"
        get = respond_with(spot_path.read_bytes(), 200)
        res = await get_daily_trades(symbol, CandleType.SPOT, date, session)
        assert get.call_count == 1
        assert res[0][0] == first_date
        assert res[-1][0] == last_date

        # Futures archive
        futures_path = archive_dir / "futures-APEUSDT-aggTrades-2024-10-18.zip"
        get = respond_with(futures_path.read_bytes(), 200)
        res_fut = await get_daily_trades(symbol_futures, CandleType.FUTURES, date, session)
        assert get.call_count == 1
        assert res_fut[0][0] == 1729209603958
        assert res_fut[-1][0] == 1729295981272

        # 404 is raised right away, without retries
        get = respond_with(b"", 404)
        with pytest.raises(Http404):
            await get_daily_trades(symbol, CandleType.SPOT, date, session, retry_delay=0)
        assert get.call_count == 1

        # Other bad statuses are retried
        get = respond_with(b"", 500)
        mocker.patch("asyncio.sleep")
        with pytest.raises(BadHttpStatus):
            await get_daily_trades(symbol, CandleType.SPOT, date, session)
        assert get.call_count == 4  # 1 + 3 default retries

        # A payload that is not a zip file is retried as well
        get = respond_with(b"nop", 200)
        with pytest.raises(zipfile.BadZipFile):
            await get_daily_trades(symbol, CandleType.SPOT, date, session)
        assert get.call_count == 4  # 1 + 3 default retries

View File

@@ -2373,6 +2373,8 @@ def test_refresh_latest_trades(
caplog.set_level(logging.DEBUG)
use_trades_conf = default_conf
use_trades_conf["exchange"]["use_public_trades"] = True
use_trades_conf["exchange"]["only_from_ccxt"] = True
use_trades_conf["datadir"] = tmp_path
use_trades_conf["orderflow"] = {"max_candles": 1500}
exchange = get_patched_exchange(mocker, use_trades_conf)
@@ -3365,6 +3367,7 @@ async def test__async_fetch_trades_contract_size(
async def test__async_get_trade_history_id(
default_conf, mocker, exchange_name, fetch_trades_result
):
default_conf["exchange"]["only_from_ccxt"] = True
exchange = get_patched_exchange(mocker, default_conf, exchange=exchange_name)
if exchange._trades_pagination != "id":
exchange.close()