From 6b18c4f24c16213c52ec098863b787298322072e Mon Sep 17 00:00:00 2001 From: xzmeng Date: Thu, 14 Nov 2024 05:24:31 +0800 Subject: [PATCH] feat: cancel uncompleted tasks before return --- freqtrade/exchange/binance_public_data.py | 27 +++++++++++++++++------ 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/freqtrade/exchange/binance_public_data.py b/freqtrade/exchange/binance_public_data.py index 734f54723..a0197b98b 100644 --- a/freqtrade/exchange/binance_public_data.py +++ b/freqtrade/exchange/binance_public_data.py @@ -126,10 +126,12 @@ async def _fetch_ohlcv( async with aiohttp.ClientSession(connector=connector, trust_env=True) as session: # the HTTP connections has been throttled by TCPConnector for dates in chunks(list(date_range(start, end)), 1000): - results = await asyncio.gather( - *(get_daily_ohlcv(asset_type, symbol, timeframe, date, session) for date in dates) - ) - for result in results: + tasks = [ + asyncio.create_task(get_daily_ohlcv(asset_type, symbol, timeframe, date, session)) + for date in dates + ] + for task in tasks: + result = await task current_day += 1 if isinstance(result, Http404): if stop_on_404: @@ -152,20 +154,29 @@ async def _fetch_ohlcv( f"data missing: {result.url}, fall back to rest API for the " "remaining data download, this can take more time." ) - - logger.debug("Abort downloading from data.binance.vision due to 404") + await cancel_uncompleted_tasks(tasks) return concat(dfs) else: dfs.append(None) elif isinstance(result, BaseException): logger.warning(f"An exception raised: : {result}") # Directly return the existing data, do not allow the gap within the data + await cancel_uncompleted_tasks(tasks) return concat(dfs) else: dfs.append(result) return concat(dfs) +async def cancel_uncompleted_tasks(tasks): + logger.debug("Try to cancel uncompleted download tasks.") + uncompleted_tasks = [task for task in tasks if not task.done()] + for task in uncompleted_tasks: + task.cancel() + await asyncio.gather(*uncompleted_tasks) + logger.debug("All uncompleted download tasks were successfully cancelled.") + + def date_range(start: datetime.date, end: datetime.date): date = start while date <= end: @@ -253,10 +264,12 @@ async def get_daily_ohlcv( df["date"] = pd.to_datetime(df["date"], unit="ms", utc=True) return df elif resp.status == 404: - logger.debug(f"No data available for {symbol} in {format_date(date)}") + logger.debug(f"Failed to download {url}") raise Http404(f"404: {url}", date, url) else: raise BadHttpStatus(f"{resp.status} - {resp.reason}") + except asyncio.CancelledError as e: + return e except Exception as e: if isinstance(e, Http404): return e