chore: use loop_lock for all run_until_complete usages

This commit is contained in:
Matthias
2025-02-02 20:10:57 +01:00
parent 8cad9c8436
commit 2bfd66d271
2 changed files with 25 additions and 21 deletions

View File

@@ -140,9 +140,10 @@ class Binance(Exchange):
:param candle_type: Any of the enum CandleType (must match trading mode!) :param candle_type: Any of the enum CandleType (must match trading mode!)
""" """
if is_new_pair: if is_new_pair:
x = self.loop.run_until_complete( with self._loop_lock:
self._async_get_candle_history(pair, timeframe, candle_type, 0) x = self.loop.run_until_complete(
) self._async_get_candle_history(pair, timeframe, candle_type, 0)
)
if x and x[3] and x[3][0] and x[3][0][0] > since_ms: if x and x[3] and x[3][0] and x[3][0][0] > since_ms:
# Set starting date to first available candle. # Set starting date to first available candle.
since_ms = x[3][0][0] since_ms = x[3][0][0]
@@ -201,16 +202,17 @@ class Binance(Exchange):
""" """
Fastly fetch OHLCV data by leveraging https://data.binance.vision. Fastly fetch OHLCV data by leveraging https://data.binance.vision.
""" """
df = self.loop.run_until_complete( with self._loop_lock:
download_archive_ohlcv( df = self.loop.run_until_complete(
candle_type=candle_type, download_archive_ohlcv(
pair=pair, candle_type=candle_type,
timeframe=timeframe, pair=pair,
since_ms=since_ms, timeframe=timeframe,
until_ms=until_ms, since_ms=since_ms,
markets=self.markets, until_ms=until_ms,
markets=self.markets,
)
) )
)
# download the remaining data from rest API # download the remaining data from rest API
if df.empty: if df.empty:

View File

@@ -648,7 +648,8 @@ class Exchange:
def _load_async_markets(self, reload: bool = False) -> dict[str, Any]: def _load_async_markets(self, reload: bool = False) -> dict[str, Any]:
try: try:
markets = self.loop.run_until_complete(self._api_reload_markets(reload=reload)) with self._loop_lock:
markets = self.loop.run_until_complete(self._api_reload_markets(reload=reload))
if isinstance(markets, Exception): if isinstance(markets, Exception):
raise markets raise markets
@@ -2342,15 +2343,16 @@ class Exchange:
:param until_ms: Timestamp in milliseconds to get history up to :param until_ms: Timestamp in milliseconds to get history up to
:return: Dataframe with candle (OHLCV) data :return: Dataframe with candle (OHLCV) data
""" """
pair, _, _, data, _ = self.loop.run_until_complete( with self._loop_lock:
self._async_get_historic_ohlcv( pair, _, _, data, _ = self.loop.run_until_complete(
pair=pair, self._async_get_historic_ohlcv(
timeframe=timeframe, pair=pair,
since_ms=since_ms, timeframe=timeframe,
until_ms=until_ms, since_ms=since_ms,
candle_type=candle_type, until_ms=until_ms,
candle_type=candle_type,
)
) )
)
logger.debug(f"Downloaded data for {pair} from ccxt with length {len(data)}.") logger.debug(f"Downloaded data for {pair} from ccxt with length {len(data)}.")
return ohlcv_to_dataframe(data, timeframe, pair, fill_missing=False, drop_incomplete=True) return ohlcv_to_dataframe(data, timeframe, pair, fill_missing=False, drop_incomplete=True)