chore: improve download-trades-fast structure

This commit is contained in:
Matthias
2025-02-15 15:20:06 +01:00
parent f2283410ce
commit ad96fa0f58

View File

@@ -332,6 +332,55 @@ async def download_archive_trades(
return pair, []
def parse_trades_from_zip(csvf):
    """
    Parse a trades CSV file object into a list of trade records.

    The first byte of the file decides the layout (see the linked issue):
    if it is a digit, the file is read as headerless data with an extra
    "is_best_match" column (spot); otherwise the first row is treated as a
    header line and skipped (futures).

    :param csvf: Seekable binary file-like object containing the CSV data,
        e.g. a member opened from a zip archive.
    :return: List of tuples, one per trade, with the columns selected and
        ordered by DEFAULT_TRADES_COLUMNS. An empty file yields an empty list.
    """
    # https://github.com/binance/binance-public-data/issues/283
    first_chunk = csvf.read(1)
    if not first_chunk:
        # Zero-byte file: there is no first byte to inspect and nothing to
        # parse. Return no trades instead of failing with an IndexError.
        return []

    # Columns shared by both layouts.
    names = [
        "id",
        "price",
        "amount",
        "first_trade_id",
        "last_trade_id",
        "timestamp",
        "is_buyer_maker",
    ]
    if chr(first_chunk[0]).isdigit():
        # spot
        header = None
        names.append("is_best_match")
    else:
        # futures
        header = 0

    # Rewind so pandas sees the byte consumed by the layout check.
    csvf.seek(0)
    df = pd.read_csv(
        csvf,
        names=names,
        header=header,
    )
    df.loc[:, "cost"] = df["price"] * df["amount"]
    # Side is reversed intentionally
    # based on ccxt parseTrade logic.
    df.loc[:, "side"] = np.where(df["is_buyer_maker"], "sell", "buy")
    df.loc[:, "type"] = None
    # Convert timestamp to ms
    # Values above the threshold are divided by 1000
    # (presumably microsecond timestamps - verify against the data source).
    df.loc[:, "timestamp"] = np.where(
        df["timestamp"] > 10000000000000,
        df["timestamp"] // 1000,
        df["timestamp"],
    )
    return df.loc[:, DEFAULT_TRADES_COLUMNS].to_records(index=False).tolist()
async def get_daily_trades(
symbol: str,
candle_type: CandleType,
@@ -372,58 +421,14 @@ async def get_daily_trades(
logger.debug(f"Successfully downloaded {url}")
with zipfile.ZipFile(BytesIO(content)) as zipf:
with zipf.open(zipf.namelist()[0]) as csvf:
# https://github.com/binance/binance-public-data/issues/283
first_byte = csvf.read(1)[0]
if chr(first_byte).isdigit():
# spot
header = None
names = [
"id",
"price",
"amount",
"first_trade_id",
"last_trade_id",
"timestamp",
"is_buyer_maker",
"is_best_match",
]
else:
# futures
header = 0
names = [
"id",
"price",
"amount",
"first_trade_id",
"last_trade_id",
"timestamp",
"is_buyer_maker",
]
csvf.seek(0)
df = pd.read_csv(
csvf,
names=names,
header=header,
)
df["cost"] = df["price"] * df["amount"]
# Side is reversed intentionally
# based on ccxt parseTrade logic.
df["side"] = np.where(df["is_buyer_maker"], "sell", "buy")
df["type"] = None
# Convert timestamp to ms
df["timestamp"] = np.where(
df["timestamp"] > 10000000000000,
df["timestamp"] // 1000,
df["timestamp"],
)
return df[DEFAULT_TRADES_COLUMNS].to_records(index=False).tolist()
return parse_trades_from_zip(csvf)
elif resp.status == 404:
logger.debug(f"Failed to download {url}")
raise Http404(f"404: {url}", date, url)
else:
raise BadHttpStatus(f"{resp.status} - {resp.reason}")
except Exception as e:
logger.info("download Daily_trades raised: %s", e)
retry += 1
if isinstance(e, Http404) or retry > retry_count:
logger.debug(f"Failed to get data from {url}: {e}")
@@ -446,7 +451,7 @@ async def _download_archive_trades(
connector = aiohttp.TCPConnector(limit=100)
async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
# the HTTP connections have been throttled by TCPConnector
for dates in chunks(list(date_range(start, end)), 1000):
for dates in chunks(list(date_range(start, end)), 5):
tasks = [
asyncio.create_task(get_daily_trades(symbol, candle_type, date, session))
for date in dates
@@ -490,4 +495,5 @@ async def _download_archive_trades(
else:
# Happy case
results.extend(result)
return results