feat: cancel uncompleted tasks before return

This commit is contained in:
xzmeng
2024-11-14 05:24:31 +08:00
parent 40f70a1cc0
commit 6b18c4f24c

View File

@@ -126,10 +126,12 @@ async def _fetch_ohlcv(
async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
# the HTTP connections have been throttled by TCPConnector
for dates in chunks(list(date_range(start, end)), 1000):
results = await asyncio.gather(
*(get_daily_ohlcv(asset_type, symbol, timeframe, date, session) for date in dates)
)
for result in results:
tasks = [
asyncio.create_task(get_daily_ohlcv(asset_type, symbol, timeframe, date, session))
for date in dates
]
for task in tasks:
result = await task
current_day += 1
if isinstance(result, Http404):
if stop_on_404:
@@ -152,20 +154,29 @@ async def _fetch_ohlcv(
f"data missing: {result.url}, fall back to rest API for the "
"remaining data download, this can take more time."
)
logger.debug("Abort downloading from data.binance.vision due to 404")
await cancel_uncompleted_tasks(tasks)
return concat(dfs)
else:
dfs.append(None)
elif isinstance(result, BaseException):
logger.warning(f"An exception raised: : {result}")
# Directly return the existing data, do not allow the gap within the data
await cancel_uncompleted_tasks(tasks)
return concat(dfs)
else:
dfs.append(result)
return concat(dfs)
async def cancel_uncompleted_tasks(tasks):
    """Cancel every task in *tasks* that has not finished and wait for them to settle.

    Tasks that are already done (finished, failed or cancelled) are left
    untouched. The remaining ones get ``cancel()`` called on them and are then
    awaited together so that none of them is still running when this
    coroutine returns.

    Args:
        tasks: Iterable of ``asyncio.Task`` objects to inspect.
    """
    logger.debug("Try to cancel uncompleted download tasks.")
    uncompleted_tasks = [task for task in tasks if not task.done()]
    for task in uncompleted_tasks:
        task.cancel()
    # return_exceptions=True: a task that ends up in the cancelled state (e.g.
    # it was cancelled before its coroutine could reach its own CancelledError
    # handler) must not re-raise CancelledError into the caller, which still
    # wants to return the data collected so far.
    await asyncio.gather(*uncompleted_tasks, return_exceptions=True)
    logger.debug("All uncompleted download tasks were successfully cancelled.")
def date_range(start: datetime.date, end: datetime.date):
date = start
while date <= end:
@@ -253,10 +264,12 @@ async def get_daily_ohlcv(
df["date"] = pd.to_datetime(df["date"], unit="ms", utc=True)
return df
elif resp.status == 404:
logger.debug(f"No data available for {symbol} in {format_date(date)}")
logger.debug(f"Failed to download {url}")
raise Http404(f"404: {url}", date, url)
else:
raise BadHttpStatus(f"{resp.status} - {resp.reason}")
except asyncio.CancelledError as e:
return e
except Exception as e:
if isinstance(e, Http404):
return e