Merge pull request #10484 from freqtrade/feat/trades_async

Move trades-refresh to async
This commit is contained in:
Matthias
2024-08-13 15:18:26 +02:00
committed by GitHub

View File

@@ -2487,17 +2487,17 @@ class Exchange:
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list)) logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
# Gather coroutines to run # Gather coroutines to run
input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache) ohlcv_dl_jobs, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
results_df = {} results_df = {}
# Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling # Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100): for dl_jobs_batch in chunks(ohlcv_dl_jobs, 100):
async def gather_stuff(coro): async def gather_coroutines(coro):
return await asyncio.gather(*coro, return_exceptions=True) return await asyncio.gather(*coro, return_exceptions=True)
with self._loop_lock: with self._loop_lock:
results = self.loop.run_until_complete(gather_stuff(input_coro)) results = self.loop.run_until_complete(gather_coroutines(dl_jobs_batch))
for res in results: for res in results:
if isinstance(res, Exception): if isinstance(res, Exception):
@@ -2696,6 +2696,94 @@ class Exchange:
self._trades[(pair, timeframe, c_type)] = trades_df self._trades[(pair, timeframe, c_type)] = trades_df
return trades_df return trades_df
async def _build_trades_dl_jobs(
    self, pairwt: PairWithTimeframe, data_handler, cache: bool
) -> Tuple[PairWithTimeframe, Optional[DataFrame]]:
    """
    Build coroutines to refresh trades for (they're then called through async.gather)
    :param pairwt: (pair, timeframe, candle_type) tuple identifying what to refresh
    :param data_handler: handler used to load/store the on-disk "<pair>-cached"
        trades file (presumably an IDataHandler — confirm at the caller)
    :param cache: when True, skip refreshing pairs already in self._trades that
        don't need new data yet
    :return: (pairwt, trades DataFrame) on success; (pairwt, None) when the fetch
        failed, produced no new ticks, or no refresh was needed
    """
    pair, timeframe, candle_type = pairwt
    # since_ms stays None unless the disk cache provides a usable resume point.
    since_ms = None
    new_ticks: List = []
    # Empty fallback frame so the merge below works even without stored ticks.
    all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])
    # Earliest trade timestamp (ms) still required to build the needed candles.
    first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type)
    # refresh, if
    # a. not in _trades
    # b. no cache used
    # c. need new data
    is_in_cache = (pair, timeframe, candle_type) in self._trades
    if (
        not is_in_cache
        or not cache
        or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)
    ):
        logger.debug(f"Refreshing TRADES data for {pair}")
        # fetch trades since latest _trades and
        # store together with existing trades
        try:
            until = None
            from_id = None
            if is_in_cache:
                # Resume id-based pagination from the last in-memory trade.
                from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]["id"]
                until = dt_ts()  # now
            else:
                # No in-memory data: fetch only up to the previous candle
                # boundary (ms) and try to seed from the on-disk cache.
                until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000
                all_stored_ticks_df = data_handler.trades_load(
                    f"{pair}-cached", self.trading_mode
                )

                if not all_stored_ticks_df.empty:
                    if (
                        all_stored_ticks_df.iloc[-1]["timestamp"] > first_candle_ms
                        and all_stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms
                    ):
                        # Use cache and populate further
                        last_cached_ms = all_stored_ticks_df.iloc[-1]["timestamp"]
                        from_id = all_stored_ticks_df.iloc[-1]["id"]
                        # only use cached if it's closer than first_candle_ms
                        since_ms = (
                            last_cached_ms
                            if last_cached_ms > first_candle_ms
                            else first_candle_ms
                        )
                    else:
                        # Skip cache, it's too old
                        all_stored_ticks_df = DataFrame(
                            columns=DEFAULT_TRADES_COLUMNS + ["date"]
                        )

            # from_id overrules with exchange set to id paginate
            [_, new_ticks] = await self._async_get_trade_history(
                pair,
                since=since_ms if since_ms else first_candle_ms,
                until=until,
                from_id=from_id,
            )

        except Exception:
            # Best-effort refresh: log with traceback and report failure for
            # this pair without breaking the surrounding gather().
            logger.exception(f"Refreshing TRADES data for {pair} failed")
            return pairwt, None

        if new_ticks:
            # Merge stored ticks (raw-column order) with the newly fetched ones.
            all_stored_ticks_list = all_stored_ticks_df[DEFAULT_TRADES_COLUMNS].values.tolist()
            all_stored_ticks_list.extend(new_ticks)
            trades_df = self._process_trades_df(
                pair,
                timeframe,
                candle_type,
                all_stored_ticks_list,
                cache,
                first_required_candle_date=first_candle_ms,
            )
            # Persist the merged result so the next cold start can resume.
            data_handler.trades_store(
                f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode
            )
            return pairwt, trades_df
        else:
            logger.error(f"No new ticks for {pair}")
    # Fallthrough: refresh not needed, or needed but yielded no new ticks.
    return pairwt, None
