diff --git a/freqtrade/exchange/binance_public_data.py b/freqtrade/exchange/binance_public_data.py
index 23cb02989..adc1ea511 100644
--- a/freqtrade/exchange/binance_public_data.py
+++ b/freqtrade/exchange/binance_public_data.py
@@ -1,2 +1,179 @@
-async def fetch_ohlcv(*args, **kwargs):
+"""
+Fetch daily-archived OHLCV data from https://data.binance.vision/
+"""
+
+import asyncio
+import datetime
+import io
+import itertools
+import logging
+import zipfile
+
+import aiohttp
+import pandas as pd
+from pandas import DataFrame
+
+from freqtrade.enums import CandleType
+from freqtrade.util.datetime_helpers import dt_from_ts, dt_now
+
+
+logger = logging.getLogger(__name__)
+
+
+class BadHttpStatus(Exception):
+    """Raised when the HTTP status is neither 200 nor 404"""
+
+
+async def fetch_ohlcv(
+    candle_type: CandleType, pair: str, timeframe: str, since_ms: int, until_ms: int | None
+) -> DataFrame:
+    """
+    Fetch OHLCV data from https://data.binance.vision/
+    :param candle_type: Currently only spot and futures are supported
+    :param until_ms: `None` indicates the timestamp of the latest available data
+    :return: an empty DataFrame if no data is available in the requested time range
+    """
+    if candle_type == CandleType.SPOT:
+        asset_type = "spot"
+    elif candle_type == CandleType.FUTURES:
+        asset_type = "futures/um"
+    else:
+        raise ValueError(f"Unsupported CandleType: {candle_type}")
+    symbol = symbol_ccxt_to_binance(pair)
+    start = dt_from_ts(since_ms)
+    end = dt_from_ts(until_ms) if until_ms else dt_now()
+
+    # We use two days ago as the last available day, because the daily archives are
+    # uploaded once per day with a delay of several hours
+    last_available_date = dt_now() - datetime.timedelta(days=2)
+    end = min(end, last_available_date)
+    if start >= end:
+        return DataFrame()
+    return await _fetch_ohlcv(asset_type, symbol, timeframe, start, end)
+
+
+def symbol_ccxt_to_binance(symbol: str) -> str:
+    """
+    Convert ccxt symbol notation to binance notation
+    e.g. BTC/USDT -> BTCUSDT, BTC/USDT:USDT -> BTCUSDT
+    """
+    if ":" in symbol:
+        parts = symbol.split(":")
+        if len(parts) != 2:
+            raise ValueError(f"Cannot recognize symbol: {symbol}")
+        return parts[0].replace("/", "")
+    else:
+        return symbol.replace("/", "")
+
+
+def concat(dfs) -> DataFrame:
+    if all(df is None for df in dfs):
+        return DataFrame()
+    else:
+        return pd.concat(dfs)
+
+
+async def _fetch_ohlcv(asset_type, symbol, timeframe, start, end) -> DataFrame:
+    dfs: list[DataFrame | None] = []
+
+    connector = aiohttp.TCPConnector(limit=100)
+    async with aiohttp.ClientSession(connector=connector) as session:
+        coroutines = [
+            get_daily_ohlcv(asset_type, symbol, timeframe, date, session)
+            for date in date_range(start, end)
+        ]
+        # The number of concurrent HTTP connections is throttled by the TCPConnector
+        for batch in itertools.batched(coroutines, 1000):
+            results = await asyncio.gather(*batch, return_exceptions=True)
+            for result in results:
+                if isinstance(result, BaseException):
+                    logger.warning(f"An exception was raised: {result}")
+                    # Return the data collected so far; do not allow a gap
+                    # in the middle of the data
+                    return concat(dfs)
+                else:
+                    dfs.append(result)
+    return concat(dfs)
+
+
+def date_range(start: datetime.date, end: datetime.date):
+    date = start
+    while date <= end:
+        yield date
+        date += datetime.timedelta(days=1)
+
+
+def format_date(date: datetime.date) -> str:
+    return date.strftime("%Y-%m-%d")
+
+
+def zip_name(symbol: str, timeframe: str, date: datetime.date) -> str:
+    return f"{symbol}-{timeframe}-{format_date(date)}.zip"
+
+
+async def get_daily_ohlcv(
+    asset_type: str,
+    symbol: str,
+    timeframe: str,
+    date: datetime.date,
+    session: aiohttp.ClientSession,
+    retry_count: int = 3,
+) -> DataFrame | None:
+    """
+    Get daily OHLCV from https://data.binance.vision
+    See https://github.com/binance/binance-public-data
+    """
+
+    # example urls:
+    # https://data.binance.vision/data/spot/daily/klines/BTCUSDT/1s/BTCUSDT-1s-2023-10-27.zip
+    # https://data.binance.vision/data/futures/um/daily/klines/BTCUSDT/1h/BTCUSDT-1h-2023-10-27.zip
+    url = (
+        f"https://data.binance.vision/data/{asset_type}/daily/klines/{symbol}/{timeframe}/"
+        f"{zip_name(symbol, timeframe, date)}"
+    )
+
+    logger.debug(f"download data from binance: {url}")
+
+    retry = 0
+    while True:
+        if retry > 0:
+            sleep_secs = retry * 0.5
+            logger.debug(
+                f"[{retry}/{retry_count}] retrying download of {url} after {sleep_secs} seconds"
+            )
+            await asyncio.sleep(sleep_secs)
+        try:
+            async with session.get(url) as resp:
+                if resp.status == 200:
+                    content = await resp.read()
+                    logger.debug(f"Successfully downloaded {url}")
+                    with zipfile.ZipFile(io.BytesIO(content)) as zipf:
+                        with zipf.open(zipf.namelist()[0]) as csvf:
+                            # Spot archives contain a CSV header line, futures archives do not:
+                            # https://github.com/binance/binance-public-data/issues/283
+                            first_byte = csvf.read(1)[0]
+                            if chr(first_byte).isdigit():
+                                header = None
+                            else:
+                                header = 0
+                            csvf.seek(0)
+
+                            df = pd.read_csv(
+                                csvf,
+                                usecols=[0, 1, 2, 3, 4, 5],
+                                names=["date", "open", "high", "low", "close", "volume"],
+                                header=header,
+                            )
+                            df["date"] = pd.to_datetime(df["date"], unit="ms", utc=True)
+                            return df
+                elif resp.status == 404:
+                    logger.warning(f"No data available for {symbol} in {format_date(date)}")
+                    return None
+                else:
+                    raise BadHttpStatus(f"{resp.status} - {resp.reason}")
+        except Exception as e:
+            retry += 1
+            if retry >= retry_count:
+                logger.warning(f"Failed to get data from {url}: {e}")
+                raise
diff --git a/tests/exchange/test_binance_public_data.py b/tests/exchange/test_binance_public_data.py
new file mode 100644
index 000000000..5e0a414f2
--- /dev/null
+++ b/tests/exchange/test_binance_public_data.py
@@ -0,0 +1,192 @@
+import datetime
+import io
+import re
+import zipfile
+from datetime import timedelta
+
+import aiohttp
+import pandas as pd
+import pytest
+
+from freqtrade.enums import CandleType
+from freqtrade.exchange.binance_public_data import (
+    BadHttpStatus,
+    fetch_ohlcv,
+    get_daily_ohlcv,
+    zip_name,
+)
+from freqtrade.util.datetime_helpers import dt_ts, dt_utc
+
+
+# spot klines archive csv file format, the futures/um klines don't have the header line
+#
+# open_time,open,high,low,close,volume,close_time,quote_volume,count,taker_buy_volume,taker_buy_quote_volume,ignore # noqa: E501
+# 1698364800000,34161.6,34182.5,33977.4,34024.2,409953,1698368399999,1202.97118037,15095,192220,564.12041453,0 # noqa: E501
+# 1698368400000,34024.2,34060.1,33776.4,33848.4,740960,1698371999999,2183.75671155,23938,368266,1085.17080793,0 # noqa: E501
+# 1698372000000,33848.5,34150.0,33815.1,34094.2,390376,1698375599999,1147.73267094,13854,231446,680.60405822,0 # noqa: E501
+
+
+def make_daily_df(date, timeframe):
+    start = dt_utc(date.year, date.month, date.day)
+    end = start + timedelta(days=1)
+    date_col = pd.date_range(start, end, freq=timeframe.replace("m", "min"), inclusive="left")
+    cols = (
+        "open_time,open,high,low,close,volume,close_time,quote_volume,count,taker_buy_volume,"
+        "taker_buy_quote_volume,ignore"
+    )
+    df = pd.DataFrame(columns=cols.split(","), dtype=float)
+    df["open_time"] = date_col.astype("int64") // 10**6
+    df["open"] = df["high"] = df["low"] = df["close"] = df["volume"] = 1.0
+    return df
+
+
+def make_daily_zip(asset_type, symbol, timeframe, date) -> bytes:
+    df = make_daily_df(date, timeframe)
+    if asset_type == "spot":
+        header = True
+    elif asset_type == "futures/um":
+        header = None
+    else:
+        raise ValueError
+    csv = df.to_csv(index=False, header=header)
+    zip_buffer = io.BytesIO()
+    with zipfile.ZipFile(zip_buffer, "w") as zipf:
+        zipf.writestr(zip_name(symbol, timeframe, date), csv)
+    return zip_buffer.getvalue()
+
+
+class MockResponse:
+    def __init__(self, content, status, reason=""):
+        self._content = content
+        self.status = status
+        self.reason = reason
+
+    async def read(self):
+        return self._content
+
+    async def __aexit__(self, exc_type, exc, tb):
+        pass
+
+    async def __aenter__(self):
+        return self
+
+
+def make_response_from_url(start_date, end_date):
+    def make_response(url):
+        pattern = (
+            r"https://data.binance.vision/data/(?P<asset_type>spot|futures/um)/daily/klines/"
+            r"(?P<symbol>.*?)/(?P<timeframe>.*?)/(?P=symbol)-(?P=timeframe)-"
+            r"(?P<date>\d{4}-\d{2}-\d{2}).zip"
+        )
+        m = re.match(pattern, url)
+        if not m:
+            return MockResponse(content="", status=404)
+
+        date = datetime.datetime.strptime(m["date"], "%Y-%m-%d").date()
+        if date < start_date or date > end_date:
+            return MockResponse(content="", status=404)
+
+        zip_file = make_daily_zip(m["asset_type"], m["symbol"], m["timeframe"], date)
+        return MockResponse(content=zip_file, status=200)
+
+    return make_response
+
+
+@pytest.mark.parametrize(
+    "since,until,first_date,last_date",
+    [
+        (dt_utc(2020, 1, 1), dt_utc(2020, 1, 2), dt_utc(2020, 1, 1), dt_utc(2020, 1, 2, 23)),
+        (
+            dt_utc(2020, 1, 1),
+            dt_utc(2020, 1, 1, 23, 59, 59),
+            dt_utc(2020, 1, 1),
+            dt_utc(2020, 1, 1, 23),
+        ),
+        (
+            dt_utc(2020, 1, 1),
+            dt_utc(2020, 1, 5),
+            dt_utc(2020, 1, 1),
+            dt_utc(2020, 1, 3, 23),
+        ),
+        (
+            dt_utc(2019, 1, 1),
+            dt_utc(2020, 1, 5),
+            dt_utc(2020, 1, 1),
+            dt_utc(2020, 1, 3, 23),
+        ),
+        (
+            dt_utc(2019, 1, 1),
+            dt_utc(2019, 1, 5),
+            None,
+            None,
+        ),
+        (
+            dt_utc(2021, 1, 1),
+            dt_utc(2021, 1, 5),
+            None,
+            None,
+        ),
+        (
+            dt_utc(2020, 1, 2),
+            None,
+            dt_utc(2020, 1, 2),
+            dt_utc(2020, 1, 3, 23),
+        ),
+    ],
+)
+async def test_fetch_ohlcv(mocker, since, until, first_date, last_date):
+    history_start = dt_utc(2020, 1, 1).date()
+    history_end = dt_utc(2020, 1, 3).date()
+    candle_type = CandleType.SPOT
+    pair = "BTC/USDT"
+    timeframe = "1h"
+
+    since_ms = dt_ts(since)
+    until_ms = dt_ts(until)
+
+    mocker.patch(
+        "aiohttp.ClientSession.get", side_effect=make_response_from_url(history_start, history_end)
+    )
+    df = await fetch_ohlcv(candle_type, pair, timeframe, since_ms, until_ms)
+
+    if df.empty:
+        assert first_date is None and last_date is None
+    else:
+        assert df["date"].iloc[0] == first_date
+        assert df["date"].iloc[-1] == last_date
+
+
+async def test_get_daily_ohlcv(mocker, testdatadir):
+    symbol = "BTCUSDT"
+    timeframe = "1h"
+    date = dt_utc(2024, 10, 28).date()
+    first_date = dt_utc(2024, 10, 28)
+    last_date = dt_utc(2024, 10, 28, 23)
+
+    async with aiohttp.ClientSession() as session:
+        path = testdatadir / "binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip"
+        mocker.patch(
+            "aiohttp.ClientSession.get", return_value=MockResponse(path.read_bytes(), 200)
+        )
+        df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
+        assert df["date"].iloc[0] == first_date
+        assert df["date"].iloc[-1] == last_date
+
+        path = (
+            testdatadir / "binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip"
+        )
+        mocker.patch(
+            "aiohttp.ClientSession.get", return_value=MockResponse(path.read_bytes(), 200)
+        )
+        df = await get_daily_ohlcv("futures/um", symbol, timeframe, date, session)
+        assert df["date"].iloc[0] == first_date
+        assert df["date"].iloc[-1] == last_date
+
+        mocker.patch("aiohttp.ClientSession.get", return_value=MockResponse(b"", 404))
+        df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
+        assert df is None
+
+        mocker.patch("aiohttp.ClientSession.get", return_value=MockResponse(b"", 500))
+        mocker.patch("asyncio.sleep")
+        with pytest.raises(BadHttpStatus):
+            df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
+
+        mocker.patch("aiohttp.ClientSession.get", return_value=MockResponse(b"nop", 200))
+        with pytest.raises(zipfile.BadZipFile):
+            df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
diff --git a/tests/testdata/binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip b/tests/testdata/binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip
new file mode 100644
index 000000000..5bda1b271
Binary files /dev/null and b/tests/testdata/binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip differ
diff --git a/tests/testdata/binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip b/tests/testdata/binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip
new file mode 100644
index 000000000..b94090741
Binary files /dev/null and b/tests/testdata/binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip differ
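
A minimal usage sketch of the new fetch_ohlcv entry point, outside the patch itself: the pair, timeframe and date range below are illustrative only, and dt_ts/dt_utc are the existing freqtrade.util.datetime_helpers helpers already used in the tests above.

import asyncio

from freqtrade.enums import CandleType
from freqtrade.exchange.binance_public_data import fetch_ohlcv
from freqtrade.util.datetime_helpers import dt_ts, dt_utc


async def main():
    # Download daily-archived spot 1h candles for an arbitrary example week.
    df = await fetch_ohlcv(
        candle_type=CandleType.SPOT,
        pair="BTC/USDT",
        timeframe="1h",
        since_ms=dt_ts(dt_utc(2024, 10, 1)),
        until_ms=dt_ts(dt_utc(2024, 10, 7)),
    )
    # An empty DataFrame means no archive data was available in the requested range.
    print(df.head())


if __name__ == "__main__":
    asyncio.run(main())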