mirror of
https://github.com/freqtrade/freqtrade.git
synced 2025-12-03 10:33:08 +00:00
feat: implement fetch data from data.binance.vision
This commit is contained in:
@@ -1,2 +1,179 @@
|
||||
async def fetch_ohlcv(*args, **kwargs):
|
||||
"""
|
||||
Fetch daily-archived OHLCV data from https://data.binance.vision/
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import io
|
||||
import itertools
|
||||
import logging
|
||||
import zipfile
|
||||
|
||||
import aiohttp
|
||||
import pandas as pd
|
||||
from pandas import DataFrame
|
||||
|
||||
from freqtrade.enums import CandleType
|
||||
from freqtrade.util.datetime_helpers import dt_from_ts, dt_now
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BadHttpStatus(Exception):
    """Raised when the server answers with a status other than 200 or 404."""
|
||||
|
||||
|
||||
async def fetch_ohlcv(
    candle_type: CandleType, pair: str, timeframe: str, since_ms: int, until_ms: int | None
) -> DataFrame:
    """
    Fetch OHLCV data from https://data.binance.vision/

    :param candle_type: Currently only spot and futures are supported
    :param pair: Pair in ccxt notation, e.g. "BTC/USDT"
    :param timeframe: Candle timeframe, e.g. "1h"
    :param since_ms: Start timestamp in milliseconds
    :param until_ms: End timestamp in milliseconds;
        `None` indicates the timestamp of the latest available data
    :return: An empty DataFrame if no data is available in the time range
    """
    if candle_type == CandleType.SPOT:
        asset_type = "spot"
    elif candle_type == CandleType.FUTURES:
        # only USD-M futures archives are used here
        asset_type = "futures/um"
    else:
        raise ValueError(f"Unsupported CandleType: {candle_type}")
    symbol = symbol_ccxt_to_binance(pair)
    start = dt_from_ts(since_ms)
    end = dt_from_ts(until_ms) if until_ms else dt_now()

    # We use two days ago as the last available day because the daily archives are daily uploaded
    # and have several hours delay
    last_available_date = dt_now() - datetime.timedelta(days=2)
    end = min(end, last_available_date)
    if start >= end:
        return DataFrame()
    return await _fetch_ohlcv(asset_type, symbol, timeframe, start, end)
|
||||
|
||||
|
||||
def symbol_ccxt_to_binance(symbol: str) -> str:
    """
    Convert ccxt symbol notation to binance notation
    e.g. BTC/USDT -> BTCUSDT, BTC/USDT:USDT -> BTCUSDT

    :param symbol: Symbol in ccxt notation
    :return: Symbol in binance notation
    :raises ValueError: if a symbol containing ":" does not split into
        exactly a market part and a settlement-currency part
    """
    if ":" in symbol:
        # Futures pairs look like "BTC/USDT:USDT" (BASE/QUOTE:SETTLE).
        # Split on ":" — not on whitespace, which never occurs in a ccxt
        # symbol — to drop the settlement-currency suffix.
        parts = symbol.split(":")
        if len(parts) != 2:
            raise ValueError(f"Cannot recognize symbol: {symbol}")
        return parts[0].replace("/", "")
    else:
        return symbol.replace("/", "")
|
||||
|
||||
|
||||
def concat(dfs) -> DataFrame:
    """Concatenate the frames, returning an empty frame when every entry is None."""
    # pd.concat silently drops None entries, but raises when ALL are None,
    # hence the guard.
    if any(df is not None for df in dfs):
        return pd.concat(dfs)
    return DataFrame()
|
||||
|
||||
|
||||
async def _fetch_ohlcv(asset_type, symbol, timeframe, start, end) -> DataFrame:
    """
    Download and concatenate the daily archives for every day in [start, end].

    :param asset_type: "spot" or "futures/um"
    :param symbol: Symbol in binance notation, e.g. "BTCUSDT"
    :param timeframe: Candle timeframe, e.g. "1h"
    :param start: First day to download (inclusive)
    :param end: Last day to download (inclusive)
    :return: Concatenated OHLCV data; on a failed day, only the data gathered
        before the failure is returned so that no gap is introduced
    """
    dfs: list[DataFrame | None] = []

    connector = aiohttp.TCPConnector(limit=100)
    async with aiohttp.ClientSession(connector=connector) as session:
        coroutines = [
            get_daily_ohlcv(asset_type, symbol, timeframe, date, session)
            for date in date_range(start, end)
        ]
        # the HTTP connections has been throttled by TCPConnector
        for batch in itertools.batched(coroutines, 1000):
            # return_exceptions=True is required: without it gather() raises on
            # the first failed day, discarding everything already collected in
            # `dfs` and making the isinstance check below unreachable.
            results = await asyncio.gather(*batch, return_exceptions=True)
            for result in results:
                if isinstance(result, BaseException):
                    logger.warning(f"An exception raised: {result}")
                    # Directly return the existing data, do not allow the gap
                    # between the data
                    return concat(dfs)
                else:
                    dfs.append(result)
    return concat(dfs)
|
||||
|
||||
|
||||
def date_range(start: datetime.date, end: datetime.date):
    """Yield every date from *start* through *end*, inclusive."""
    one_day = datetime.timedelta(days=1)
    current = start
    while current <= end:
        yield current
        current += one_day
|
||||
|
||||
|
||||
def format_date(date: datetime.date) -> str:
    """Render a date as YYYY-MM-DD, the form used in the archive file names."""
    return f"{date:%Y-%m-%d}"
|
||||
|
||||
|
||||
def zip_name(symbol: str, timeframe: str, date: datetime.date) -> str:
    """Archive file name for one day, e.g. "BTCUSDT-1h-2023-10-27.zip"."""
    stem = "-".join((symbol, timeframe, format_date(date)))
    return f"{stem}.zip"
|
||||
|
||||
|
||||
async def get_daily_ohlcv(
    asset_type: str,
    symbol: str,
    timeframe: str,
    date: datetime.date,
    session: aiohttp.ClientSession,
    retry_count: int = 3,
) -> DataFrame | None:
    """
    Get daily OHLCV from https://data.binance.vision
    See https://github.com/binance/binance-public-data

    :param asset_type: "spot" or "futures/um" (path segment of the archive URL)
    :param symbol: Symbol in binance notation, e.g. "BTCUSDT"
    :param timeframe: Candle timeframe, e.g. "1h"
    :param date: The (UTC) day whose archive is downloaded
    :param session: Shared aiohttp session used for the request
    :param retry_count: Number of attempts before giving up
    :return: DataFrame with date/open/high/low/close/volume columns,
        or None when the server answers 404 (no data for that day)
    :raises BadHttpStatus: (via re-raise after retries) for any status
        other than 200/404
    """

    # example urls:
    # https://data.binance.vision/data/spot/daily/klines/BTCUSDT/1s/BTCUSDT-1s-2023-10-27.zip
    # https://data.binance.vision/data/futures/um/daily/klines/BTCUSDT/1h/BTCUSDT-1h-2023-10-27.zip
    url = (
        f"https://data.binance.vision/data/{asset_type}/daily/klines/{symbol}/{timeframe}/"
        f"{zip_name(symbol, timeframe, date)}"
    )

    logger.debug(f"download data from binance: {url}")

    retry = 0
    while True:
        if retry > 0:
            # linear backoff: 0.5s, 1.0s, ... between attempts
            sleep_secs = retry * 0.5
            logger.debug(
                f"[{retry}/{retry_count}] retry to download {url} after {sleep_secs} seconds"
            )
            await asyncio.sleep(sleep_secs)
        try:
            async with session.get(url) as resp:
                if resp.status == 200:
                    content = await resp.read()
                    logger.debug(f"Successfully downloaded {url}")
                    # The archive contains a single CSV file.
                    with zipfile.ZipFile(io.BytesIO(content)) as zipf:
                        with zipf.open(zipf.namelist()[0]) as csvf:
                            # Some archives start with a header row and some
                            # don't, so sniff the first byte: a digit means
                            # the data starts immediately (no header).
                            # https://github.com/binance/binance-public-data/issues/283
                            first_byte = csvf.read(1)[0]
                            if chr(first_byte).isdigit():
                                header = None
                            else:
                                header = 0
                            # rewind after the one-byte sniff
                            csvf.seek(0)

                            # Only the first six columns are needed; the
                            # remaining archive columns are ignored.
                            df = pd.read_csv(
                                csvf,
                                usecols=[0, 1, 2, 3, 4, 5],
                                names=["date", "open", "high", "low", "close", "volume"],
                                header=header,
                            )
                            # open_time is in epoch milliseconds
                            df["date"] = pd.to_datetime(df["date"], unit="ms", utc=True)
                            return df
                elif resp.status == 404:
                    # 404 means no archive exists for that day - not an error.
                    logger.warning(f"No data available for {symbol} in {format_date(date)}")
                    return None
                else:
                    raise BadHttpStatus(f"{resp.status} - {resp.reason}")
        except Exception as e:
            retry += 1
            if retry >= retry_count:
                logger.warning(f"Failed to get data from {url}: {e}")
                raise
|
||||
|
||||
192
tests/exchange/test_binance_public_data.py
Normal file
192
tests/exchange/test_binance_public_data.py
Normal file
@@ -0,0 +1,192 @@
|
||||
import datetime
|
||||
import io
|
||||
import re
|
||||
import zipfile
|
||||
from datetime import timedelta
|
||||
|
||||
import aiohttp
|
||||
import pandas as pd
|
||||
import pytest
|
||||
|
||||
from freqtrade.enums import CandleType
|
||||
from freqtrade.exchange.binance_public_data import (
|
||||
BadHttpStatus,
|
||||
fetch_ohlcv,
|
||||
get_daily_ohlcv,
|
||||
zip_name,
|
||||
)
|
||||
from freqtrade.util.datetime_helpers import dt_ts, dt_utc
|
||||
|
||||
|
||||
# spot klines archive csv file format, the futures/um klines don't have the header line
|
||||
#
|
||||
# open_time,open,high,low,close,volume,close_time,quote_volume,count,taker_buy_volume,taker_buy_quote_volume,ignore # noqa: E501
|
||||
# 1698364800000,34161.6,34182.5,33977.4,34024.2,409953,1698368399999,1202.97118037,15095,192220,564.12041453,0 # noqa: E501
|
||||
# 1698368400000,34024.2,34060.1,33776.4,33848.4,740960,1698371999999,2183.75671155,23938,368266,1085.17080793,0 # noqa: E501
|
||||
# 1698372000000,33848.5,34150.0,33815.1,34094.2,390376,1698375599999,1147.73267094,13854,231446,680.60405822,0 # noqa: E501
|
||||
|
||||
|
||||
def make_daily_df(date, timeframe):
    """Build one synthetic day of klines in the binance archive column layout."""
    day_start = dt_utc(date.year, date.month, date.day)
    day_end = day_start + timedelta(days=1)
    # one timestamp per candle, left-inclusive so midnight of the next day is excluded
    stamps = pd.date_range(day_start, day_end, freq=timeframe.replace("m", "min"), inclusive="left")
    columns = (
        "open_time,open,high,low,close,volume,close_time,quote_volume,count,taker_buy_volume,"
        "taker_buy_quote_volume,ignore"
    ).split(",")
    df = pd.DataFrame(columns=columns, dtype=float)
    # epoch milliseconds, as in the real archives
    df["open_time"] = stamps.astype("int64") // 10**6
    for col in ("open", "high", "low", "close", "volume"):
        df[col] = 1.0
    return df
|
||||
|
||||
|
||||
def make_daily_zip(asset_type, symbol, timeframe, date) -> bytes:
    """Serialize a synthetic daily frame as the zip archive binance would serve."""
    frame = make_daily_df(date, timeframe)
    if asset_type == "spot":
        include_header = True  # spot archives carry a csv header line
    elif asset_type == "futures/um":
        include_header = None  # futures/um archives do not
    else:
        raise ValueError
    csv_text = frame.to_csv(index=False, header=include_header)
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr(zip_name(symbol, timeframe, date), csv_text)
    return buffer.getvalue()
|
||||
|
||||
|
||||
class MockResponse:
    """Minimal async stand-in for aiohttp's response object."""

    def __init__(self, content, status, reason=""):
        # raw body returned by read()
        self._content = content
        self.status = status
        self.reason = reason

    async def read(self):
        """Return the canned response body."""
        return self._content

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        pass
|
||||
|
||||
|
||||
def make_response_from_url(start_date, end_date):
    """
    Build a fake `session.get` side-effect that serves synthetic daily archives
    for dates within [start_date, end_date] and 404 for everything else.

    :param start_date: First date (inclusive) with available data
    :param end_date: Last date (inclusive) with available data
    :return: callable url -> MockResponse
    """

    def make_response(url):
        # Dots escaped so "." matches literally instead of any character.
        pattern = (
            r"https://data\.binance\.vision/data/(?P<asset_type>spot|futures/um)/daily/klines/"
            r"(?P<symbol>.*?)/(?P<timeframe>.*?)/(?P=symbol)-(?P=timeframe)-"
            r"(?P<date>\d{4}-\d{2}-\d{2})\.zip"
        )
        m = re.match(pattern, url)
        if not m:
            # malformed / unexpected url -> pretend the archive does not exist
            return MockResponse(content="", status=404)

        date = datetime.datetime.strptime(m["date"], "%Y-%m-%d").date()
        if date < start_date or date > end_date:
            # outside the simulated history window
            return MockResponse(content="", status=404)

        zip_file = make_daily_zip(m["asset_type"], m["symbol"], m["timeframe"], date)
        return MockResponse(content=zip_file, status=200)

    return make_response
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "since,until,first_date,last_date",
    [
        # range fully inside the mocked history (2020-01-01 .. 2020-01-03)
        (dt_utc(2020, 1, 1), dt_utc(2020, 1, 2), dt_utc(2020, 1, 1), dt_utc(2020, 1, 2, 23)),
        # `until` just before midnight: last candle is 23:00 of the same day
        (
            dt_utc(2020, 1, 1),
            dt_utc(2020, 1, 1, 23, 59, 59),
            dt_utc(2020, 1, 1),
            dt_utc(2020, 1, 1, 23),
        ),
        # `until` past the end of history: clipped to the last available day
        (
            dt_utc(2020, 1, 1),
            dt_utc(2020, 1, 5),
            dt_utc(2020, 1, 1),
            dt_utc(2020, 1, 3, 23),
        ),
        # `since` before history starts: data begins at the first available day
        (
            dt_utc(2019, 1, 1),
            dt_utc(2020, 1, 5),
            dt_utc(2020, 1, 1),
            dt_utc(2020, 1, 3, 23),
        ),
        # range entirely before the history: no data
        (
            dt_utc(2019, 1, 1),
            dt_utc(2019, 1, 5),
            None,
            None,
        ),
        # range entirely after the history: no data
        (
            dt_utc(2021, 1, 1),
            dt_utc(2021, 1, 5),
            None,
            None,
        ),
        # until=None: fetch up to the latest available data
        (
            dt_utc(2020, 1, 2),
            None,
            dt_utc(2020, 1, 2),
            dt_utc(2020, 1, 3, 23),
        ),
    ],
)
async def test_fetch_ohlcv(mocker, since, until, first_date, last_date):
    # The mocked server only has archives for these three days; any other
    # date answers 404 (see make_response_from_url).
    history_start = dt_utc(2020, 1, 1).date()
    history_end = dt_utc(2020, 1, 3).date()
    candle_type = CandleType.SPOT
    pair = "BTC/USDT"
    timeframe = "1h"

    since_ms = dt_ts(since)
    until_ms = dt_ts(until)

    mocker.patch(
        "aiohttp.ClientSession.get", side_effect=make_response_from_url(history_start, history_end)
    )
    df = await fetch_ohlcv(candle_type, pair, timeframe, since_ms, until_ms)

    if df.empty:
        # fetch_ohlcv signals "no data in range" with an empty DataFrame
        assert first_date is None and last_date is None
    else:
        assert df["date"].iloc[0] == first_date
        assert df["date"].iloc[-1] == last_date