def refresh_latest_trades( def refresh_latest_trades(
self, self,
pair_list: ListPairsWithTimeframes, pair_list: ListPairsWithTimeframes,
@@ -2716,90 +2804,25 @@ class Exchange:
self._config["datadir"], data_format=self._config["dataformat_trades"] self._config["datadir"], data_format=self._config["dataformat_trades"]
) )
logger.debug("Refreshing TRADES data for %d pairs", len(pair_list)) logger.debug("Refreshing TRADES data for %d pairs", len(pair_list))
since_ms = None
results_df = {} results_df = {}
for pair, timeframe, candle_type in set(pair_list): trades_dl_jobs = []
new_ticks: List = [] for pair_wt in set(pair_list):
all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"]) trades_dl_jobs.append(self._build_trades_dl_jobs(pair_wt, data_handler, cache))
first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type)
# refresh, if
# a. not in _trades
# b. no cache used
# c. need new data
is_in_cache = (pair, timeframe, candle_type) in self._trades
if (
not is_in_cache
or not cache
or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)
):
logger.debug(f"Refreshing TRADES data for {pair}")
# fetch trades since latest _trades and
# store together with existing trades
try:
until = None
from_id = None
if is_in_cache:
from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]["id"]
until = dt_ts() # now
else: async def gather_coroutines(coro):
until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000 return await asyncio.gather(*coro, return_exceptions=True)
all_stored_ticks_df = data_handler.trades_load(
f"{pair}-cached", self.trading_mode
)
if not all_stored_ticks_df.empty: for dl_job_chunk in chunks(trades_dl_jobs, 100):
if ( with self._loop_lock:
all_stored_ticks_df.iloc[-1]["timestamp"] > first_candle_ms results = self.loop.run_until_complete(gather_coroutines(dl_job_chunk))
and all_stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms
):
# Use cache and populate further
last_cached_ms = all_stored_ticks_df.iloc[-1]["timestamp"]
from_id = all_stored_ticks_df.iloc[-1]["id"]
# only use cached if it's closer than first_candle_ms
since_ms = (
last_cached_ms
if last_cached_ms > first_candle_ms
else first_candle_ms
)
else:
# Skip cache, it's too old
all_stored_ticks_df = DataFrame(
columns=DEFAULT_TRADES_COLUMNS + ["date"]
)
# from_id overrules with exchange set to id paginate for res in results:
[_, new_ticks] = self.get_historic_trades( if isinstance(res, Exception):
pair, logger.warning(f"Async code raised an exception: {repr(res)}")
since=since_ms if since_ms else first_candle_ms,
until=until,
from_id=from_id,
)
except Exception:
logger.exception(f"Refreshing TRADES data for {pair} failed")
continue continue
pairwt, trades_df = res
if new_ticks: if trades_df is not None:
all_stored_ticks_list = all_stored_ticks_df[ results_df[pairwt] = trades_df
DEFAULT_TRADES_COLUMNS
].values.tolist()
all_stored_ticks_list.extend(new_ticks)
trades_df = self._process_trades_df(
pair,
timeframe,
candle_type,
all_stored_ticks_list,
cache,
first_required_candle_date=first_candle_ms,
)
results_df[(pair, timeframe, candle_type)] = trades_df
data_handler.trades_store(
f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode
)
else:
logger.error(f"No new ticks for {pair}")
return results_df return results_df