diff --git a/freqtrade/exchange/binance_public_data.py b/freqtrade/exchange/binance_public_data.py
index 42eda09bf..56eae7917 100644
--- a/freqtrade/exchange/binance_public_data.py
+++ b/freqtrade/exchange/binance_public_data.py
@@ -3,6 +3,7 @@
 Fetch daily-archived OHLCV data from https://data.binance.vision/
 """
 import asyncio
+import csv
 import logging
 import zipfile
 from datetime import date, timedelta
@@ -212,6 +213,19 @@ def binance_vision_ohlcv_zip_url(
     return url
 
 
+def binance_vision_trades_zip_url(asset_type_url_segment: str, symbol: str, date: date) -> str:
+    """
+    example urls:
+    https://data.binance.vision/data/spot/daily/trades/BTCUSDT/BTCUSDT-trades-2023-10-27.zip
+    https://data.binance.vision/data/futures/um/daily/trades/BTCUSDT/BTCUSDT-trades-2023-10-27.zip
+    """
+    url = (
+        f"https://data.binance.vision/data/{asset_type_url_segment}/daily/trades/{symbol}"
+        f"/{symbol}-trades-{date.strftime('%Y-%m-%d')}.zip"
+    )
+    return url
+
+
 async def get_daily_ohlcv(
     symbol: str,
     timeframe: str,
@@ -280,3 +294,185 @@ async def get_daily_ohlcv(
             if isinstance(e, Http404) or retry > retry_count:
                 logger.debug(f"Failed to get data from {url}: {e}")
                 raise
+
+
+async def download_archive_trades(
+    candle_type: CandleType,
+    pair: str,
+    *,
+    since_ms: int,
+    until_ms: int | None,
+    markets: dict[str, Any],
+    stop_on_404: bool = True,
+) -> tuple[str, list[list]]:
+    """
+    Fetch daily-archived trades of `pair` from https://data.binance.vision
+
+    :return: a tuple of (pair, trades), where trades is a list in
+        DEFAULT_TRADES_COLUMNS format; an empty list means nothing could be
+        downloaded and the caller should fall back to the REST API.
+    """
+    try:
+        asset_type_url_segment = candle_type_to_url_segment(candle_type)
+        symbol = markets[pair]["id"]
+
+        # daily archives are published with a delay, so the most recent days
+        # may not be available yet
+        last_available_date = dt_now() - timedelta(days=2)
+
+        start = dt_from_ts(since_ms)
+        end = dt_from_ts(until_ms) if until_ms else dt_now()
+        end = min(end, last_available_date)
+        if start >= end:
+            return pair, []
+        trades = await _download_archive_trades(
+            asset_type_url_segment, symbol, pair, start, end, stop_on_404
+        )
+        return pair, trades
+
+    except Exception as e:
+        logger.warning(
+            "An exception occurred during the fast trades download from Binance, falling back "
+            "to the slower REST API, which can take a lot more time.",
+            exc_info=e,
+        )
+        return pair, []
+
+
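+# The daily archives are zipped CSV files. Spot trades files ship without a
+# header row, while USD-M futures files include one (see the issue linked in
+# the code below), which is why get_daily_trades() sniffs the first byte of
+# the CSV before parsing it.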
+async def get_daily_trades(
+    asset_type_url_segment: str,
+    symbol: str,
+    date: date,
+    session: aiohttp.ClientSession,
+    retry_count: int = 3,
+    retry_delay: float = 0.0,
+) -> list[list]:
+    """
+    Get daily trades from https://data.binance.vision
+    See https://github.com/binance/binance-public-data
+
+    :asset_type_url_segment: `spot` or `futures/um`
+    :symbol: binance symbol name, e.g. BTCUSDT
+    :date: the returned trades cover the entire day of `date` in UTC
+    :session: an aiohttp.ClientSession instance
+    :retry_count: times to retry before raising the exception
+    :retry_delay: the time to wait before every retry
+    :return: a list containing trades in DEFAULT_TRADES_COLUMNS format
+    """
+
+    url = binance_vision_trades_zip_url(asset_type_url_segment, symbol, date)
+
+    logger.debug(f"download trades data from binance: {url}")
+
+    retry = 0
+    while True:
+        if retry > 0:
+            sleep_secs = retry * retry_delay
+            logger.debug(
+                f"[{retry}/{retry_count}] retry to download {url} after {sleep_secs} seconds"
+            )
+            await asyncio.sleep(sleep_secs)
+        try:
+            async with session.get(url) as resp:
+                if resp.status == 200:
+                    content = await resp.read()
+                    logger.debug(f"Successfully downloaded {url}")
+                    with zipfile.ZipFile(BytesIO(content)) as zipf:
+                        with zipf.open(zipf.namelist()[0]) as csvf:
+                            # https://github.com/binance/binance-public-data/issues/283
+                            first_byte = csvf.read(1)[0]
+                            if chr(first_byte).isdigit():
+                                # spot file: no header row; its extra trailing
+                                # `is_best_match` column is dropped via usecols
+                                header = None
+                            else:
+                                # futures file: skip the header row
+                                header = 0
+                            csvf.seek(0)
+
+                            names = [
+                                "id",
+                                "price",
+                                "qty",
+                                "quote_qty",
+                                "time",
+                                "is_buyer_maker",
+                            ]
+                            df = pd.read_csv(
+                                csvf,
+                                usecols=[0, 1, 2, 3, 4, 5],
+                                names=names,
+                                header=header,
+                            )
+                            df["cost"] = df["price"] * df["qty"]
+                            # taker side: when the buyer is the maker, the taker sold
+                            df["side"] = df["is_buyer_maker"].map({True: "sell", False: "buy"})
+                            # the archives do not provide an order type
+                            df["type"] = None
+                            df = df.rename(columns={"time": "timestamp", "qty": "amount"})
+
+                            # DEFAULT_TRADES_COLUMNS order
+                            return (
+                                df[["timestamp", "id", "type", "side", "price", "amount", "cost"]]
+                                .to_records(index=False)
+                                .tolist()
+                            )
+                elif resp.status == 404:
+                    logger.debug(f"Failed to download {url}")
+                    raise Http404(f"404: {url}", date, url)
+                else:
+                    raise BadHttpStatus(f"{resp.status} - {resp.reason}")
+        except Exception as e:
+            retry += 1
+            if isinstance(e, Http404) or retry > retry_count:
+                logger.debug(f"Failed to get data from {url}: {e}")
+                raise
+
+
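+# _download_archive_trades() creates one download task per day but awaits them
+# in submission order, so the collected rows stay chronological. The
+# TCPConnector(limit=100) caps the number of concurrent connections, and
+# chunking the date range into batches of 1000 days bounds how many tasks
+# exist at once.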
+async def _download_archive_trades(
+    asset_type_url_segment: str,
+    symbol: str,
+    pair: str,
+    start: date,
+    end: date,
+    stop_on_404: bool,
+) -> list[list]:
+    # accumulated trade rows in DEFAULT_TRADES_COLUMNS format; missing days
+    # are only tolerated when `stop_on_404` is False
+    results: list[list] = []
+    # the current day being processed, starting at 1
+    current_day = 0
+
+    connector = aiohttp.TCPConnector(limit=100)
+    async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
+        # the HTTP connections are throttled by the TCPConnector
+        for dates in chunks(list(date_range(start, end)), 1000):
+            tasks = [
+                asyncio.create_task(
+                    get_daily_trades(asset_type_url_segment, symbol, date, session)
+                )
+                for date in dates
+            ]
+            for task in tasks:
+                current_day += 1
+                try:
+                    day_trades = await task
+                except Http404 as e:
+                    if stop_on_404:
+                        logger.debug(f"Failed to download {e.url} due to 404.")
+
+                        # A 404 on the first day indicates the data is missing on
+                        # https://data.binance.vision entirely, so warn and advise.
+                        # https://github.com/freqtrade/freqtrade/blob/acc53065e5fa7ab5197073276306dc9dc3adbfa3/tests/exchange_online/test_binance_compare_ohlcv.py#L7
+                        if current_day == 1:
+                            logger.warning(
+                                f"Fast download is unavailable due to missing data: "
+                                f"{e.url}. Falling back to the slower REST API, "
+                                "which may take more time."
+                            )
+                            # if pair in ["BTC/USDT:USDT", "ETH/USDT:USDT", "BCH/USDT:USDT"]:
+                            #     logger.warning(
+                            #         f"To avoid the delay, you can first download {pair} using "
+                            #         "`--timerange -20200101`, and then download the "
+                            #         "remaining data with `--timerange 20200101-`."
+                            #     )
+                        else:
+                            logger.warning(
+                                f"Binance fast download for {pair} stopped at {e.date} due to "
+                                f"missing data: {e.url}. Falling back to the REST API for the "
+                                "remaining data, which can take more time."
+                            )
+                        await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
+                        return results
+                    else:
+                        # tolerate the gap and continue with the remaining days
+                        logger.debug(f"Skipping {e.date} due to missing data: {e.url}")
+                except BaseException as e:
+                    logger.warning(f"An exception was raised: {e}")
+                    # return the data collected so far instead of leaving a gap in it
+                    await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
+                    return results
+                else:
+                    results.extend(day_trades)
+    return results
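+
+
+# Usage sketch (illustrative only; `exchange.markets` and the timestamps are
+# placeholders, not part of this module):
+#
+#     pair, trades = await download_archive_trades(
+#         CandleType.FUTURES, "BTC/USDT:USDT",
+#         since_ms=since_ms, until_ms=None, markets=exchange.markets,
+#     )
+#
+# An empty `trades` list tells the caller to fall back to the slower REST API.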