|
||||
|
||||
|
||||
async def test_get_daily_ohlcv(mocker, testdatadir):
    """Exercise get_daily_ohlcv against canned archives and error statuses."""
    symbol = "BTCUSDT"
    timeframe = "1h"
    date = dt_utc(2024, 10, 28).date()
    first_date = dt_utc(2024, 10, 28)
    last_date = dt_utc(2024, 10, 28, 23)

    async with aiohttp.ClientSession() as session:
        # spot archive: the csv inside contains a header row
        path = testdatadir / "binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip"
        mocker.patch("aiohttp.ClientSession.get", return_value=MockResponse(path.read_bytes(), 200))
        df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
        assert df["date"].iloc[0] == first_date
        assert df["date"].iloc[-1] == last_date

        # futures/um archive: same day, but the csv has no header row
        path = (
            testdatadir / "binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip"
        )
        mocker.patch("aiohttp.ClientSession.get", return_value=MockResponse(path.read_bytes(), 200))
        df = await get_daily_ohlcv("futures/um", symbol, timeframe, date, session)
        assert df["date"].iloc[0] == first_date
        assert df["date"].iloc[-1] == last_date

        # 404 means "no data for that day" and yields None, not an exception
        mocker.patch("aiohttp.ClientSession.get", return_value=MockResponse(b"", 404))
        df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
        assert df is None

        # any other status raises BadHttpStatus after the retries are
        # exhausted; sleep is patched so the backoff does not slow the test
        mocker.patch("aiohttp.ClientSession.get", return_value=MockResponse(b"", 500))
        mocker.patch("asyncio.sleep")
        with pytest.raises(BadHttpStatus):
            df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)

        # a 200 response whose body is not a valid zip propagates BadZipFile
        mocker.patch("aiohttp.ClientSession.get", return_value=MockResponse(b"nop", 200))
        with pytest.raises(zipfile.BadZipFile):
            df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
|
||||
BIN
tests/testdata/binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip
vendored
Normal file
BIN
tests/testdata/binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip
vendored
Normal file
Binary file not shown.
BIN
tests/testdata/binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip
vendored
Normal file
BIN
tests/testdata/binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip
vendored
Normal file
Binary file not shown.
Reference in New Issue
Block a user