Merge pull request #10858 from xzmeng/binance-public-data

Improve binance historic OHLCV download speed
Matthias
2024-12-01 19:41:40 +01:00
committed by GitHub
11 changed files with 957 additions and 56 deletions

View File

@@ -225,6 +225,7 @@ Mandatory parameters are marked as **Required**, which means that they are requi
| `exchange.skip_open_order_update` | Skips open order updates on startup should the exchange cause problems. Only relevant in live conditions.<br>*Defaults to `false`*<br> **Datatype:** Boolean
| `exchange.unknown_fee_rate` | Fallback value to use when calculating trading fees. This can be useful for exchanges which have fees in non-tradable currencies. The value provided here will be multiplied with the "fee cost".<br>*Defaults to `None`*<br> **Datatype:** float
| `exchange.log_responses` | Log relevant exchange responses. For debug mode only - use with care.<br>*Defaults to `false`*<br> **Datatype:** Boolean
| `exchange.only_from_ccxt` | Prevent data download from data.binance.vision. Leaving this as `false` can greatly speed up downloads, but may be problematic if the site is not available (see the configuration sketch below the table).<br>*Defaults to `false`*<br> **Datatype:** Boolean
| `experimental.block_bad_exchanges` | Block exchanges known to not work with freqtrade. Leave on default unless you want to test if that exchange works now. <br>*Defaults to `true`.* <br> **Datatype:** Boolean
| | **Plugins**
| `edge.*` | Please refer to [edge configuration document](edge.md) for detailed explanation of all possible configuration options.
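For reference, a minimal sketch of how the new option sits in a freqtrade `config.json` (only the relevant keys are shown; a real exchange section requires more fields):

```json
{
    "exchange": {
        "name": "binance",
        "only_from_ccxt": false
    }
}
```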

View File

@@ -285,6 +285,7 @@ def _download_pair_history(
candle_type=candle_type,
until_ms=until_ms if until_ms else None,
)
logger.info(f"Downloaded data for {pair} with length {len(new_dataframe)}.")
if data.empty:
data = new_dataframe
else:

View File

@@ -5,13 +5,18 @@ from datetime import datetime, timezone
from pathlib import Path
import ccxt
from pandas import DataFrame
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.enums import CandleType, MarginMode, PriceType, TradingMode
from freqtrade.exceptions import DDosProtection, OperationalException, TemporaryError
from freqtrade.exchange import Exchange
from freqtrade.exchange.binance_public_data import concat_safe, download_archive_ohlcv
from freqtrade.exchange.common import retrier
from freqtrade.exchange.exchange_types import FtHas, OHLCVResponse, Tickers
from freqtrade.exchange.exchange_types import FtHas, Tickers
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_msecs
from freqtrade.misc import deep_merge_dicts, json_load
from freqtrade.util.datetime_helpers import dt_from_ts, dt_ts
logger = logging.getLogger(__name__)
@@ -97,23 +102,24 @@ class Binance(Exchange):
except ccxt.BaseError as e:
raise OperationalException(e) from e
async def _async_get_historic_ohlcv(
def get_historic_ohlcv(
self,
pair: str,
timeframe: str,
since_ms: int,
candle_type: CandleType,
is_new_pair: bool = False,
raise_: bool = False,
until_ms: int | None = None,
) -> OHLCVResponse:
) -> DataFrame:
"""
Overwrite to introduce "fast new pair" functionality by detecting the pair's listing date
Does not work for other exchanges, which don't return the earliest data when called with "0"
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
if is_new_pair:
x = await self._async_get_candle_history(pair, timeframe, candle_type, 0)
x = self.loop.run_until_complete(
self._async_get_candle_history(pair, timeframe, candle_type, 0)
)
if x and x[3] and x[3][0] and x[3][0][0] > since_ms:
# Set starting date to first available candle.
since_ms = x[3][0][0]
@@ -121,17 +127,89 @@ class Binance(Exchange):
f"Candle-data for {pair} available starting with "
f"{datetime.fromtimestamp(since_ms // 1000, tz=timezone.utc).isoformat()}."
)
if until_ms and since_ms >= until_ms:
logger.warning(
f"No available candle-data for {pair} before "
f"{dt_from_ts(until_ms).isoformat()}"
)
return DataFrame(columns=DEFAULT_DATAFRAME_COLUMNS)
return await super()._async_get_historic_ohlcv(
pair=pair,
timeframe=timeframe,
since_ms=since_ms,
is_new_pair=is_new_pair,
raise_=raise_,
candle_type=candle_type,
until_ms=until_ms,
if (
self._config["exchange"].get("only_from_ccxt", False)
or
# only fast-download timeframes with significant improvements,
# otherwise fall back to the REST API
not (
(candle_type == CandleType.SPOT and timeframe in ["1s", "1m", "3m", "5m"])
or (
candle_type == CandleType.FUTURES
and timeframe in ["1m", "3m", "5m", "15m", "30m"]
)
)
):
return super().get_historic_ohlcv(
pair=pair,
timeframe=timeframe,
since_ms=since_ms,
candle_type=candle_type,
is_new_pair=is_new_pair,
until_ms=until_ms,
)
else:
# Download from data.binance.vision
return self.get_historic_ohlcv_fast(
pair=pair,
timeframe=timeframe,
since_ms=since_ms,
candle_type=candle_type,
is_new_pair=is_new_pair,
until_ms=until_ms,
)
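The branch above boils down to a small predicate: the archive fast path is taken only when `only_from_ccxt` is off and the timeframe is one where the archives bring a significant speedup. A hypothetical helper (not part of the PR) expressing the same logic:

```python
from freqtrade.enums import CandleType


def uses_fast_path(only_from_ccxt: bool, candle_type: CandleType, timeframe: str) -> bool:
    # Archives only pay off for high-resolution timeframes, where the REST API
    # would need many paginated requests.
    fast_timeframe = (
        candle_type == CandleType.SPOT and timeframe in ("1s", "1m", "3m", "5m")
    ) or (
        candle_type == CandleType.FUTURES
        and timeframe in ("1m", "3m", "5m", "15m", "30m")
    )
    return fast_timeframe and not only_from_ccxt
```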
def get_historic_ohlcv_fast(
self,
pair: str,
timeframe: str,
since_ms: int,
candle_type: CandleType,
is_new_pair: bool = False,
until_ms: int | None = None,
) -> DataFrame:
"""
Fetch OHLCV data quickly by leveraging https://data.binance.vision.
"""
df = self.loop.run_until_complete(
download_archive_ohlcv(
candle_type=candle_type,
pair=pair,
timeframe=timeframe,
since_ms=since_ms,
until_ms=until_ms,
markets=self.markets,
)
)
# download the remaining data from the REST API
if df.empty:
rest_since_ms = since_ms
else:
rest_since_ms = dt_ts(df.iloc[-1].date) + timeframe_to_msecs(timeframe)
# make sure since <= until
if until_ms and rest_since_ms > until_ms:
rest_df = DataFrame()
else:
rest_df = super().get_historic_ohlcv(
pair=pair,
timeframe=timeframe,
since_ms=rest_since_ms,
candle_type=candle_type,
is_new_pair=is_new_pair,
until_ms=until_ms,
)
all_df = concat_safe([df, rest_df])
return all_df
def funding_fee_cutoff(self, open_date: datetime):
"""
Funding fees are only charged at full hours (usually every 4-8h).
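The stitch point in `get_historic_ohlcv_fast` is the candle immediately after the last archived one. A minimal sketch of that arithmetic, using the `timeframe_to_msecs` helper imported above:

```python
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_msecs

# Assume the archive's last 1m candle opened at 2020-01-01 23:59:00 UTC:
last_archived_open_ms = 1_577_923_140_000
rest_since_ms = last_archived_open_ms + timeframe_to_msecs("1m")
# -> 1_577_923_200_000, i.e. 2020-01-02 00:00:00 UTC: the next candle's open,
# so the REST download continues exactly where the archive left off.
```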

View File

@@ -0,0 +1,281 @@
"""
Fetch daily-archived OHLCV data from https://data.binance.vision/
"""
import asyncio
import logging
import zipfile
from datetime import date, timedelta
from io import BytesIO
from typing import Any
import aiohttp
import pandas as pd
from pandas import DataFrame
from freqtrade.enums import CandleType
from freqtrade.misc import chunks
from freqtrade.util.datetime_helpers import dt_from_ts, dt_now
logger = logging.getLogger(__name__)
class Http404(Exception):
def __init__(self, msg, date, url):
super().__init__(msg)
self.date = date
self.url = url
class BadHttpStatus(Exception):
"""Not 200/404"""
pass
async def download_archive_ohlcv(
candle_type: CandleType,
pair: str,
timeframe: str,
*,
since_ms: int,
until_ms: int | None,
markets: dict[str, Any],
stop_on_404: bool = True,
) -> DataFrame:
"""
Fetch OHLCV data from https://data.binance.vision
The function makes its best effort to download data within the time range
[`since_ms`, `until_ms`) -- including `since_ms`, but excluding `until_ms`.
If `stop_on_404` is True, the returned DataFrame is guaranteed to start from `since_ms`
with no gaps in the data.
:param candle_type: Currently only spot and futures are supported
:param pair: symbol name in CCXT convention
:param since_ms: the start timestamp of the data, inclusive
:param until_ms: the end timestamp of the data, exclusive; `None` means the timestamp
of the latest available data
:param markets: the CCXT markets dict; when it is None, the function loads the markets
from a new `ccxt.binance` instance
:param stop_on_404: stop downloading subsequent days once a 404 is returned
:return: a DataFrame covering [since_ms, until_ms); an empty DataFrame if no data is
available in the time range
"""
try:
if candle_type == CandleType.SPOT:
asset_type_url_segment = "spot"
elif candle_type == CandleType.FUTURES:
asset_type_url_segment = "futures/um"
else:
raise ValueError(f"Unsupported CandleType: {candle_type}")
symbol = markets[pair]["id"]
start = dt_from_ts(since_ms)
end = dt_from_ts(until_ms) if until_ms else dt_now()
# We use two days ago as the last available day because the daily archives
# are uploaded daily with several hours of delay
last_available_date = dt_now() - timedelta(days=2)
end = min(end, last_available_date)
if start >= end:
return DataFrame()
df = await _download_archive_ohlcv(
asset_type_url_segment, symbol, pair, timeframe, start, end, stop_on_404
)
logger.debug(
f"Downloaded data for {pair} from https://data.binance.vision with length {len(df)}."
)
except Exception as e:
logger.warning(
"An exception occurred during fast download from Binance, falling back to"
"the slower REST API, this can take more time.",
exc_info=e,
)
df = DataFrame()
if not df.empty:
# only return the data within the requested time range
return df.loc[(df["date"] >= start) & (df["date"] < end)]
else:
return df
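A minimal usage sketch for the coroutine above, assuming a pre-populated CCXT markets dict (freqtrade itself passes `self.markets`; `"id"` is the only key the function reads):

```python
import asyncio

from freqtrade.enums import CandleType
from freqtrade.exchange.binance_public_data import download_archive_ohlcv

markets = {"BTC/USDT": {"id": "BTCUSDT"}}
df = asyncio.run(
    download_archive_ohlcv(
        CandleType.SPOT,
        "BTC/USDT",
        "1m",
        since_ms=1_577_836_800_000,  # 2020-01-01 00:00 UTC, inclusive
        until_ms=1_577_923_200_000,  # 2020-01-02 00:00 UTC, exclusive
        markets=markets,
    )
)
```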
def concat_safe(dfs) -> DataFrame:
if all(df is None for df in dfs):
return DataFrame()
else:
return pd.concat(dfs)
async def _download_archive_ohlcv(
asset_type_url_segment: str,
symbol: str,
pair: str,
timeframe: str,
start: date,
end: date,
stop_on_404: bool,
) -> DataFrame:
# daily dataframes; `None` marks a day with missing data (when `stop_on_404` is False)
dfs: list[DataFrame | None] = []
# the current day being processed; counting starts at 1
current_day = 0
connector = aiohttp.TCPConnector(limit=100)
async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
# HTTP connections are throttled by the TCPConnector
for dates in chunks(list(date_range(start, end)), 1000):
tasks = [
asyncio.create_task(
get_daily_ohlcv(asset_type_url_segment, symbol, timeframe, date, session)
)
for date in dates
]
for task in tasks:
current_day += 1
try:
df = await task
except Http404 as e:
if stop_on_404:
logger.debug(f"Failed to download {e.url} due to 404.")
# A 404 error on the first day indicates the data is missing
# on https://data.binance.vision, so warn the user and suggest a workaround.
# https://github.com/freqtrade/freqtrade/blob/acc53065e5fa7ab5197073276306dc9dc3adbfa3/tests/exchange_online/test_binance_compare_ohlcv.py#L7
if current_day == 1:
logger.warning(
f"Fast download is unavailable due to missing data: "
f"{e.url}. Falling back to the slower REST API, "
"which may take more time."
)
if pair in ["BTC/USDT:USDT", "ETH/USDT:USDT", "BCH/USDT:USDT"]:
logger.warning(
f"To avoid the delay, you can first download {pair} using "
"`--timerange <start date>-20200101`, and then download the "
"remaining data with `--timerange 20200101-<end date>`."
)
else:
logger.warning(
f"Binance fast download for {pair} stopped at {e.date} due to "
f"missing data: {e.url}, falling back to rest API for the "
"remaining data, this can take more time."
)
await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
return concat_safe(dfs)
else:
dfs.append(None)
except BaseException as e:
logger.warning(f"An exception raised: : {e}")
# Return the data collected so far; do not allow a gap within the data
await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
return concat_safe(dfs)
else:
dfs.append(df)
return concat_safe(dfs)
async def cancel_and_await_tasks(unawaited_tasks):
"""Cancel and await the tasks"""
logger.debug("Try to cancel uncompleted download tasks.")
for task in unawaited_tasks:
task.cancel()
await asyncio.gather(*unawaited_tasks, return_exceptions=True)
logger.debug("All download tasks were awaited.")
def date_range(start: date, end: date):
date = start
while date <= end:
yield date
date += timedelta(days=1)
def binance_vision_zip_name(symbol: str, timeframe: str, date: date) -> str:
return f"{symbol}-{timeframe}-{date.strftime('%Y-%m-%d')}.zip"
def binance_vision_zip_url(
asset_type_url_segment: str, symbol: str, timeframe: str, date: date
) -> str:
"""
example urls:
https://data.binance.vision/data/spot/daily/klines/BTCUSDT/1s/BTCUSDT-1s-2023-10-27.zip
https://data.binance.vision/data/futures/um/daily/klines/BTCUSDT/1h/BTCUSDT-1h-2023-10-27.zip
"""
url = (
f"https://data.binance.vision/data/{asset_type_url_segment}/daily/klines/{symbol}"
f"/{timeframe}/{binance_vision_zip_name(symbol, timeframe, date)}"
)
return url
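The URL builders are pure string templating; for instance (matching the example URLs in the docstring):

```python
from datetime import date

binance_vision_zip_url("spot", "BTCUSDT", "1s", date(2023, 10, 27))
# -> 'https://data.binance.vision/data/spot/daily/klines/BTCUSDT/1s/BTCUSDT-1s-2023-10-27.zip'
```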
async def get_daily_ohlcv(
asset_type_url_segment: str,
symbol: str,
timeframe: str,
date: date,
session: aiohttp.ClientSession,
retry_count: int = 3,
retry_delay: float = 0.0,
) -> DataFrame:
"""
Get daily OHLCV from https://data.binance.vision
See https://github.com/binance/binance-public-data
:param asset_type_url_segment: `spot` or `futures/um`
:param symbol: Binance symbol name, e.g. BTCUSDT
:param timeframe: e.g. 1m, 1h
:param date: the returned DataFrame covers the entire day of `date` in UTC
:param session: an aiohttp.ClientSession instance
:param retry_count: number of retries before the exception is raised
:param retry_delay: base delay in seconds; the actual wait grows linearly with the attempt
:return: a DataFrame with the columns date, open, high, low, close, volume
"""
url = binance_vision_zip_url(asset_type_url_segment, symbol, timeframe, date)
logger.debug(f"download data from binance: {url}")
retry = 0
while True:
if retry > 0:
sleep_secs = retry * retry_delay
logger.debug(
f"[{retry}/{retry_count}] retry to download {url} after {sleep_secs} seconds"
)
await asyncio.sleep(sleep_secs)
try:
async with session.get(url) as resp:
if resp.status == 200:
content = await resp.read()
logger.debug(f"Successfully downloaded {url}")
with zipfile.ZipFile(BytesIO(content)) as zipf:
with zipf.open(zipf.namelist()[0]) as csvf:
# https://github.com/binance/binance-public-data/issues/283
first_byte = csvf.read(1)[0]
if chr(first_byte).isdigit():
header = None
else:
header = 0
csvf.seek(0)
df = pd.read_csv(
csvf,
usecols=[0, 1, 2, 3, 4, 5],
names=["date", "open", "high", "low", "close", "volume"],
header=header,
)
df["date"] = pd.to_datetime(df["date"], unit="ms", utc=True)
return df
elif resp.status == 404:
logger.debug(f"Failed to download {url}")
raise Http404(f"404: {url}", date, url)
else:
raise BadHttpStatus(f"{resp.status} - {resp.reason}")
except Exception as e:
retry += 1
if isinstance(e, Http404) or retry > retry_count:
logger.debug(f"Failed to get data from {url}: {e}")
raise
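The header handling in `get_daily_ohlcv` works around binance/binance-public-data#283: spot archives may ship a CSV header row while futures archives do not, so the code sniffs the first byte. A standalone sketch of the same trick on an in-memory CSV:

```python
import io

import pandas as pd


def read_kline_csv(buf: io.BytesIO) -> pd.DataFrame:
    # A leading digit means there is no header row; otherwise row 0 is the header.
    header = None if chr(buf.read(1)[0]).isdigit() else 0
    buf.seek(0)
    return pd.read_csv(
        buf,
        usecols=[0, 1, 2, 3, 4, 5],
        names=["date", "open", "high", "low", "close", "volume"],
        header=header,
    )
```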

View File

@@ -2244,8 +2244,9 @@ class Exchange:
:param pair: Pair to download
:param timeframe: Timeframe to get data for
:param since_ms: Timestamp in milliseconds to get history from
:param until_ms: Timestamp in milliseconds to get history up to
:param candle_type: '', mark, index, premiumIndex, or funding_rate
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading
:param until_ms: Timestamp in milliseconds to get history up to
:return: Dataframe with candle (OHLCV) data
"""
pair, _, _, data, _ = self.loop.run_until_complete(
@@ -2254,11 +2255,10 @@ class Exchange:
timeframe=timeframe,
since_ms=since_ms,
until_ms=until_ms,
is_new_pair=is_new_pair,
candle_type=candle_type,
)
)
logger.info(f"Downloaded data for {pair} with length {len(data)}.")
logger.debug(f"Downloaded data for {pair} from ccxt with length {len(data)}.")
return ohlcv_to_dataframe(data, timeframe, pair, fill_missing=False, drop_incomplete=True)
async def _async_get_historic_ohlcv(
@@ -2267,13 +2267,11 @@ class Exchange:
timeframe: str,
since_ms: int,
candle_type: CandleType,
is_new_pair: bool = False,
raise_: bool = False,
until_ms: int | None = None,
) -> OHLCVResponse:
"""
Download historic ohlcv
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""

View File

@@ -128,8 +128,8 @@ def test_load_data_with_new_pair_1min(
"""
Test load_pair_history() with 1 min timeframe
"""
mocker.patch(f"{EXMS}.get_historic_ohlcv", return_value=ohlcv_history)
exchange = get_patched_exchange(mocker, default_conf)
mocker.patch.object(exchange, "get_historic_ohlcv", return_value=ohlcv_history)
file = tmp_path / "MEME_BTC-1m.feather"
# do not download a new pair if refresh_pairs isn't set
@@ -306,8 +306,8 @@ def test_load_cached_data_for_updating(testdatadir) -> None:
def test_download_pair_history(
ohlcv_history, mocker, default_conf, tmp_path, candle_type, subdir, file_tail
) -> None:
mocker.patch(f"{EXMS}.get_historic_ohlcv", return_value=ohlcv_history)
exchange = get_patched_exchange(mocker, default_conf)
mocker.patch.object(exchange, "get_historic_ohlcv", return_value=ohlcv_history)
file1_1 = tmp_path / f"{subdir}MEME_BTC-1m{file_tail}.feather"
file1_5 = tmp_path / f"{subdir}MEME_BTC-5m{file_tail}.feather"
file2_1 = tmp_path / f"{subdir}CFI_BTC-1m{file_tail}.feather"
@@ -357,8 +357,8 @@ def test_download_pair_history2(mocker, default_conf, testdatadir, ohlcv_history
"freqtrade.data.history.datahandlers.featherdatahandler.FeatherDataHandler.ohlcv_store",
return_value=None,
)
mocker.patch(f"{EXMS}.get_historic_ohlcv", return_value=ohlcv_history)
exchange = get_patched_exchange(mocker, default_conf)
mocker.patch.object(exchange, "get_historic_ohlcv", return_value=ohlcv_history)
_download_pair_history(
datadir=testdatadir,
exchange=exchange,

View File

@@ -1,14 +1,17 @@
from datetime import datetime, timezone
from datetime import datetime, timedelta
from random import randint
from unittest.mock import MagicMock, PropertyMock
import ccxt
import pandas as pd
import pytest
from freqtrade.enums import CandleType, MarginMode, TradingMode
from freqtrade.exceptions import DependencyException, InvalidOrderException, OperationalException
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_seconds
from freqtrade.persistence import Trade
from tests.conftest import EXMS, get_mock_coro, get_patched_exchange, log_has_re
from freqtrade.util.datetime_helpers import dt_from_ts, dt_ts, dt_utc
from tests.conftest import EXMS, get_patched_exchange
from tests.exchange.test_exchange import ccxt_exceptionhandlers
@@ -731,42 +734,243 @@ def test__set_leverage_binance(mocker, default_conf):
)
@pytest.mark.parametrize("candle_type", [CandleType.MARK, ""])
async def test__async_get_historic_ohlcv_binance(default_conf, mocker, caplog, candle_type):
ohlcv = [
[
int((datetime.now(timezone.utc).timestamp() - 1000) * 1000),
1, # open
2, # high
3, # low
4, # close
5, # volume (in quote currency)
def patch_binance_vision_ohlcv(mocker, start, archive_end, api_end, timeframe):
def make_storage(start: datetime, end: datetime, timeframe: str):
date = pd.date_range(start, end, freq=timeframe.replace("m", "min"))
df = pd.DataFrame(
data=dict(date=date, open=1.0, high=1.0, low=1.0, close=1.0),
)
return df
archive_storage = make_storage(start, archive_end, timeframe)
api_storage = make_storage(start, api_end, timeframe)
ohlcv = [[dt_ts(start), 1, 1, 1, 1]]
# (pair, timeframe, candle_type, ohlcv, True)
candle_history = [None, None, None, ohlcv, None]
def get_historic_ohlcv(
# self,
pair: str,
timeframe: str,
since_ms: int,
candle_type: CandleType,
is_new_pair: bool = False,
until_ms: int | None = None,
):
since = dt_from_ts(since_ms)
until = dt_from_ts(until_ms) if until_ms else api_end + timedelta(seconds=1)
return api_storage.loc[(api_storage["date"] >= since) & (api_storage["date"] < until)]
async def download_archive_ohlcv(
candle_type,
pair,
timeframe,
since_ms,
until_ms,
markets=None,
stop_on_404=False,
):
since = dt_from_ts(since_ms)
until = dt_from_ts(until_ms) if until_ms else archive_end + timedelta(seconds=1)
if since < start:
pass
return archive_storage.loc[
(archive_storage["date"] >= since) & (archive_storage["date"] < until)
]
]
candle_mock = mocker.patch(f"{EXMS}._async_get_candle_history", return_value=candle_history)
api_mock = mocker.patch(f"{EXMS}.get_historic_ohlcv", side_effect=get_historic_ohlcv)
archive_mock = mocker.patch(
"freqtrade.exchange.binance.download_archive_ohlcv", side_effect=download_archive_ohlcv
)
return candle_mock, api_mock, archive_mock
@pytest.mark.parametrize(
"timeframe,is_new_pair,since,until,first_date,last_date,candle_called,archive_called,"
"api_called",
[
(
"1m",
True,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59),
True,
True,
False,
),
(
"1m",
True,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 3),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2, 23, 59),
True,
True,
True,
),
(
"1m",
True,
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 2, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 2, 0, 59),
True,
False,
True,
),
(
"1m",
False,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59),
False,
True,
False,
),
(
"1m",
True,
dt_utc(2019, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59),
True,
True,
False,
),
(
"1m",
False,
dt_utc(2019, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59),
False,
True,
False,
),
(
"1m",
False,
dt_utc(2019, 1, 1),
dt_utc(2019, 1, 2),
None,
None,
False,
True,
True,
),
(
"1m",
True,
dt_utc(2019, 1, 1),
dt_utc(2019, 1, 2),
None,
None,
True,
False,
False,
),
(
"1m",
False,
dt_utc(2021, 1, 1),
dt_utc(2021, 1, 2),
None,
None,
False,
False,
False,
),
(
"1m",
True,
dt_utc(2021, 1, 1),
dt_utc(2021, 1, 2),
None,
None,
True,
False,
False,
),
(
"1h",
False,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23),
False,
False,
True,
),
(
"1m",
False,
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 3, 50, 30),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 3, 50),
False,
True,
False,
),
],
)
def test_get_historic_ohlcv_binance(
mocker,
default_conf,
timeframe,
is_new_pair,
since,
until,
first_date,
last_date,
candle_called,
archive_called,
api_called,
):
exchange = get_patched_exchange(mocker, default_conf, exchange="binance")
# Monkey-patch async function
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
pair = "ETH/BTC"
respair, restf, restype, res, _ = await exchange._async_get_historic_ohlcv(
pair, "5m", 1500000000000, is_new_pair=False, candle_type=candle_type
)
assert respair == pair
assert restf == "5m"
assert restype == candle_type
# Call with very old timestamp - causes tons of requests
assert exchange._api_async.fetch_ohlcv.call_count > 400
# assert res == ohlcv
exchange._api_async.fetch_ohlcv.reset_mock()
_, _, _, res, _ = await exchange._async_get_historic_ohlcv(
pair, "5m", 1500000000000, is_new_pair=True, candle_type=candle_type
start = dt_utc(2020, 1, 1)
archive_end = dt_utc(2020, 1, 2)
api_end = dt_utc(2020, 1, 3)
candle_mock, api_mock, archive_mock = patch_binance_vision_ohlcv(
mocker, start=start, archive_end=archive_end, api_end=api_end, timeframe=timeframe
)
# Called twice - one "init" call - and one to get the actual data.
assert exchange._api_async.fetch_ohlcv.call_count == 2
assert res == ohlcv
assert log_has_re(r"Candle-data for ETH/BTC available starting with .*", caplog)
candle_type = CandleType.SPOT
pair = "BTC/USDT"
since_ms = dt_ts(since)
until_ms = dt_ts(until)
df = exchange.get_historic_ohlcv(pair, timeframe, since_ms, candle_type, is_new_pair, until_ms)
if df.empty:
assert first_date is None
assert last_date is None
else:
assert df["date"].iloc[0] == first_date
assert df["date"].iloc[-1] == last_date
assert (
df["date"].diff().iloc[1:] == timedelta(seconds=timeframe_to_seconds(timeframe))
).all()
if candle_called:
candle_mock.assert_called_once()
if archive_called:
archive_mock.assert_called_once()
if api_called:
api_mock.assert_called_once()
@pytest.mark.parametrize(

View File

@@ -0,0 +1,337 @@
import asyncio
import datetime
import io
import re
import sys
import zipfile
from datetime import timedelta
import aiohttp
import pandas as pd
import pytest
from freqtrade.enums import CandleType
from freqtrade.exchange.binance_public_data import (
BadHttpStatus,
Http404,
binance_vision_zip_name,
download_archive_ohlcv,
get_daily_ohlcv,
)
from freqtrade.util.datetime_helpers import dt_ts, dt_utc
@pytest.fixture(scope="module")
def event_loop_policy(request):
if sys.platform == "win32":
return asyncio.WindowsSelectorEventLoopPolicy()
else:
return asyncio.DefaultEventLoopPolicy()
class MockResponse:
"""AioHTTP response mock"""
def __init__(self, content, status, reason=""):
self._content = content
self.status = status
self.reason = reason
async def read(self):
return self._content
async def __aexit__(self, exc_type, exc, tb):
pass
async def __aenter__(self):
return self
# spot klines archive csv file format, the futures/um klines don't have the header line
#
# open_time,open,high,low,close,volume,close_time,quote_volume,count,taker_buy_volume,taker_buy_quote_volume,ignore # noqa: E501
# 1698364800000,34161.6,34182.5,33977.4,34024.2,409953,1698368399999,1202.97118037,15095,192220,564.12041453,0 # noqa: E501
# 1698368400000,34024.2,34060.1,33776.4,33848.4,740960,1698371999999,2183.75671155,23938,368266,1085.17080793,0 # noqa: E501
# 1698372000000,33848.5,34150.0,33815.1,34094.2,390376,1698375599999,1147.73267094,13854,231446,680.60405822,0 # noqa: E501
def make_response_from_url(start_date, end_date):
def make_daily_df(date, timeframe):
start = dt_utc(date.year, date.month, date.day)
end = start + timedelta(days=1)
date_col = pd.date_range(start, end, freq=timeframe.replace("m", "min"), inclusive="left")
cols = (
"open_time,open,high,low,close,volume,close_time,quote_volume,count,taker_buy_volume,"
"taker_buy_quote_volume,ignore"
)
df = pd.DataFrame(columns=cols.split(","), dtype=float)
df["open_time"] = date_col.astype("int64") // 10**6
df["open"] = df["high"] = df["low"] = df["close"] = df["volume"] = 1.0
return df
def make_daily_zip(asset_type_url_segment, symbol, timeframe, date) -> bytes:
df = make_daily_df(date, timeframe)
if asset_type_url_segment == "spot":
header = True
elif asset_type_url_segment == "futures/um":
header = None
else:
raise ValueError
csv = df.to_csv(index=False, header=header)
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zipf:
zipf.writestr(binance_vision_zip_name(symbol, timeframe, date), csv)
return zip_buffer.getvalue()
def make_response(url):
pattern = (
r"https://data.binance.vision/data/(?P<asset_type_url_segment>spot|futures/um)"
r"/daily/klines/(?P<symbol>.*?)/(?P<timeframe>.*?)/(?P=symbol)-(?P=timeframe)-"
r"(?P<date>\d{4}-\d{2}-\d{2}).zip"
)
m = re.match(pattern, url)
if not m:
return MockResponse(content="", status=404)
date = datetime.datetime.strptime(m["date"], "%Y-%m-%d").date()
if date < start_date or date > end_date:
return MockResponse(content="", status=404)
zip_file = make_daily_zip(m["asset_type_url_segment"], m["symbol"], m["timeframe"], date)
return MockResponse(content=zip_file, status=200)
return make_response
@pytest.mark.parametrize(
"candle_type,pair,since,until,first_date,last_date,stop_on_404",
[
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59, 59),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 5),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 3, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2019, 12, 25),
dt_utc(2020, 1, 5),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 3, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2019, 1, 1),
dt_utc(2019, 1, 5),
None,
None,
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2021, 1, 1),
dt_utc(2021, 1, 5),
None,
None,
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 2),
None,
dt_utc(2020, 1, 2),
dt_utc(2020, 1, 3, 23),
False,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 5),
dt_utc(2020, 1, 1),
None,
None,
False,
),
(
CandleType.FUTURES,
"BTC/USDT:USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59, 59),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23),
False,
),
(
CandleType.INDEX,
"N/A",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 1, 23, 59, 59),
None,
None,
False,
),
# stop_on_404 = True
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2019, 12, 25),
dt_utc(2020, 1, 5),
None,
None,
True,
),
(
CandleType.SPOT,
"BTC/USDT",
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 5),
dt_utc(2020, 1, 1),
dt_utc(2020, 1, 3, 23),
True,
),
(
CandleType.FUTURES,
"BTC/USDT:USDT",
dt_utc(2019, 12, 25),
dt_utc(2020, 1, 5),
None,
None,
True,
),
],
)
async def test_download_archive_ohlcv(
mocker, candle_type, pair, since, until, first_date, last_date, stop_on_404
):
history_start = dt_utc(2020, 1, 1).date()
history_end = dt_utc(2020, 1, 3).date()
timeframe = "1h"
since_ms = dt_ts(since)
until_ms = dt_ts(until)
mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
side_effect=make_response_from_url(history_start, history_end),
)
markets = {"BTC/USDT": {"id": "BTCUSDT"}, "BTC/USDT:USDT": {"id": "BTCUSDT"}}
df = await download_archive_ohlcv(
candle_type,
pair,
timeframe,
since_ms=since_ms,
until_ms=until_ms,
markets=markets,
stop_on_404=stop_on_404,
)
if df.empty:
assert first_date is None and last_date is None
else:
assert candle_type in [CandleType.SPOT, CandleType.FUTURES]
assert df["date"].iloc[0] == first_date
assert df["date"].iloc[-1] == last_date
async def test_download_archive_ohlcv_exception(mocker):
timeframe = "1h"
pair = "BTC/USDT"
since_ms = dt_ts(dt_utc(2020, 1, 1))
until_ms = dt_ts(dt_utc(2020, 1, 2))
markets = {"BTC/USDT": {"id": "BTCUSDT"}, "BTC/USDT:USDT": {"id": "BTCUSDT"}}
mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get", side_effect=RuntimeError
)
df = await download_archive_ohlcv(
CandleType.SPOT, pair, timeframe, since_ms=since_ms, until_ms=until_ms, markets=markets
)
assert df.empty
async def test_get_daily_ohlcv(mocker, testdatadir):
symbol = "BTCUSDT"
timeframe = "1h"
date = dt_utc(2024, 10, 28).date()
first_date = dt_utc(2024, 10, 28)
last_date = dt_utc(2024, 10, 28, 23)
async with aiohttp.ClientSession() as session:
spot_path = (
testdatadir / "binance/binance_public_data/spot-klines-BTCUSDT-1h-2024-10-28.zip"
)
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(spot_path.read_bytes(), 200),
)
df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
assert get.call_count == 1
assert df["date"].iloc[0] == first_date
assert df["date"].iloc[-1] == last_date
futures_path = (
testdatadir / "binance/binance_public_data/futures-um-klines-BTCUSDT-1h-2024-10-28.zip"
)
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(futures_path.read_bytes(), 200),
)
df = await get_daily_ohlcv("futures/um", symbol, timeframe, date, session)
assert get.call_count == 1
assert df["date"].iloc[0] == first_date
assert df["date"].iloc[-1] == last_date
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(b"", 404),
)
with pytest.raises(Http404):
df = await get_daily_ohlcv("spot", symbol, timeframe, date, session, retry_delay=0)
assert get.call_count == 1
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(b"", 500),
)
mocker.patch("asyncio.sleep")
with pytest.raises(BadHttpStatus):
df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
assert get.call_count == 4 # 1 + 3 default retries
get = mocker.patch(
"freqtrade.exchange.binance_public_data.aiohttp.ClientSession.get",
return_value=MockResponse(b"nop", 200),
)
with pytest.raises(zipfile.BadZipFile):
df = await get_daily_ohlcv("spot", symbol, timeframe, date, session)
assert get.call_count == 4 # 1 + 3 default retries

View File

@@ -2091,6 +2091,7 @@ def test___now_is_time_to_refresh(default_conf, mocker, exchange_name, time_mach
@pytest.mark.parametrize("candle_type", ["mark", ""])
@pytest.mark.parametrize("exchange_name", EXCHANGES)
def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name, candle_type):
caplog.set_level(logging.DEBUG)
exchange = get_patched_exchange(mocker, default_conf, exchange=exchange_name)
pair = "ETH/BTC"
calls = 0
@@ -2123,7 +2124,7 @@ def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name, candle_
assert exchange._async_get_candle_history.call_count == 2
# Returns twice the above OHLCV data after truncating the open candle.
assert len(ret) == 2
assert log_has_re(r"Downloaded data for .* with length .*\.", caplog)
assert log_has_re(r"Downloaded data for .* from ccxt with length .*\.", caplog)
caplog.clear()
@@ -2156,7 +2157,7 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_
pair = "ETH/USDT"
respair, restf, _, res, _ = await exchange._async_get_historic_ohlcv(
pair, "5m", 1500000000000, candle_type=candle_type, is_new_pair=False
pair, "5m", 1500000000000, candle_type=candle_type
)
assert respair == pair
assert restf == "5m"
@@ -2168,7 +2169,7 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_
end_ts = 1_500_500_000_000
start_ts = 1_500_000_000_000
respair, restf, _, res, _ = await exchange._async_get_historic_ohlcv(
pair, "5m", since_ms=start_ts, candle_type=candle_type, is_new_pair=False, until_ms=end_ts
pair, "5m", since_ms=start_ts, candle_type=candle_type, until_ms=end_ts
)
# Required candles
candles = (end_ts - start_ts) / 300_000