feat: further work on download-trades-archive

Author: Matthias
Date: 2025-01-26 11:18:27 +01:00
parent af3f9d5a75
commit 3746305b5a


@@ -3,7 +3,6 @@ Fetch daily-archived OHLCV data from https://data.binance.vision/
"""
import asyncio
import csv
import logging
import zipfile
from datetime import date, timedelta
@@ -11,9 +10,11 @@ from io import BytesIO
from typing import Any
import aiohttp
import numpy as np
import pandas as pd
from pandas import DataFrame
from freqtrade.constants import DEFAULT_TRADES_COLUMNS
from freqtrade.enums import CandleType
from freqtrade.misc import chunks
from freqtrade.util.datetime_helpers import dt_from_ts, dt_now
@@ -316,7 +317,10 @@ async def download_archive_trades(
end = min(end, last_available_date)
if start >= end:
return DataFrame()
await _download_archive_ohlcv(asset_type_url_segment, symbol, pair, start, end, stop_on_404)
result_list = await _download_archive_trades(
asset_type_url_segment, symbol, pair, start, end, stop_on_404
)
return pair, result_list
except Exception as e:
logger.warning(
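Returning the pair together with the downloaded trades lets a caller match results back to their pair when several downloads are gathered concurrently. A minimal sketch of that pattern, not part of this commit; `download_all` and `download_one` are hypothetical names:

import asyncio

async def download_all(pairs, download_one):
    # download_one stands in for download_archive_trades(); it is assumed to
    # return a (pair, trades_list) tuple, as introduced in the hunk above.
    tasks = [asyncio.create_task(download_one(pair)) for pair in pairs]
    results = await asyncio.gather(*tasks)
    # Each result carries its pair, so the caller does not rely on
    # positional matching between the input list and the results.
    return {pair: trades for pair, trades in results}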
@@ -371,12 +375,14 @@ async def get_daily_trades(
first_byte = csvf.read(1)[0]
if chr(first_byte).isdigit():
# spot
header = [
header = None
names = [
"id",
"price",
"qty",
"quote_qty",
"time",
"amount",
"first_trade_id",
"last_trade_id",
"timestamp",
"is_buyer_maker",
"is_best_match",
]
@@ -386,22 +392,27 @@ async def get_daily_trades(
names = [
"id",
"price",
"qty",
"quote_qty",
"time",
"amount",
"first_trade_id",
"last_trade_id",
"timestamp",
"is_buyer_maker",
]
csvf.seek(0)
df = pd.read_csv(
csvf,
usecols=[0, 1, 2, 3, 4, 5],
names=names,
header=header,
)
df["cost"] = df["price"] * df["qty"]
return df[].to_records(index=False).tolist()
df["cost"] = df["price"] * df["amount"]
# Side is reversed intentionally
# based on ccxt parseTrade logic.
df["side"] = np.where(df["is_buyer_maker"], "sell", "buy")
df["type"] = None
if header is None:
df["timestamp"] = df["timestamp"] // 1000
return df[DEFAULT_TRADES_COLUMNS].to_records(index=False).tolist()
elif resp.status == 404:
logger.debug(f"Failed to download {url}")
raise Http404(f"404: {url}", date, url)
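The "side is reversed" comment follows ccxt's convention for Binance trade data: when the buyer was the maker, the aggressing (taker) order was a sell. A small self-contained sketch of that mapping, assuming only pandas and numpy:

import numpy as np
import pandas as pd

trades = pd.DataFrame(
    {
        "price": [100.0, 101.0],
        "amount": [2.0, 0.5],
        "is_buyer_maker": [True, False],
    }
)
# If the buyer was the maker, the taker side of the trade was a sell;
# otherwise the taker was buying.
trades["side"] = np.where(trades["is_buyer_maker"], "sell", "buy")
trades["cost"] = trades["price"] * trades["amount"]
print(trades[["is_buyer_maker", "side", "cost"]])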
@@ -423,7 +434,7 @@ async def _download_archive_trades(
stop_on_404: bool,
) -> list[list]:
# daily dataframes, `None` indicates missing data in that day (when `stop_on_404` is False)
result: list[list] = []
results: list[list] = []
# the current day being processed, starting at 1.
current_day = 0
@@ -438,7 +449,7 @@ async def _download_archive_trades(
for task in tasks:
current_day += 1
try:
df = await task
result = await task
except Http404 as e:
if stop_on_404:
logger.debug(f"Failed to download {e.url} due to 404.")
@@ -465,14 +476,13 @@ async def _download_archive_trades(
"remaining data, this can take more time."
)
await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
return concat_safe(dfs)
else:
dfs.append(None)
return results
except BaseException as e:
logger.warning(f"An exception raised: : {e}")
# Return the data collected so far directly, rather than leaving a gap within the data
await cancel_and_await_tasks(tasks[tasks.index(task) + 1 :])
return concat_safe(dfs)
return results
else:
dfs.append(df)
return concat_safe(dfs)
# Happy case
results.extend(result)
return results
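With this change, _download_archive_trades() accumulates a flat list of per-trade records via results.extend(result) instead of concatenating daily DataFrames. A hedged sketch of turning such a list back into a DataFrame; the column order below is an assumption about DEFAULT_TRADES_COLUMNS, not taken from this diff:

import pandas as pd

# Assumed to mirror freqtrade's DEFAULT_TRADES_COLUMNS ordering.
TRADES_COLUMNS = ["timestamp", "id", "type", "side", "price", "amount", "cost"]

def trades_list_to_df(results: list) -> pd.DataFrame:
    # `results` is the flat list built by results.extend(result) above,
    # where each element is one trade record in TRADES_COLUMNS order.
    return pd.DataFrame(results, columns=TRADES_COLUMNS)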