refactor: extract websocket builder logic to it's own function

This commit is contained in:
Matthias
2025-05-10 11:24:18 +02:00
parent 950a0df8b1
commit 0dfc4ed696

View File

@@ -2414,6 +2414,34 @@ class Exchange:
data = sorted(data, key=lambda x: x[0])
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
def _try_build_from_websocket(
self, pair: str, timeframe: str, candle_type: CandleType
) -> Coroutine[Any, Any, OHLCVResponse] | None:
"""
Try to build a coroutine to get data from websocket.
"""
if self._exchange_ws:
candle_ts = dt_ts(timeframe_to_prev_date(timeframe))
prev_candle_ts = dt_ts(date_minus_candles(timeframe, 1))
candles = self._exchange_ws.ohlcvs(pair, timeframe)
half_candle = int(candle_ts - (candle_ts - prev_candle_ts) * 0.5)
last_refresh_time = int(
self._exchange_ws.klines_last_refresh.get((pair, timeframe, candle_type), 0)
)
if candles and candles[-1][0] >= prev_candle_ts and last_refresh_time >= half_candle:
# Usable result, candle contains the previous candle.
# Also, we check if the last refresh time is no more than half the candle ago.
logger.debug(f"reuse watch result for {pair}, {timeframe}, {last_refresh_time}")
return self._exchange_ws.get_ohlcv(pair, timeframe, candle_type, candle_ts)
logger.info(
f"Couldn't reuse watch for {pair}, {timeframe}, falling back to REST api. "
f"{candle_ts < last_refresh_time}, {candle_ts}, {last_refresh_time}, "
f"{format_ms_time(candle_ts)}, {format_ms_time(last_refresh_time)} "
)
return None
def _build_coroutine(
self,
pair: str,
@@ -2432,30 +2460,9 @@ class Exchange:
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
min_ts = dt_ts(date_minus_candles(timeframe, candle_limit - 5))
if self._exchange_ws:
candle_ts = dt_ts(timeframe_to_prev_date(timeframe))
prev_candle_ts = dt_ts(date_minus_candles(timeframe, 1))
candles = self._exchange_ws.ohlcvs(pair, timeframe)
half_candle = int(candle_ts - (candle_ts - prev_candle_ts) * 0.5)
last_refresh_time = int(
self._exchange_ws.klines_last_refresh.get((pair, timeframe, candle_type), 0)
)
if (
candles
and candles[-1][0] >= prev_candle_ts
and last_refresh_time >= half_candle
):
# Usable result, candle contains the previous candle.
# Also, we check if the last refresh time is no more than half the candle ago.
logger.debug(f"reuse watch result for {pair}, {timeframe}, {last_refresh_time}")
return self._exchange_ws.get_ohlcv(pair, timeframe, candle_type, candle_ts)
logger.info(
f"Couldn't reuse watch for {pair}, {timeframe}, falling back to REST api. "
f"{candle_ts < last_refresh_time}, {candle_ts}, {last_refresh_time}, "
f"{format_ms_time(candle_ts)}, {format_ms_time(last_refresh_time)} "
)
if ws_resp := self._try_build_from_websocket(pair, timeframe, candle_type):
# We have a usable websocket response
return ws_resp
# Check if 1 call can get us updated candles without hole in the data.
if min_ts < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0):