Merge pull request #10858 from xzmeng/binance-public-data

Improve binance historic OHLCV download speed
This commit is contained in:
Matthias
2024-12-01 19:41:40 +01:00
committed by GitHub
11 changed files with 957 additions and 56 deletions

View File

@@ -128,8 +128,8 @@ def test_load_data_with_new_pair_1min(
"""
Test load_pair_history() with 1 min timeframe
"""
mocker.patch(f"{EXMS}.get_historic_ohlcv", return_value=ohlcv_history)
exchange = get_patched_exchange(mocker, default_conf)
mocker.patch.object(exchange, "get_historic_ohlcv", return_value=ohlcv_history)
file = tmp_path / "MEME_BTC-1m.feather"
# do not download a new pair if refresh_pairs isn't set
@@ -306,8 +306,8 @@ def test_load_cached_data_for_updating(testdatadir) -> None:
def test_download_pair_history(
ohlcv_history, mocker, default_conf, tmp_path, candle_type, subdir, file_tail
) -> None:
mocker.patch(f"{EXMS}.get_historic_ohlcv", return_value=ohlcv_history)
exchange = get_patched_exchange(mocker, default_conf)
mocker.patch.object(exchange, "get_historic_ohlcv", return_value=ohlcv_history)
file1_1 = tmp_path / f"{subdir}MEME_BTC-1m{file_tail}.feather"
file1_5 = tmp_path / f"{subdir}MEME_BTC-5m{file_tail}.feather"
file2_1 = tmp_path / f"{subdir}CFI_BTC-1m{file_tail}.feather"
@@ -357,8 +357,8 @@ def test_download_pair_history2(mocker, default_conf, testdatadir, ohlcv_history
"freqtrade.data.history.datahandlers.featherdatahandler.FeatherDataHandler.ohlcv_store",
return_value=None,
)
mocker.patch(f"{EXMS}.get_historic_ohlcv", return_value=ohlcv_history)
exchange = get_patched_exchange(mocker, default_conf)
mocker.patch.object(exchange, "get_historic_ohlcv", return_value=ohlcv_history)
_download_pair_history(
datadir=testdatadir,
exchange=exchange,

View File

@@ -1,14 +1,17 @@
from datetime import datetime, timezone
from datetime import datetime, timedelta
from random import randint
from unittest.mock import MagicMock, PropertyMock
import ccxt
import pandas as pd
import pytest
from freqtrade.enums import CandleType, MarginMode, TradingMode
from freqtrade.exceptions import DependencyException, InvalidOrderException, OperationalException
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_seconds
from freqtrade.persistence import Trade
from tests.conftest import EXMS, get_mock_coro, get_patched_exchange, log_has_re
from freqtrade.util.datetime_helpers import dt_from_ts, dt_ts, dt_utc
from tests.conftest import EXMS, get_patched_exchange
from tests.exchange.test_exchange import ccxt_exceptionhandlers
@@ -731,42 +734,243 @@ def test__set_leverage_binance(mocker, default_conf):
)
@pytest.mark.parametrize("candle_type", [CandleType.MARK, ""])
async def test__async_get_historic_ohlcv_binance(default_conf, mocker, caplog, candle_type):
ohlcv = [
[
int((datetime.now(timezone.utc).timestamp() - 1000) * 1000),
1, # open
2, # high
3, # low
4, # close
5, # volume (in quote currency)
def patch_binance_vision_ohlcv(mocker, start, archive_end, api_end, timeframe):
def make_storage(start: datetime, end: datetime, timeframe: str):
date = pd.date_range(start, end, freq=timeframe.replace("m", "min"))
df = pd.DataFrame(
data=dict(date=date, open=1.0, high=1.0, low=1.0, close=1.0),
)
return df
archive_storage = make_storage(start, archive_end, timeframe)
api_storage = make_storage(start, api_end, timeframe)
ohlcv = [[dt_ts(start), 1, 1, 1, 1]]
# (pair, timeframe, candle_type, ohlcv, True)
candle_history = [None, None, None, ohlcv, None]
def get_historic_ohlcv(
# self,
pair: str,
timeframe: str,
since_ms: int,
candle_type: CandleType,
is_new_pair: bool = False,
until_ms: int | None = None,
):
since = dt_from_ts(since_ms)
until = dt_from_ts(until_ms) if until_ms else api_end + timedelta(seconds=1)
return api_storage.loc[(api_storage["date"] >= since) & (api_storage["date"] < until)]
async def download_archive_ohlcv(
candle_type,
pair,
timeframe,
since_ms,
until_ms,
markets=None,
stop_on_404=False,
):
since = dt_from_ts(since_ms)
until = dt_from_ts(until_ms) if until_ms else archive_end + timedelta(seconds=1)
if since < start:
pass
return archive_storage.loc[
(archive_storage["date"] >= since) & (archive_storage["date"] < until)
]
]
candle_mock = mocker.patch(f"{EXMS}._async_get_candle_history", return_value=candle_history)
api_mock = mocker.patch(f"{EXMS}.get_historic_ohlcv", side_effect=get_historic_ohlcv)
archive_mock = mocker.patch(
"freqtrade.exchange.binance.download_archive_ohlcv", side_effect=download_archive_ohlcv
)
return candle_mock, api_mock, archive_mock
@pytest.mark.parametrize(
"timeframe,is_new_pair,since,until,first_date,last_date,candle_called,archive_called,"
"api_called",
[
(
"1m",
True,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59),
True,
True,
False,
),
(
"1m",
True,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 3),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2, 23, 59),
True,
True,
True,
),
(
"1m",
True,
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 2, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 2, 0, 59),
True,
False,
True,
),
(
"1m",
False,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59),
False,
True,
False,
),
(
"1m",
True,
dt_utc(2019, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59),
True,
True,
False,
),
(
"1m",
False,
dt_utc(2019, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59),
False,
True,
False,
),
(
"1m",
False,
dt_utc(2019, 1, 1),
dt_utc(2019, 1, 2),
None,
None,
False,
True,
True,
),
(
"1m",
True,
dt_utc(2019, 1, 1),
dt_utc(2019, 1, 2),
None,
None,
True,
False,
False,
),
(
"1m",
False,
dt_utc(2021, 1, 1),
dt_utc(2021, 1, 2),
None,
None,
False,
False,
False,
),
(
"1m",
True,
dt_utc(2021, 1, 1),
dt_utc(2021, 1, 2),
None,
None,
True,
False,
False,
),
(
"1h",
False,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23),
False,
False,
True,
),
(
"1m",
False,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 3, 50, 30),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 3, 50),
False,
True,
False,
),
],
)
def test_get_historic_ohlcv_binance(
mocker,
default_conf,
timeframe,
is_new_pair,
since,
until,
first_date,
last_date,
candle_called,
archive_called,
api_called,
):
exchange = get_patched_exchange(mocker, default_conf, exchange="binance")
# Monkey-patch async function
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
pair = "ETH/BTC"
respair, restf, restype, res, _ = await exchange._async_get_historic_ohlcv(
pair, "5m", 1500000000000, is_new_pair=False, candle_type=candle_type
)
assert respair == pair
assert restf == "5m"
assert restype == candle_type
# Call with very old timestamp - causes tons of requests
assert exchange._api_async.fetch_ohlcv.call_count > 400
# assert res == ohlcv
exchange._api_async.fetch_ohlcv.reset_mock()
_, _, _, res, _ = await exchange._async_get_historic_ohlcv(
pair, "5m", 1500000000000, is_new_pair=True, candle_type=candle_type
start = dt_utc(2020, 1, 1)
archive_end = dt_utc(2020, 1, 2)
api_end = dt_utc(2020, 1, 3)
candle_mock, api_mock, archive_mock = patch_binance_vision_ohlcv(
mocker, start=start, archive_end=archive_end, api_end=api_end, timeframe=timeframe
)
# Called twice - one "init" call - and one to get the actual data.
assert exchange._api_async.fetch_ohlcv.call_count == 2
assert res == ohlcv
assert log_has_re(r"Candle-data for ETH/BTC available starting with .*", caplog)
candle_type = CandleType.SPOT
pair = "BTC/USDT"
since_ms = dt_ts(since)
until_ms = dt_ts(until)
df = exchange.get_historic_ohlcv(pair, timeframe, since_ms, candle_type, is_new_pair, until_ms)
if df.empty:
assert first_date is None
assert last_date is None
else:
assert df["date"].iloc[0] == first_date
assert df["date"].iloc[-1] == last_date
assert (
df["date"].diff().iloc[1:] == timedelta(seconds=timeframe_to_seconds(timeframe))
).all()
if candle_called:
candle_mock.assert_called_once()
if archive_called:
archive_mock.assert_called_once()
if api_called:
api_mock.assert_called_once()
@pytest.mark.parametrize(

View File

@@ -0,0 +1,337 @@
import asyncio
import datetime
import io
import re
import sys
import zipfile
from datetime import timedelta
import aiohttp
import pandas as pd
import pytest
from freqtrade.enums import CandleType
from freqtrade.exchange.binance_public_data import (
BadHttpStatus,
Http404,
binance_vision_zip_name,
download_archive_ohlcv,
get_daily_ohlcv,
)
from freqtrade.util.datetime_helpers import dt_ts, dt_utc
@pytest.fixture(scope="module")
def event_loop_policy(request):
if sys.platform == "win32":
return asyncio.WindowsSelectorEventLoopPolicy()
else:
return asyncio.DefaultEventLoopPolicy()
class MockResponse:
"""AioHTTP response mock"""
def __init__(self, content, status, reason=""):
self._content = content
self.status = status
self.reason = reason
async def read(self):
return self._content
async def __aexit__(self, exc_type, exc, tb):
pass
async def __aenter__(self):
return self
# spot klines archive csv file format, the futures/um klines don't have the header line
#
# open_time,open,high,low,close,volume,close_time,quote_volume,count,taker_buy_volume,taker_buy_quote_volume,ignore # noqa: E501
# 1698364800000,34161.6,34182.5,33977.4,34024.2,409953,1698368399999,1202.97118037,15095,192220,564.12041453,0 # noqa: E501
# 1698368400000,34024.2,34060.1,33776.4,33848.4,740960,1698371999999,2183.75671155,23938,368266,1085.17080793,0 # noqa: E501
# 1698372000000,33848.5,34150.0,33815.1,34094.2,390376,1698375599999,1147.73267094,13854,231446,680.60405822,0 # noqa: E501
def make_response_from_url(start_date, end_date):
def make_daily_df(date, timeframe):
start = dt_utc(date.year, date.month, date.day)
end = start + timedelta(days=1)
date_col = pd.date_range(start, end, freq=timeframe.replace("m", "min"), inclusive="left")
cols = (
"open_time,open,high,low,close,volume,close_time,quote_volume,count,taker_buy_volume,"
"taker_buy_quote_volume,ignore"
)
df = pd.DataFrame(columns=cols.split(","), dtype=float)
df["open_time"] = date_col.astype("int64") // 10**6
df["open"] = df["high"] = df["low"] = df["close"] = df["volume"] = 1.0
return df
def make_daily_zip(asset_type_url_segment, symbol, timeframe, date) -> bytes:
df = make_daily_df(date, timeframe)
if asset_type_url_segment == "spot":
header = True
elif asset_type_url_segment == "futures/um":
header = None
else:
raise ValueError
csv = df.to_csv(index=False, header=header)
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zipf:
zipf.writestr(binance_vision_zip_name(symbol, timeframe, date), csv)
return zip_buffer.getvalue()
def make_response(url):
pattern = (
r"https://data.binance.vision/data/(?P<asset_type_url_segment>spot|futures/um)"
r"/daily/klines/(?P<symbol>.*?)/(?P<timeframe>.*?)/(?P=symbol)-(?P=timeframe)-"
r"(?P<date>\d{4}-\d{2}-\d{2}).zip"
)
m = re.match(pattern, url)
if not m:
return MockResponse(content="", status=404)
date = datetime.datetime.strptime(m["date"], "%Y-%m-%d").date()
if date < start_date or date > end_date:
return MockResponse(content="", status=404)
zip_file = make_daily_zip(m["asset_type_url_segment"], m["symbol"], m["timeframe"], date)
return MockResponse(content=zip_file, status=200)
return make_response
@pytest.mark.parametrize(
"candle_type,pair,since,until,first_date,last_date,stop_on_404",
[
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59, 59),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 5),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 3, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2019, 12, 25),
dt_utc(2020, 1, 5),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 3, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2019, 1, 1),
dt_utc(2019, 1, 5),
None,
None,
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2021, 1, 1),
dt_utc(2021, 1, 5),
None,
None,
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 2),
None,
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 3, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 5),
dt_utc(2020, 1, 1),
None,
None,
False,
),
(
CandleType.FUTURES,
"BTC/USDT:USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59, 59),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23),
False,
),
(
CandleType.INDEX,
"N/A",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59, 59),
None,
None,
False,
),
# stop_on_404 = True
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2019, 12, 25),
dt_utc(2020, 1, 5),
None,
None,
True,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 5),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 3, 23),
True,
),
(
CandleType.FUTURES,
"BTC/USDT:USDT",
dt_utc(2019, 12, 25),
dt_utc(2020, 1, 5),
None,
None,
True,
),
],
)
async def test_download_archive_ohlcv(
mocker, candle_type, pair, since, until, first_date, last_date, stop_on_404
):
history_start = dt_utc(2020, 1, 1).date()
history_end = dt_utc(2020, 1, 3).date()
timeframe = "1h"
since_ms = dt_ts(since)
until_ms = dt_ts(until)
mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
side_effect=make_response_from_url(history_start, history_end),
)
markets = {"BTC/USDT": {"id": "BTCUSDT"}, "BTC/USDT:USDT": {"id": "BTCUSDT"}}
df = await download_archive_ohlcv(
candle_type,
pair,
timeframe,
since_ms=since_ms,
until_ms=until_ms,
markets=markets,
stop_on_404=stop_on_404,
)
if df.empty:
assert first_date is None and last_date is None
else:
assert candle_type in [CandleType.SPOT, CandleType.FUTURES]
assert df["date"].iloc[0] == first_date
assert df["date"].iloc[-1] == last_date
async def test_download_archive_ohlcv_exception(mocker):
timeframe = "1h"
pair = "BTC/USDT"
since_ms = dt_ts(dt_utc(2020, 1, 1))
until_ms = dt_ts(dt_utc(2020, 1, 2))
markets = {"BTC/USDT": {"id": "BTCUSDT"}, "BTC/USDT:USDT": {"id": "BTCUSDT"}}
mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get", side_effect=RuntimeError
)
df = await download_archive_ohlcv(
CandleType.SPOT, pair, timeframe, since_ms=since_ms, until_ms=until_ms, markets=markets
)
assert df.empty
async def test_get_daily_ohlcv(mocker, testdatadir):
symbol = "BTCUSDT"
timeframe = "1h"
date = dt_utc(2024, 10, 28).date()
first_date = dt_utc(2024, 10, 28)
last_date = dt_utc(2024, 10, 28, 23)
async with aiohttp.ClientSession() as session:
spot_path = (
testdatadir / "binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip"
)
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(spot_path.read_bytes(), 200),
)
df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
assert get.call_count == 1
assert df["date"].iloc[0] == first_date
assert df["date"].iloc[-1] == last_date
futures_path = (
testdatadir / "binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip"
)
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(futures_path.read_bytes(), 200),
)
df = await get_daily_ohlcv("futures/um", symbol, timeframe, date, session)
assert get.call_count == 1
assert df["date"].iloc[0] == first_date
assert df["date"].iloc[-1] == last_date
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(b"", 404),
)
with pytest.raises(Http404):
df = await get_daily_ohlcv("spot", symbol, timeframe, date, session, retry_delay=0)
assert get.call_count == 1
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(b"", 500),
)
mocker.patch("asyncio.sleep")
with pytest.raises(BadHttpStatus):
df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
assert get.call_count == 4 # 1 + 3 default retries
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(b"nop", 200),
)
with pytest.raises(zipfile.BadZipFile):
df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
assert get.call_count == 4 # 1 + 3 default retries

