Fix unnecessary deep indent

This commit is contained in:
Joe Schr
2024-02-06 20:13:23 +01:00
parent aa663b926a
commit c04cce52ea

View File

@@ -1981,53 +1981,54 @@ class Exchange:
return data return data
async def _async_get_historic_ohlcv(self, pair: str, timeframe: str, async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
since_ms: int, candle_type: CandleType,
is_new_pair: bool = False, raise_: bool = False,
until_ms: Optional[int] = None
) -> OHLCVResponse:
"""
Download historic ohlcv
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
timeframe, candle_type, since_ms)
logger.debug(
"one_call: %s msecs (%s)",
one_call,
dt_humanize(dt_now() - timedelta(milliseconds=one_call), only_distance=True)
)
input_coroutines = [self._async_get_candle_history(
pair, timeframe, candle_type, since) for since in
range(since_ms, until_ms or dt_ts(), one_call)]
data: List = []
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100):
results = await asyncio.gather(*input_coro, return_exceptions=True)
for res in results:
if isinstance(res, Exception):
logger.warning(f"Async code raised an exception: {repr(res)}")
if raise_:
raise
continue
else:
# Deconstruct tuple if it's not an exception
p, _, c, new_data, _ = res
if p == pair and c == candle_type:
data.extend(new_data)
# Sort data again after extending the result - above calls return in "async order"
data = sorted(data, key=lambda x: x[0])
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
async def _async_get_historic_trades(self, pair: str, timeframe: str,
since_ms: int, candle_type: CandleType, since_ms: int, candle_type: CandleType,
is_new_pair: bool = False, raise_: bool = False, is_new_pair: bool = False, raise_: bool = False,
until_ms: Optional[int] = None until_ms: Optional[int] = None
) -> Ticker: ) -> OHLCVResponse:
"""
Download historic ohlcv
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
timeframe, candle_type, since_ms)
logger.debug(
"one_call: %s msecs (%s)",
one_call,
dt_humanize(dt_now() - timedelta(milliseconds=one_call),
only_distance=True)
)
input_coroutines = [self._async_get_candle_history(
pair, timeframe, candle_type, since) for since in
range(since_ms, until_ms or dt_ts(), one_call)]
data: List = []
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100):
results = await asyncio.gather(*input_coro, return_exceptions=True)
for res in results:
if isinstance(res, Exception):
logger.warning(
f"Async code raised an exception: {repr(res)}")
if raise_:
raise
continue
else:
# Deconstruct tuple if it's not an exception
p, _, c, new_data, _ = res
if p == pair and c == candle_type:
data.extend(new_data)
# Sort data again after extending the result - above calls return in "async order"
data = sorted(data, key=lambda x: x[0])
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
async def _async_get_historic_trades(self, pair: str, timeframe: str,
since_ms: int, candle_type: CandleType,
is_new_pair: bool = False, raise_: bool = False,
until_ms: Optional[int] = None
) -> Ticker:
""" """
Download historic trades Download historic trades
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading :param is_new_pair: used by binance subclass to allow "fast" new pair downloading
@@ -2060,41 +2061,41 @@ class Exchange:
# Deconstruct tuple if it's not an exception # Deconstruct tuple if it's not an exception
p, _, c, new_data, _ = res p, _, c, new_data, _ = res
if p == pair and c == candle_type: if p == pair and c == candle_type:
data.extend(new_data) data.extend(new_data)
# Sort data again after extending the result - above calls return in "async order" # Sort data again after extending the result - above calls return in "async order"
data = sorted(data, key=lambda x: x['timestamp'])# TODO: sort via 'timestamp' or 'id'? data = sorted(data, key=lambda x: x['timestamp']) # TODO: sort via 'timestamp' or 'id'?
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
def _build_coroutine_get_ohlcv( def _build_coroutine_get_ohlcv(
self, pair: str, timeframe: str, candle_type: CandleType, self, pair: str, timeframe: str, candle_type: CandleType,
since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]: since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]:
not_all_data = cache and self.required_candle_call_count > 1 not_all_data = cache and self.required_candle_call_count > 1
if cache and (pair, timeframe, candle_type) in self._klines: if cache and (pair, timeframe, candle_type) in self._klines:
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type) candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp() min_date = date_minus_candles(
# Check if 1 call can get us updated candles without hole in the data. timeframe, candle_limit - 5).timestamp()
if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0): # Check if 1 call can get us updated candles without hole in the data.
# Cache can be used - do one-off call. if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0):
not_all_data = False # Cache can be used - do one-off call.
else: not_all_data = False
# Time jump detected, evict cache
logger.info(
f"Time jump detected. Evicting ohlcv cache for {pair}, {timeframe}, {candle_type}")
del self._klines[(pair, timeframe, candle_type)]
if (not since_ms and (self._ft_has["ohlcv_require_since"] or not_all_data)):
# Multiple calls for one pair - to get more history
since_ms = self.needed_candle_ms(timeframe,candle_type)
# TODO: fetch_trades and return as results
if since_ms:
return self._async_get_historic_ohlcv(
pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type)
else: else:
# One call ... "regular" refresh # Time jump detected, evict cache
return self._async_get_candle_history( logger.info(
pair, timeframe, since_ms=since_ms, candle_type=candle_type) f"Time jump detected. Evicting ohlcv cache for {pair}, {timeframe}, {candle_type}")
del self._klines[(pair, timeframe, candle_type)]
if (not since_ms and (self._ft_has["ohlcv_require_since"] or not_all_data)):
# Multiple calls for one pair - to get more history
since_ms = self.needed_candle_ms(timeframe, candle_type)
# TODO: fetch_trades and return as results
if since_ms:
return self._async_get_historic_ohlcv(
pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type)
else:
# One call ... "regular" refresh
return self._async_get_candle_history(
pair, timeframe, since_ms=since_ms, candle_type=candle_type)
def _build_coroutine_get_trades( def _build_coroutine_get_trades(
self, pair: str, timeframe: str, candle_type: CandleType, self, pair: str, timeframe: str, candle_type: CandleType,
@@ -2129,38 +2130,36 @@ class Exchange:
return self._async_get_trades_history( return self._async_get_trades_history(
pair, timeframe, since_ms=since_ms, candle_type=candle_type) pair, timeframe, since_ms=since_ms, candle_type=candle_type)
def _build_ohlcv_dl_jobs( def _build_ohlcv_dl_jobs(
self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int], self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]: cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
""" """
Build Coroutines to execute as part of refresh_latest_ohlcv Build Coroutines to execute as part of refresh_latest_ohlcv
""" """
input_coroutines: List[Coroutine[Any, Any, Ticker]] = [] input_coroutines: List[Coroutine[Any, Any, Ticker]] = []
cached_pairs = [] cached_pairs = []
for pair, timeframe, candle_type in set(pair_list): for pair, timeframe, candle_type in set(pair_list):
if (timeframe not in self.timeframes if (timeframe not in self.timeframes
and candle_type in (CandleType.SPOT, CandleType.FUTURES)): and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
logger.warning( logger.warning(
f"Cannot download ({pair}, {timeframe}) combination as this timeframe is " f"Cannot download ({pair}, {timeframe}) combination as this timeframe is "
f"not available on {self.name}. Available timeframes are " f"not available on {self.name}. Available timeframes are "
f"{', '.join(self.timeframes)}.") f"{', '.join(self.timeframes)}.")
continue continue
if ((pair, timeframe, candle_type) not in self._klines or not cache if ((pair, timeframe, candle_type) not in self._klines or not cache
or self._now_is_time_to_refresh(pair, timeframe, candle_type)): or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
input_coroutines.append( input_coroutines.append(
self._build_coroutine_get_ohlcv(pair, timeframe, candle_type, since_ms, cache)) self._build_coroutine_get_ohlcv(pair, timeframe, candle_type, since_ms, cache))
else: else:
logger.debug( logger.debug(
f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..." f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..."
) )
cached_pairs.append((pair, timeframe, candle_type)) cached_pairs.append((pair, timeframe, candle_type))
return input_coroutines, cached_pairs
return input_coroutines, cached_pairs
def _build_trades_dl_jobs( def _build_trades_dl_jobs(
self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int], self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
@@ -2209,116 +2208,124 @@ class Exchange:
return input_coroutines, cached_pairs return input_coroutines, cached_pairs
def _process_ohlcv_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List], def _process_ohlcv_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
cache: bool, drop_incomplete: bool) -> DataFrame: cache: bool, drop_incomplete: bool) -> DataFrame:
# keeping last candle time as last refreshed time of the pair # keeping last candle time as last refreshed time of the pair
if ticks and cache: if ticks and cache:
self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000 self._pairs_last_refresh_time[(
# keeping parsed dataframe in cache pair, timeframe, c_type)] = ticks[-1][0] // 1000
ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True, # keeping parsed dataframe in cache
drop_incomplete=drop_incomplete) ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
if cache: drop_incomplete=drop_incomplete)
if (pair, timeframe, c_type) in self._klines: if cache:
old = self._klines[(pair, timeframe, c_type)] if (pair, timeframe, c_type) in self._klines:
# Reassign so we return the updated, combined df old = self._klines[(pair, timeframe, c_type)]
ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair, # Reassign so we return the updated, combined df
fill_missing=True, drop_incomplete=False) ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair,
candle_limit = self.ohlcv_candle_limit(timeframe, self._config['candle_type_def']) fill_missing=True, drop_incomplete=False)
# Age out old candles candle_limit = self.ohlcv_candle_limit(
ohlcv_df = ohlcv_df.tail(candle_limit + self._startup_candle_count) timeframe, self._config['candle_type_def'])
ohlcv_df = ohlcv_df.reset_index(drop=True) # Age out old candles
self._klines[(pair, timeframe, c_type)] = ohlcv_df ohlcv_df = ohlcv_df.tail(
else: candle_limit + self._startup_candle_count)
self._klines[(pair, timeframe, c_type)] = ohlcv_df ohlcv_df = ohlcv_df.reset_index(drop=True)
return ohlcv_df self._klines[(pair, timeframe, c_type)] = ohlcv_df
else:
self._klines[(pair, timeframe, c_type)] = ohlcv_df
return ohlcv_df
def _process_trades_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List], def _process_trades_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
cache: bool, drop_incomplete: bool, first_required_candle_date:Optional[int]) -> DataFrame: cache: bool, drop_incomplete: bool, first_required_candle_date: Optional[int]) -> DataFrame:
# keeping parsed dataframe in cache # keeping parsed dataframe in cache
# TODO: pass last_full_candle_date to drop as incomplete # TODO: pass last_full_candle_date to drop as incomplete
trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False, trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False,
drop_incomplete=drop_incomplete) drop_incomplete=drop_incomplete)
# keeping last candle time as last refreshed time of the pair # keeping last candle time as last refreshed time of the pair
if ticks and cache: if ticks and cache:
idx = -2 if drop_incomplete and len(ticks) > 1 else -1 idx = -2 if drop_incomplete and len(ticks) > 1 else -1
self._trades_last_refresh_time[(pair, timeframe, c_type)] = trades_df['timestamp'].iat[idx] // 1000 # NOTE: // is floor: divides and rounds to nearest int self._trades_last_refresh_time[(pair, timeframe, c_type)] = trades_df['timestamp'].iat[idx] // 1000 # NOTE: // is floor: divides and rounds to nearest int
if cache: if cache:
if (pair, timeframe, c_type) in self._trades: if (pair, timeframe, c_type) in self._trades:
old = self._trades[(pair, timeframe, c_type)] old = self._trades[(pair, timeframe, c_type)]
# Reassign so we return the updated, combined df # Reassign so we return the updated, combined df
trades_df = clean_duplicate_trades(concat([old, trades_df], axis=0), timeframe, pair, fill_missing=False, drop_incomplete=False) trades_df = clean_duplicate_trades(concat(
[old, trades_df], axis=0), timeframe, pair, fill_missing=False, drop_incomplete=False)
# warn_of_tick_duplicates(trades_df, pair) # warn_of_tick_duplicates(trades_df, pair)
# Age out old candles # Age out old candles
if first_required_candle_date: if first_required_candle_date:
# slice of older dates # slice of older dates
trades_df = trades_df[first_required_candle_date < trades_df['timestamp']] trades_df = trades_df[first_required_candle_date <
trades_df['timestamp']]
trades_df = trades_df.reset_index(drop=True) trades_df = trades_df.reset_index(drop=True)
self._trades[(pair, timeframe, c_type)] = trades_df self._trades[(pair, timeframe, c_type)] = trades_df
return trades_df return trades_df
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *, def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
since_ms: Optional[int] = None, cache: bool = True, since_ms: Optional[int] = None, cache: bool = True,
drop_incomplete: Optional[bool] = None drop_incomplete: Optional[bool] = None
) -> Dict[PairWithTimeframe, DataFrame]: ) -> Dict[PairWithTimeframe, DataFrame]:
""" """
Refresh in-memory OHLCV asynchronously and set `_klines` with the result Refresh in-memory OHLCV asynchronously and set `_klines` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel). Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
Only used in the dataprovider.refresh() method. Only used in the dataprovider.refresh() method.
:param pair_list: List of 2 element tuples containing pair, interval to refresh :param pair_list: List of 2 element tuples containing pair, interval to refresh
:param since_ms: time since when to download, in milliseconds :param since_ms: time since when to download, in milliseconds
:param cache: Assign result to _trades. Usefull for one-off downloads like for pairlists :param cache: Assign result to _trades. Usefull for one-off downloads like for pairlists
:param drop_incomplete: Control candle dropping. :param drop_incomplete: Control candle dropping.
Specifying None defaults to _ohlcv_partial_candle Specifying None defaults to _ohlcv_partial_candle
:return: Dict of [{(pair, timeframe): Dataframe}] :return: Dict of [{(pair, timeframe): Dataframe}]
""" """
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list)) logger.debug(
"Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
# Gather coroutines to run # Gather coroutines to run
input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache) input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(
pair_list, since_ms, cache)
results_df = {} results_df = {}
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling # Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100): for input_coro in chunks(input_coroutines, 100):
async def gather_stuff(): async def gather_stuff():
return await asyncio.gather(*input_coro, return_exceptions=True) return await asyncio.gather(*input_coro, return_exceptions=True)
with self._loop_lock: with self._loop_lock:
results = self.loop.run_until_complete(gather_stuff()) results = self.loop.run_until_complete(gather_stuff())
for res in results: for res in results:
if isinstance(res, Exception): if isinstance(res, Exception):
logger.warning(f"Async code raised an exception: {repr(res)}") logger.warning(
continue f"Async code raised an exception: {repr(res)}")
# Deconstruct tuple (has 5 elements) continue
pair, timeframe, c_type, ticks, drop_hint = res # Deconstruct tuple (has 5 elements)
drop_incomplete = drop_hint if drop_incomplete is None else drop_incomplete pair, timeframe, c_type, ticks, drop_hint = res
# TODO: here ohlcv candles get saved into self._trades drop_incomplete = drop_hint if drop_incomplete is None else drop_incomplete
ohlcv_df = self._process_ohlcv_df( # TODO: here ohlcv candles get saved into self._trades
pair, timeframe, c_type, ticks, cache, drop_incomplete) ohlcv_df = self._process_ohlcv_df(
pair, timeframe, c_type, ticks, cache, drop_incomplete)
results_df[(pair, timeframe, c_type)] = ohlcv_df results_df[(pair, timeframe, c_type)] = ohlcv_df
# Return cached trades # Return cached trades
for pair, timeframe, c_type in cached_pairs: for pair, timeframe, c_type in cached_pairs:
results_df[(pair, timeframe, c_type)] = self.klines( results_df[(pair, timeframe, c_type)] = self.klines(
(pair, timeframe, c_type), (pair, timeframe, c_type),
copy=False copy=False
) )
return results_df return results_df
def needed_candle_ms(self, timeframe:str, candle_type:CandleType): def needed_candle_ms(self, timeframe: str, candle_type:CandleType):
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
timeframe, candle_type) timeframe, candle_type)
move_to = one_call * self.required_candle_call_count move_to = one_call * self.required_candle_call_count
now = timeframe_to_next_date(timeframe) now = timeframe_to_next_date(timeframe)
return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000) return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
def refresh_latest_trades(self, def refresh_latest_trades(self,
pair_list: ListPairsWithTimeframes , pair_list: ListPairsWithTimeframes,
data_handler: Callable,# using IDataHandler ends with circular import, data_handler: Callable, # using IDataHandler ends with circular import,
*, *,
cache: bool = True, cache: bool = True,
) -> Dict[PairWithTimeframe, DataFrame]: ) -> Dict[PairWithTimeframe, DataFrame]:
""" """
Refresh in-memory TRADES asynchronously and set `_trades` with the result Refresh in-memory TRADES asynchronously and set `_trades` with the result