diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index b3bccc88f..bde9cc6f6 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -40,7 +40,7 @@ from freqtrade.exchange.exchange_utils import (ROUND, ROUND_DOWN, ROUND_UP, Ccxt timeframe_to_minutes, timeframe_to_msecs, timeframe_to_next_date, timeframe_to_prev_date, timeframe_to_seconds) -from freqtrade.exchange.types import OHLCVResponse, OrderBook, Ticker, Tickers, TRADESResponse +from freqtrade.exchange.types import OHLCVResponse, OrderBook, Ticker, Tickers from freqtrade.misc import (chunks, deep_merge_dicts, file_dump_json, file_load_json, safe_value_fallback2) from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist @@ -2020,49 +2020,14 @@ class Exchange: data = sorted(data, key=lambda x: x[0]) return pair, timeframe, candle_type, data, self._ohlcv_partial_candle - async def _async_get_historic_trades(self, pair: str, timeframe: str, - since_ms: int, candle_type: CandleType, - is_new_pair: bool = False, raise_: bool = False, - until_ms: Optional[int] = None - ) -> Ticker: - """ - Download historic trades - :param is_new_pair: used by binance subclass to allow "fast" new pair downloading - :param candle_type: Any of the enum CandleType (must match trading mode!) - """ + def needed_candle_ms(self, timeframe: str, candle_type:CandleType): + one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( + timeframe, candle_type) + move_to = one_call * self.required_candle_call_count + now = timeframe_to_next_date(timeframe) + return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000) - one_call = timeframe_to_msecs(timeframe) * self.trades_candle_limit( - timeframe, candle_type, since_ms) - logger.debug( - "one_call: %s msecs (%s)", - one_call, - dt_humanize(dt_now() - timedelta(milliseconds=one_call), only_distance=True) - ) - input_coroutines = [self._async_get_trades_history( - pair, timeframe, candle_type, since) for since in - range(since_ms, until_ms or dt_ts(), one_call)] - - data: List = [] - # Chunk requests into batches of 100 to avoid overwelming ccxt Throttling - for input_coro in chunks(input_coroutines, 100): - - results = await asyncio.gather(*input_coro, return_exceptions=True) - for res in results: - if isinstance(res, BaseException): - logger.warning(f"Async code raised an exception: {repr(res)}") - if raise_: - raise - continue - else: - # Deconstruct tuple if it's not an exception - p, _, c, new_data, _ = res - if p == pair and c == candle_type: - data.extend(new_data) - # Sort data again after extending the result - above calls return in "async order" - data = sorted(data, key=lambda x: x['timestamp']) # TODO: sort via 'timestamp' or 'id'? - return pair, timeframe, candle_type, data, self._ohlcv_partial_candle - - def _build_coroutine_get_ohlcv( + def _build_coroutine( self, pair: str, timeframe: str, candle_type: CandleType, since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]: not_all_data = cache and self.required_candle_call_count > 1 @@ -2076,7 +2041,7 @@ class Exchange: else: # Time jump detected, evict cache logger.info( - f"Time jump detected. Evicting ohlcv cache for {pair}, {timeframe}, {candle_type}") + f"Time jump detected. Evicting cache for {pair}, {timeframe}, {candle_type}") del self._klines[(pair, timeframe, candle_type)] if (not since_ms and (self._ft_has["ohlcv_require_since"] or not_all_data)): @@ -2091,39 +2056,6 @@ class Exchange: return self._async_get_candle_history( pair, timeframe, since_ms=since_ms, candle_type=candle_type) - def _build_coroutine_get_trades( - self, pair: str, timeframe: str, candle_type: CandleType, - since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]: - not_all_data = cache and self.required_candle_call_count > 1 - if cache and (pair, timeframe, candle_type) in self._trades: - candle_limit = self.trades_candle_limit(timeframe, candle_type) - min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp() - # Check if 1 call can get us updated candles without hole in the data. - if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0): - # Cache can be used - do one-off call. - not_all_data = False - else: - # Time jump detected, evict cache - logger.info( - f"Time jump detected. Evicting trades cache for {pair}, {timeframe}, {candle_type}") - del self._trades[(pair, timeframe, candle_type)] - - if (not since_ms or not_all_data): - # Multiple calls for one pair - to get more history - one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( - timeframe, candle_type, since_ms) - move_to = one_call * self.required_candle_call_count - now = timeframe_to_next_date(timeframe) - since_ms = int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000) - - if since_ms: - return self._async_get_historic_trades( - pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type) - else: - # One call ... "regular" refresh - return self._async_get_trades_history( - pair, timeframe, since_ms=since_ms, candle_type=candle_type) - def _build_ohlcv_dl_jobs( self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int], cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]: @@ -2145,7 +2077,7 @@ class Exchange: or self._now_is_time_to_refresh(pair, timeframe, candle_type)): input_coroutines.append( - self._build_coroutine_get_ohlcv(pair, timeframe, candle_type, since_ms, cache)) + self._build_coroutine(pair, timeframe, candle_type, since_ms, cache)) else: logger.debug( @@ -2155,53 +2087,6 @@ class Exchange: return input_coroutines, cached_pairs - def _build_trades_dl_jobs( - self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int], - cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]: - """ - Build Coroutines to execute as part of refresh_latest_trades - """ - input_coroutines: List[Coroutine[Any, Any, TRADESResponse]] = [] - cached_pairs = [] - for pair, timeframe, candle_type in set(pair_list): - if not since_ms: - plr = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0) - # If we don't have a last refresh time, we need to download all trades - # This is the case when the bot is started - if not plr: - # using ohlcv_candle_limit here, because we calculate the distance - # to first required candle - one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( - timeframe, candle_type, since_ms) - target_candle = one_call * self.required_candle_call_count - now = timeframe_to_next_date(timeframe) - since_ms = int((now - timedelta(seconds=target_candle // 1000)).timestamp() - * 1000) - - else: since_ms = plr - - if (timeframe not in self.timeframes - and candle_type in (CandleType.SPOT, CandleType.FUTURES)): - logger.warning( - f"Cannot download ({pair}, {timeframe}) combination as this timeframe is " - f"not available on {self.name}. Available timeframes are " - f"{', '.join(self.timeframes)}.") - continue - - if ((pair, timeframe, candle_type) not in self._trades or not cache - or self._now_is_time_to_refresh(pair, timeframe, candle_type)): - - input_coroutines.append( - self._build_coroutine_get_trades(pair, timeframe, candle_type, since_ms, cache)) - - else: - logger.debug( - f"Using cached candle (TRADES) data for {pair}, {timeframe}, {candle_type} ..." - ) - cached_pairs.append((pair, timeframe, candle_type)) - - return input_coroutines, cached_pairs - def _process_ohlcv_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List], cache: bool, drop_incomplete: bool) -> DataFrame: # keeping last candle time as last refreshed time of the pair @@ -2301,13 +2186,6 @@ class Exchange: return results_df - def needed_candle_ms(self, timeframe: str, candle_type:CandleType): - one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( - timeframe, candle_type) - move_to = one_call * self.required_candle_call_count - now = timeframe_to_next_date(timeframe) - return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000) - def refresh_latest_trades(self, pair_list: ListPairsWithTimeframes, data_handler: Callable, # using IDataHandler ends with circular import, @@ -2484,71 +2362,6 @@ class Exchange: f'for pair {pair}. Message: {e}') from e - @retrier_async - async def _async_get_trades_history( - self, - pair: str, - timeframe: str, - candle_type: CandleType, - since_ms: Optional[int] = None, - ) -> Ticker: - """ - Asynchronously get candle history data using fetch_trades - :param candle_type: '', mark, index, premiumIndex, or funding_rate - returns tuple: (pair, timeframe, trades_list) - """ - try: - # Fetch TRADES asynchronously - logger.debug( - "Fetching pair %s, %s, interval %s, since %s ...", - pair, candle_type, timeframe, since_ms - ) - params = deepcopy(self._ft_has.get('trades_params', {})) - candle_limit = self.trades_candle_limit( - timeframe, candle_type=candle_type, since_ms=since_ms) - - if candle_type and candle_type != CandleType.SPOT: - params.update({'price': candle_type.value}) - if candle_type != CandleType.FUNDING_RATE: - assert since_ms is not None # NOTE: with none there seems no response - data = await self._api_async.fetch_trades( - pair, since=since_ms, - limit=candle_limit, params=params) - else: - # Funding rate - data = await self._fetch_funding_rate_history( - pair=pair, - timeframe=timeframe, - limit=candle_limit, - since_ms=since_ms, - ) - # Some exchanges sort TRADES in ASC order and others in DESC. - # Ex: Bittrex returns the list of TRADES in ASC order (oldest first, newest last) - # while GDAX returns the list of TRADES in DESC order (newest first, oldest last) - # Only sort if necessary to save computing time - try: - # TODO: check if even needed? - if data and data[0]['timestamp'] > data[-1]['timestamp']: - data = sorted(data, key=lambda x: x[0]) - except KeyError: - logger.exception("Error loading %s. Result was %s.", pair, data) - return pair, timeframe, candle_type, [], True - logger.debug("Done fetching pair %s, interval %s ...", pair, timeframe) - return pair, timeframe, candle_type, data, True - - except ccxt.NotSupported as e: - raise OperationalException( - f'Exchange {self._api.name} does not support fetching historical ' - f'candle (TRADES) data. Message: {e}') from e - except ccxt.DDoSProtection as e: - raise DDosProtection(e) from e - except (ccxt.NetworkError, ccxt.ExchangeError) as e: - raise TemporaryError(f'Could not fetch historical candle (TRADES) data ' - f'for pair {pair} due to {e.__class__.__name__}. ' - f'Message: {e}') from e - except ccxt.BaseError as e: - raise OperationalException(f'Could not fetch historical candle (TRADES) data ' - f'for pair {pair}. Message: {e}') from e async def _fetch_funding_rate_history( self, diff --git a/freqtrade/exchange/types.py b/freqtrade/exchange/types.py index 52c1e6ae7..5568e4336 100644 --- a/freqtrade/exchange/types.py +++ b/freqtrade/exchange/types.py @@ -28,4 +28,3 @@ Tickers = Dict[str, Ticker] # pair, timeframe, candleType, OHLCV, drop last?, OHLCVResponse = Tuple[str, str, CandleType, List, bool] -TRADESResponse = Tuple[str, str, CandleType, List, bool]