View File

@@ -2091,6 +2091,7 @@ def test___now_is_time_to_refresh(default_conf, mocker, exchange_name, time_mach
@pytest.mark.parametrize("candle_type", ["mark", ""])
@pytest.mark.parametrize("exchange_name", EXCHANGES)
def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name, candle_type):
caplog.set_level(logging.DEBUG)
exchange = get_patched_exchange(mocker, default_conf, exchange=exchange_name)
pair = "ETH/BTC"
calls = 0
@@ -2123,7 +2124,7 @@ def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name, candle_
assert exchange._async_get_candle_history.call_count == 2
# Returns twice the above OHLCV data after truncating the open candle.
assert len(ret) == 2
assert log_has_re(r"Downloaded data for .* with length .*\.", caplog)
assert log_has_re(r"Downloaded data for .* from ccxt with length .*\.", caplog)
caplog.clear()
@@ -2156,7 +2157,7 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_
pair = "ETH/USDT"
respair, restf, _, res, _ = await exchange._async_get_historic_ohlcv(
pair, "5m", 1500000000000, candle_type=candle_type, is_new_pair=False
pair, "5m", 1500000000000, candle_type=candle_type
)
assert respair == pair
assert restf == "5m"
@@ -2168,7 +2169,7 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_
end_ts = 1_500_500_000_000
start_ts = 1_500_000_000_000
respair, restf, _, res, _ = await exchange._async_get_historic_ohlcv(
pair, "5m", since_ms=start_ts, candle_type=candle_type, is_new_pair=False, until_ms=end_ts
pair, "5m", since_ms=start_ts, candle_type=candle_type, until_ms=end_ts
)
# Required candles
candles = (end_ts - start_ts) / 300_000