diff --git a/freqtrade/constants.py b/freqtrade/constants.py
index c864833c3..3bf0179a8 100644
--- a/freqtrade/constants.py
+++ b/freqtrade/constants.py
@@ -3,7 +3,7 @@
 """
 bot constants
 """
-from typing import Any, Dict, List, Literal, Tuple
+from typing import Any, Dict, List, Literal, Optional, Tuple

 from freqtrade.enums import CandleType, PriceType, RPCMessageType
@@ -588,7 +588,7 @@ CONF_SCHEMA = {
                     "properties": {
                         "shuffle": {"type": "boolean", "default": False},
                         "nu": {"type": "number", "default": 0.1}
-                    },
+                    },
                 },
                 "shuffle_after_split": {"type": "boolean", "default": False},
                 "buffer_train_data_candles": {"type": "integer", "default": 0}
@@ -704,6 +704,9 @@ ListPairsWithTimeframes = List[PairWithTimeframe]
 # Type for trades list
 TradeList = List[List]
+# pair, timeframe, CandleType, since (ms), until (ms)
+TickWithTimeframe = Tuple[str, str, CandleType, Optional[int], Optional[int]]
+ListTicksWithTimeframes = List[TickWithTimeframe]

 LongShort = Literal['long', 'short']
 EntryExit = Literal['entry', 'exit']
diff --git a/freqtrade/data/converter/converter.py b/freqtrade/data/converter/converter.py
index 8d1401e88..403ba9b52 100644
--- a/freqtrade/data/converter/converter.py
+++ b/freqtrade/data/converter/converter.py
@@ -28,7 +28,8 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
     :param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
     :return: DataFrame
     """
-    logger.debug(f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
+    logger.debug(
+        f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
     cols = DEFAULT_DATAFRAME_COLUMNS
     df = DataFrame(ohlcv, columns=cols)

@@ -44,6 +45,331 @@
                                  drop_incomplete=drop_incomplete)


+def _init_dataframe_with_trades_columns(dataframe: DataFrame):
+    """
+    Populates a dataframe with trades columns
+    :param dataframe: Dataframe to populate
+    """
+    dataframe['trades'] = dataframe.apply(lambda _: [], axis=1)
+    dataframe['orderflow'] = dataframe.apply(lambda _: {}, axis=1)
+    dataframe['bid'] = np.nan
+    dataframe['ask'] = np.nan
+    dataframe['delta'] = np.nan
+    dataframe['min_delta'] = np.nan
+    dataframe['max_delta'] = np.nan
+    dataframe['total_trades'] = np.nan
+    dataframe['stacked_imbalances_bid'] = np.nan
+    dataframe['stacked_imbalances_ask'] = np.nan
+
+
+def _convert_timeframe_to_pandas_frequency(timeframe: str) -> Tuple[str, int]:
+    # convert timeframe to a format usable by pandas
+    from freqtrade.exchange import timeframe_to_minutes
+    timeframe_minutes = timeframe_to_minutes(timeframe)
+    timeframe_frequency = f'{timeframe_minutes}min'
+    return (timeframe_frequency, timeframe_minutes)
+
+
+def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str):
+    timeframe_frequency, timeframe_minutes = _convert_timeframe_to_pandas_frequency(
+        timeframe)
+    # calculate ohlcv candle start and end
+    df['datetime'] = pd.to_datetime(df['date'], unit='ms')
+    df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency)
+    # NOTE: a bare integer would be interpreted as nanoseconds by pd.Timedelta,
+    # so the unit must be given explicitly
+    df['candle_end'] = df['candle_start'] + pd.Timedelta(minutes=timeframe_minutes)
+    df.drop(columns=['datetime'], inplace=True)
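
The flooring above is what maps every trade onto its containing candle. A minimal standalone sketch of the same trick, assuming a 5m timeframe and synthetic timestamps:

    import pandas as pd

    # trades 60s, 240s and 420s past a 5m candle boundary
    ts = pd.Series(pd.to_datetime([1700000460000, 1700000640000, 1700000820000], unit='ms'))
    candle_start = ts.dt.floor('5min')
    candle_end = candle_start + pd.Timedelta(minutes=5)
    # the first two trades share one candle_start; the third falls into the next candle
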
+
+
+cached_grouped_trades_pair = {}
+
+
+def populate_dataframe_with_trades(config: Config, dataframe: DataFrame,
+                                   trades: DataFrame, *, pair: str) -> DataFrame:
+    """
+    Populates a dataframe with trades
+    :param config: Config object
+    :param dataframe: Dataframe to populate
+    :param trades: Trades to populate with
+    :param pair: Pair the data is for
+    :return: Dataframe with trades populated
+    """
+    config_orderflow = config['orderflow']
+    timeframe = config['timeframe']
+
+    # create columns for trades
+    _init_dataframe_with_trades_columns(dataframe)
+    df = dataframe.copy()
+
+    try:
+        start_time = time.time()
+        # calculate ohlcv candle start and end
+        _calculate_ohlcv_candle_start_and_end(df, timeframe)
+        _calculate_ohlcv_candle_start_and_end(trades, timeframe)
+
+        # slice off trades that are before the current ohlcv candles to make groupby faster
+        trades = trades.loc[trades.candle_start >= df.candle_start[0]]
+        trades.reset_index(inplace=True, drop=True)
+
+        # group trades by candle start
+        trades_grouped_by_candle_start = trades.groupby('candle_start', group_keys=False)
+
+        new_grouped_trades_dict = {key: group for key,
+                                   group in trades_grouped_by_candle_start}
+        logger.debug(f"{len(new_grouped_trades_dict.keys())} candles to process")
+
+        cached_grouped_trades_dict = cached_grouped_trades_pair.get((pair, timeframe), {})
+
+        new_keys = set(new_grouped_trades_dict.keys())
+        # don't process candles twice
+        for candle_start in cached_grouped_trades_dict:
+            # TODO: don't delete the last candle,
+            # to allow a refresh in case of wrong data
+            if candle_start in new_grouped_trades_dict:
+                del new_grouped_trades_dict[candle_start]
+
+        cached_keys = set(cached_grouped_trades_dict.keys())
+        old_keys = cached_keys - new_keys
+        # evict cached candles that are no longer part of the dataframe
+        for key in old_keys:
+            if key in cached_grouped_trades_dict and not (key == df['date']).any():
+                del cached_grouped_trades_dict[key]
+
+        for candle_start in cached_grouped_trades_dict:
+            is_between = (candle_start == df['candle_start'])
+            for column in list(dataframe.columns):
+                # special case 'trades' and 'orderflow':
+                # they hold objects, not a single scalar value
+                if column in ['trades', 'orderflow']:
+                    dataframe.loc[is_between, column] = dataframe.loc[is_between, column].apply(
+                        lambda _: cached_grouped_trades_dict[candle_start][column].values[0])
+                else:
+                    dataframe.loc[is_between, column] = \
+                        cached_grouped_trades_dict[candle_start][column].values[0]
+
+        # repair 'date' datetime type (otherwise crashes on each compare)
+        if "date" in dataframe.columns:
+            dataframe['date'] = pd.to_datetime(dataframe['date'])
+
+        for candle_start in new_grouped_trades_dict.keys():
+            trades_grouped_df = new_grouped_trades_dict[candle_start]
+            is_between = (candle_start == df['candle_start'])
+            if is_between.any():
+                timeframe_frequency, timeframe_minutes = \
+                    _convert_timeframe_to_pandas_frequency(timeframe)
+                candle_next = candle_start + pd.Timedelta(minutes=timeframe_minutes)
+                # skip if there are no trades at the next candle,
+                # because this candle isn't finished yet
+                if candle_next not in trades_grouped_by_candle_start.groups:
+                    logger.debug(f"Skipping candle at {candle_start} with "
+                                 f"{len(trades_grouped_df)} trades, because there are "
+                                 f"no finished trades at {candle_next}")
+                    continue
+
+                # add trades to each candle
+                df.loc[is_between, 'trades'] = df.loc[is_between, 'trades'].apply(
+                    lambda _: trades_grouped_df)
+                # calculate orderflow for each candle
+                df.loc[is_between, 'orderflow'] = df.loc[is_between, 'orderflow'].apply(
+                    lambda _: trades_to_volumeprofile_with_total_delta_bid_ask(
+                        pd.DataFrame(trades_grouped_df), scale=config_orderflow['scale']))
+                # calculate imbalances for each candle's orderflow
+                df.loc[is_between, 'imbalances'] = df.loc[is_between, 'orderflow'].apply(
+                    lambda x: trades_orderflow_to_imbalances(
+                        x, imbalance_ratio=config_orderflow['imbalance_ratio'],
+                        imbalance_volume=config_orderflow['imbalance_volume']))
+
+                df.loc[is_between, 'stacked_imbalances_bid'] = \
+                    df.loc[is_between, 'imbalances'].apply(lambda x: stacked_imbalance_bid(
+                        x, stacked_imbalance_range=config_orderflow['stacked_imbalance_range']))
+                df.loc[is_between, 'stacked_imbalances_ask'] = \
+                    df.loc[is_between, 'imbalances'].apply(lambda x: stacked_imbalance_ask(
+                        x, stacked_imbalance_range=config_orderflow['stacked_imbalance_range']))
+
+                # NOTE: naming is from the taker's perspective: sells hit the bid and
+                # buys lift the ask, so 'buy' below collects sell-side amounts
+                buy = df.loc[is_between, 'bid'].apply(lambda _: np.where(
+                    trades_grouped_df['side'].str.contains('buy'),
+                    0, trades_grouped_df['amount']))
+                sell = df.loc[is_between, 'ask'].apply(lambda _: np.where(
+                    trades_grouped_df['side'].str.contains('sell'),
+                    0, trades_grouped_df['amount']))
+                deltas_per_trade = sell - buy
+                min_delta = 0
+                max_delta = 0
+                delta = 0
+                # running min/max of the cumulative delta within the candle
+                for deltas in deltas_per_trade:
+                    for d in deltas:
+                        delta += d
+                        if delta > max_delta:
+                            max_delta = delta
+                        if delta < min_delta:
+                            min_delta = delta
+                df.loc[is_between, 'max_delta'] = max_delta
+                df.loc[is_between, 'min_delta'] = min_delta
+
+                df.loc[is_between, 'bid'] = np.where(
+                    trades_grouped_df['side'].str.contains('buy'),
+                    0, trades_grouped_df['amount']).sum()
+                df.loc[is_between, 'ask'] = np.where(
+                    trades_grouped_df['side'].str.contains('sell'),
+                    0, trades_grouped_df['amount']).sum()
+                df.loc[is_between, 'delta'] = \
+                    df.loc[is_between, 'ask'] - df.loc[is_between, 'bid']
+
+                df.loc[is_between, 'total_trades'] = len(trades_grouped_df)
+                # copy() to avoid memory leaks through views into df
+                cached_grouped_trades_dict[candle_start] = df.loc[is_between].copy()
+                dataframe.loc[is_between] = df.loc[is_between]
+            else:
+                logger.debug(f"Found NO candles for trades starting with {candle_start}")
+            logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds")
+
+        logger.debug(f"trades.singleton_iterate in {time.time() - start_time} seconds")
+        cached_grouped_trades_pair[(pair, timeframe)] = cached_grouped_trades_dict
+
+    except Exception as e:
+        logger.error(f"Error populating dataframe with trades: {e}")
+
+    return dataframe
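
For orientation, a hypothetical invocation (pair name illustrative), given a populated config and a 5m OHLCV dataframe plus a raw trades dataframe:

    df = populate_dataframe_with_trades(config, ohlcv_df, trades_df, pair='BTC/USDT')
    # each candle row now carries its trades list, an 'orderflow' profile
    # and the delta / min_delta / max_delta statistics
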
+
+
+# TODO: remove timeframe and pair from the signature
+def public_trades_to_dataframe(trades: list, timeframe: str, pair: str, *,
+                               fill_missing: bool = True,
+                               drop_incomplete: bool = True) -> DataFrame:
+    """
+    Converts a list of trades (in the format returned by ccxt.fetch_trades)
+    to a DataFrame
+    :param trades: list of trades
+    :param timeframe: timeframe (e.g. 5m). Used to fill up eventual missing data
+    :param pair: Pair this data is for (used to warn if fillup was necessary)
+    :param fill_missing: fill up missing candles with 0 candles
+                         (see trades_fill_up_missing_data for details)
+    :param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
+    :return: DataFrame
+    """
+    logger.debug(f"Converting trades data to dataframe for pair {pair}.")
+    cols = DEFAULT_TRADES_COLUMNS
+    df = DataFrame(trades, columns=cols)
+    df['date'] = pd.to_datetime(
+        df['timestamp'], unit='ms', utc=True, infer_datetime_format=True)
+
+    # Some exchanges return int values for amount / cost / price.
+    # Convert them since TA-LIB indicators used in the strategy assume floats
+    # and fail with exception...
+    df = df.astype(dtype={'amount': 'float', 'cost': 'float', 'price': 'float'})
+    # TODO: fill_missing and drop_incomplete are currently unused:
+    # df = clean_duplicate_trades(df, timeframe, pair,
+    #                             fill_missing=fill_missing,
+    #                             drop_incomplete=drop_incomplete)
+    # df = drop_incomplete_and_fill_missing_trades(df, timeframe, pair,
+    #                                              fill_missing=fill_missing,
+    #                                              drop_incomplete=drop_incomplete)
+    return df
+
+
+def trades_to_volumeprofile_with_total_delta_bid_ask(trades: DataFrame,
+                                                     scale: float) -> DataFrame:
+    """
+    :param trades: dataframe
+    :param scale: scale aka bin size, e.g. 0.5
+    :return: trades binned to price levels according to scale, aka orderflow
+    """
+    df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS)
+    # create bid, ask where side is sell or buy
+    # (sells hit the bid, buys lift the ask)
+    df['bid_amount'] = np.where(trades['side'].str.contains('buy'), 0, trades['amount'])
+    df['ask_amount'] = np.where(trades['side'].str.contains('sell'), 0, trades['amount'])
+    df['bid'] = np.where(trades['side'].str.contains('buy'), 0, 1)
+    df['ask'] = np.where(trades['side'].str.contains('sell'), 0, 1)
+
+    # round the prices to the nearest multiple of the scale
+    df['price'] = ((trades['price'] / scale).round() * scale).astype('float64').values
+    if df.empty:
+        df['total'] = np.nan
+        df['delta'] = np.nan
+        return df
+
+    df['delta'] = df['ask_amount'] - df['bid_amount']
+    df['total_volume'] = df['ask_amount'] + df['bid_amount']
+    df['total_trades'] = df['ask'] + df['bid']
+
+    # group into bins, i.e. apply the scale
+    df = df.groupby('price').sum(numeric_only=True)
+    return df
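
To make the binning concrete, a small sketch (synthetic trades, scale=0.5 assumed) of what the function above produces:

    import pandas as pd

    trades = pd.DataFrame({'price': [100.1, 100.2, 100.6],
                           'amount': [1.0, 2.0, 3.0],
                           'side': ['buy', 'sell', 'buy']})
    vp = trades_to_volumeprofile_with_total_delta_bid_ask(trades, scale=0.5)
    # 100.1 and 100.2 bin to the 100.0 level (delta = 1.0 - 2.0 = -1.0),
    # 100.6 bins to 100.5 (delta = +3.0); the index is the price level
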
+
+
+def trades_orderflow_to_imbalances(df: DataFrame, imbalance_ratio: int,
+                                   imbalance_volume: int) -> DataFrame:
+    """
+    :param df: dataframe with bid and ask
+    :param imbalance_ratio: imbalance ratio in percent, e.g. 300
+    :param imbalance_volume: imbalance volume, e.g. 3
+    :return: dataframe with bid and ask imbalance
+    """
+    bid = df.bid
+    # compare bid to the ask one price level above (diagonal comparison)
+    ask = df.ask.shift(-1)
+    bid_imbalance = (bid / ask) > (imbalance_ratio / 100)
+    # overwrite bid_imbalance with False if volume is not big enough
+    bid_imbalance_filtered = np.where(df.total_volume < imbalance_volume,
+                                      False, bid_imbalance)
+    ask_imbalance = (ask / bid) > (imbalance_ratio / 100)
+    # overwrite ask_imbalance with False if volume is not big enough
+    ask_imbalance_filtered = np.where(df.total_volume < imbalance_volume,
+                                      False, ask_imbalance)
+    dataframe = DataFrame({'bid_imbalance': bid_imbalance_filtered,
+                           'ask_imbalance': ask_imbalance_filtered},
+                          index=df.index)
+
+    return dataframe
+
+
+def stacked_imbalance(df: DataFrame, label: str = "bid",
+                      stacked_imbalance_range: int = 3, should_reverse: bool = False):
+    """
+    Find the price level of the first (or last, if should_reverse) run of at least
+    stacked_imbalance_range consecutive imbalances, using
+    y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
+    https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array
+    """
+    imbalance = df[f'{label}_imbalance']
+    int_series = pd.Series(np.where(imbalance, 1, 0))
+    stacked = int_series * (
+        int_series.groupby((int_series != int_series.shift()).cumsum()).cumcount() + 1)
+
+    max_stacked_imbalance_idx = stacked.index[stacked >= stacked_imbalance_range]
+    stacked_imbalance_price = np.nan
+    if not max_stacked_imbalance_idx.empty:
+        # TODO: do better than just taking the first match
+        idx = max_stacked_imbalance_idx[0] if not should_reverse else np.flipud(
+            max_stacked_imbalance_idx)[0]
+        stacked_imbalance_price = imbalance.index[idx]
+    return stacked_imbalance_price
+
+
+def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int = 3):
+    return stacked_imbalance(df, 'bid', stacked_imbalance_range)
+
+
+def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int = 3):
+    return stacked_imbalance(df, 'ask', stacked_imbalance_range, should_reverse=True)
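
The grouped-cumcount one-liner in the docstring counts consecutive True runs; a tiny illustration of the idiom on its own:

    import numpy as np
    import pandas as pd

    imbalance = pd.Series([True, True, True, False, True])
    y = pd.Series(np.where(imbalance, 1, 0))
    stacked = y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
    # stacked -> [1, 2, 3, 0, 1]; index 2 is the first position where a run
    # of at least 3 consecutive imbalances is complete
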
+
+
+def orderflow_to_volume_profile(orderflow: DataFrame) -> DataFrame:
+    """
+    :param orderflow: dataframe
+    :return: volume profile dataframe
+    """
+    df = orderflow
+    bid = df.groupby('level').bid.sum()
+    ask = df.groupby('level').ask.sum()
+    delta = ask - bid
+    df = pd.DataFrame({'bid': bid, 'ask': ask, 'delta': delta})
+    return df
+
+
 def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
                           fill_missing: bool, drop_incomplete: bool) -> DataFrame:
     """
@@ -78,6 +404,70 @@ def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
     return data


+def warn_of_tick_duplicates(data: DataFrame, pair: str) -> None:
+    no_dupes_columns = ['id', 'timestamp', 'datetime']
+    for col in no_dupes_columns:
+        if col in data.columns and data[col].duplicated().any():
+            dups = data[col].duplicated().sum()
+            message = f'{dups} duplicated ticks for {pair} in {col} detected.'
+            if col == 'id':
+                logger.warning(message)
+            else:
+                logger.debug(message)
+
+
+def clean_duplicate_trades(trades: DataFrame, timeframe: str, pair: str, *,
+                           fill_missing: bool, drop_incomplete: bool) -> DataFrame:
+    """
+    Cleanse a TRADES dataframe by removing duplicate ticks.
+    fill_missing and drop_incomplete are accepted for interface symmetry
+    but are currently unused here.
+    :param trades: DataFrame containing trades data.
+    :param timeframe: timeframe (e.g. 5m). Used to fill up eventual missing data
+    :param pair: Pair this data is for (used to warn if fillup was necessary)
+    :param fill_missing: fill up missing candles with 0 candles
+                         (see trades_fill_up_missing_data for details)
+    :param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
+    :return: DataFrame
+    """
+    logger.debug(f"Clean duplicated ticks from Trades data {pair}")
+    df = pd.DataFrame(trades_remove_duplicates(trades.values.tolist()),
+                      columns=trades.columns)
+
+    # alternative: group by date and aggregate to eliminate duplicate ticks
+    # from freqtrade.exchange import timeframe_to_minutes
+    # timeframe_minutes = timeframe_to_minutes(timeframe)
+    # sum_dict = {'amount': 'sum'}
+    # df = data.groupby(
+    #     by='date', as_index=False, sort=True).agg(sum_dict)
+    # NOTE: sum doesn't make much sense for eliminating duplicates?
+
+    return df
+
+
+def drop_incomplete_and_fill_missing_trades(data: DataFrame, timeframe: str, pair: str, *,
+                                            fill_missing: bool,
+                                            drop_incomplete: bool) -> DataFrame:
+    # eliminate partial candle
+    if drop_incomplete:
+        # TODO: this is not correct, as it drops only the last trade,
+        # while all trades of the last (unfinished) candle should be dropped
+        data.drop(data.tail(1).index, inplace=True)
+        logger.debug('Dropping last trade')
+
+    if fill_missing:
+        return trades_fill_up_missing_data(data, timeframe, pair)
+    else:
+        return data
+
+
 def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) -> DataFrame:
     """
     Fills up missing data with 0 volume rows,
@@ -114,7 +504,8 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str)
     df.reset_index(inplace=True)
     len_before = len(dataframe)
     len_after = len(df)
-    pct_missing = (len_after - len_before) / len_before if len_before > 0 else 0
+    pct_missing = (len_after - len_before) / \
+        len_before if len_before > 0 else 0
     if len_before != len_after:
         message = (f"Missing data fillup for {pair}: before: {len_before} - after: {len_after}"
                    f" - {pct_missing:.2%}")
@@ -159,7 +550,8 @@ def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange,

     processed: Dict[str, DataFrame] = {}
     for pair, df in preprocessed.items():
-        trimed_df = trim_dataframe(df, timerange, startup_candles=startup_candles)
+        trimed_df = trim_dataframe(
+            df, timerange, startup_candles=startup_candles)
         if not trimed_df.empty:
             processed[pair] = trimed_df
         else:
diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py
index 11cbd7934..a392fb510 100644
--- a/freqtrade/data/dataprovider.py
+++ b/freqtrade/data/dataprovider.py
@@ -12,8 +12,9 @@ from typing import Any, Dict, List, Optional, Tuple
 from pandas import DataFrame, Timedelta, Timestamp, to_timedelta

 from freqtrade.configuration import TimeRange
 from freqtrade.constants import (FULL_DATAFRAME_THRESHOLD, Config, ListPairsWithTimeframes,
-                                 PairWithTimeframe)
+                                 ListTicksWithTimeframes, PairWithTimeframe)
+from freqtrade.data.converter import public_trades_to_dataframe
 from freqtrade.data.history import load_pair_history
+from freqtrade.data.history.idatahandler import get_datahandler
 from freqtrade.enums import CandleType, RPCMessageType, RunMode
 from freqtrade.exceptions import ExchangeError, OperationalException
@@ -442,7 +444,12 @@ class DataProvider:
         if self._exchange is None:
             raise OperationalException(NO_EXCHANGE_EXCEPTION)
         final_pairs = (pairlist + helping_pairs) if helping_pairs else pairlist
+        # refresh latest ohlcv data
         self._exchange.refresh_latest_ohlcv(final_pairs)
+        # refresh latest trades data (if enabled)
+        self._exchange.refresh_latest_trades(
+            final_pairs,
+            get_datahandler(self._config['datadir'],
+                            data_format=self._config['dataformat_trades']))

     @property
     def available_pairs(self) -> ListPairsWithTimeframes:
@@ -482,6 +489,44 @@ class DataProvider:
         else:
             return DataFrame()

+    def trades(
+        self,
+        pair: str,
+        timeframe: Optional[str] = None,
+        copy: bool = True,
+        candle_type: str = ''
+    ) -> DataFrame:
+        """
+        Get public trades data for the given pair as DataFrame.
+        Please use the `available_pairs` method to verify which pairs are currently cached.
+        :param pair: pair to get the data for
+        :param timeframe: Timeframe to get data for
+        :param candle_type: '', mark, index, premiumIndex, or funding_rate
+        :param copy: copy dataframe before returning if True.
+                     Use False only for read-only operations (where the dataframe is not modified)
+        """
+        if self._exchange is None:
+            raise OperationalException(NO_EXCHANGE_EXCEPTION)
+        _candle_type = CandleType.from_string(
+            candle_type) if candle_type != '' else self._config['candle_type_def']
+        if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
+            return self._exchange.trades(
+                (pair, timeframe or self._config['timeframe'], _candle_type),
+                copy=copy
+            )
+        elif self.runmode in (RunMode.BACKTEST, RunMode.HYPEROPT):
+            data_handler = get_datahandler(
+                self._config['datadir'], data_format=self._config['dataformat_trades'])
+            ticks = data_handler.trades_load(pair)
+            trades_df = public_trades_to_dataframe(
+                ticks, timeframe or self._config['timeframe'], pair=pair,
+                fill_missing=False, drop_incomplete=False)
+            return trades_df
+        else:
+            return DataFrame()
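
In a strategy context the accessor above is reached through the DataProvider; a hypothetical read-only call (pair name illustrative):

    # inside a strategy callback; self.dp is the DataProvider
    trades = self.dp.trades(pair='BTC/USDT', copy=False)
    if not trades.empty:
        last_trade_price = trades['price'].iat[-1]
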

     def market(self, pair: str) -> Optional[Dict[str, Any]]:
         """
         Return market data for the pair
diff --git a/freqtrade/data/history/history_utils.py b/freqtrade/data/history/history_utils.py
index 1ad1060a4..4222b4038 100644
--- a/freqtrade/data/history/history_utils.py
+++ b/freqtrade/data/history/history_utils.py
@@ -329,7 +329,9 @@ def _download_trades_history(exchange: Exchange,
                              pair: str, *,
                              new_pairs_days: int = 30,
                              timerange: Optional[TimeRange] = None,
-                             data_handler: IDataHandler
+                             data_handler: IDataHandler,
+                             since: Optional[int] = None,
+                             until: Optional[int] = None
                              ) -> bool:
     """
     Download trade history from the exchange.
@@ -395,7 +397,7 @@ def _download_trades_history(exchange: Exchange,

     except Exception:
         logger.exception(
-            f'Failed to download historic trades for pair: "{pair}". '
+            f'Failed to download and store historic trades for pair: "{pair}". '
         )
         return False

@@ -506,8 +508,6 @@ def download_data_main(config: Config) -> None:
     # Start downloading
     try:
         if config.get('download_trades'):
-            if config.get('trading_mode') == 'futures':
-                raise OperationalException("Trade download not supported for futures.")
             pairs_not_available = refresh_backtest_trades_data(
                 exchange, pairs=expanded_pairs, datadir=config['datadir'],
                 timerange=timerange, new_pairs_days=config['new_pairs_days'],
diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py
index ae8cdaa98..d0b0c9602 100644
--- a/freqtrade/exchange/exchange.py
+++ b/freqtrade/exchange/exchange.py
@@ -10,7 +10,7 @@ from copy import deepcopy
 from datetime import datetime, timedelta, timezone
 from math import floor
 from threading import Lock
-from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple, Union
+from typing import Any, Callable, Coroutine, Dict, List, Literal, Optional, Tuple, Union

 import ccxt
 import ccxt.async_support as ccxt_async
@@ -19,7 +19,7 @@ from ccxt import TICK_SIZE
 from dateutil import parser
 from pandas import DataFrame, concat

-from freqtrade.constants import (DEFAULT_AMOUNT_RESERVE_PERCENT, NON_OPEN_EXCHANGE_STATES, BidAsk,
+from freqtrade.constants import (DEFAULT_AMOUNT_RESERVE_PERCENT, DEFAULT_TRADES_COLUMNS,
+                                 NON_OPEN_EXCHANGE_STATES, BidAsk,
                                  BuySell, Config, EntryExit, ExchangeConfig,
                                  ListPairsWithTimeframes, MakerTaker, OBLiteral, PairWithTimeframe)
 from freqtrade.data.converter import clean_ohlcv_dataframe, ohlcv_to_dataframe, trades_dict_to_list
@@ -115,6 +115,7 @@ class Exchange:

         # Holds last candle refreshed time of each pair
         self._pairs_last_refresh_time: Dict[PairWithTimeframe, int] = {}
+        self._trades_last_refresh_time: Dict[PairWithTimeframe, int] = {}
         # Timestamp of last markets refresh
         self._last_markets_refresh: int = 0
@@ -130,6 +131,9 @@ class Exchange:
         # Holds candles
         self._klines: Dict[PairWithTimeframe, DataFrame] = {}

+        # Holds public trades
+        self._trades: Dict[PairWithTimeframe, DataFrame] = {}
+
         # Holds all open sell orders for dry_run
         self._dry_run_open_orders: Dict[str, Any] = {}
@@ -160,10 +164,13 @@ class Exchange:

         # Assign this directly for easy access
         self._ohlcv_partial_candle = self._ft_has['ohlcv_partial_candle']
+        self._max_trades_candle_limit = self._config.get('exchange', {}).get(
+            'trades_candle_limit', 1000)

         self._trades_pagination = self._ft_has['trades_pagination']
         self._trades_pagination_arg = self._ft_has['trades_pagination_arg']

+        self._trades_bin_size_scale = self._config.get('exchange', {}).get('bin_size_scale', 0.5)
+
         # Initialize ccxt objects
         ccxt_config = self._ccxt_config
         ccxt_config = deep_merge_dicts(exchange_conf.get('ccxt_config', {}), ccxt_config)
@@ -338,6 +345,22 @@ class Exchange:
         return int(self._ft_has.get('ohlcv_candle_limit_per_timeframe', {}).get(
             timeframe, self._ft_has.get('ohlcv_candle_limit')))

+    def trades_candle_limit(
+            self, timeframe: str, candle_type: CandleType,
+            since_ms: Optional[int] = None) -> int:
+        """
+        Exchange trades candle limit
+        Uses trades_candle_limit_per_timeframe if the exchange has different limits
+        per timeframe (e.g. bittrex), otherwise falls back to trades_candle_limit
+        :param timeframe: Timeframe to check
+        :param candle_type: Candle-type
+        :param since_ms: Starting timestamp
+        :return: Candle limit as integer
+        """
+        # TODO: check whether exchanges actually have trades candle limits
+        return int(self._ft_has.get('trades_candle_limit_per_timeframe', {}).get(
+            timeframe, self._ft_has.get('trades_candle_limit', self._max_trades_candle_limit)))
+
     def get_markets(self, base_currencies: List[str] = [], quote_currencies: List[str] = [],
                     spot_only: bool = False, margin_only: bool = False, futures_only: bool = False,
                     tradable_only: bool = True,
@@ -415,6 +438,16 @@ class Exchange:
         else:
             return DataFrame()

+    def trades(self, pair_interval: PairWithTimeframe, copy: bool = True) -> DataFrame:
+        if pair_interval in self._trades:
+            if copy:
+                # hand out a copy so callers cannot mutate the cached dataframe
+                return self._trades[pair_interval].copy()
+            else:
+                return self._trades[pair_interval]
+        else:
+            return DataFrame()
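
For orientation, the new cache mirrors _klines and is keyed the same way; a toy read (key values illustrative):

    key = ('BTC/USDT', '5m', CandleType.SPOT)
    trades_df = exchange.trades(key, copy=False)  # empty DataFrame when nothing is cached yet
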

     def get_contract_size(self, pair: str) -> Optional[float]:
         if self.trading_mode == TradingMode.FUTURES:
             market = self.markets.get(pair, {})
@@ -1927,24 +1960,68 @@ class Exchange:
         return data

     async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
+                                        since_ms: int, candle_type: CandleType,
+                                        is_new_pair: bool = False, raise_: bool = False,
+                                        until_ms: Optional[int] = None
+                                        ) -> OHLCVResponse:
+        """
+        Download historic ohlcv
+        :param is_new_pair: used by binance subclass to allow "fast" new pair downloading
+        :param candle_type: Any of the enum CandleType (must match trading mode!)
+        """
+
+        one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
+            timeframe, candle_type, since_ms)
+        logger.debug(
+            "one_call: %s msecs (%s)",
+            one_call,
+            arrow.utcnow().shift(seconds=one_call // 1000).humanize(only_distance=True)
+        )
+        input_coroutines = [self._async_get_candle_history(
+            pair, timeframe, candle_type, since) for since in
+            range(since_ms, until_ms or (arrow.utcnow().int_timestamp * 1000), one_call)]
+
+        data: List = []
+        # Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling
+        for input_coro in chunks(input_coroutines, 100):
+
+            results = await asyncio.gather(*input_coro, return_exceptions=True)
+            for res in results:
+                if isinstance(res, Exception):
+                    logger.warning(f"Async code raised an exception: {repr(res)}")
+                    if raise_:
+                        raise
+                    continue
+                else:
+                    # Deconstruct tuple if it's not an exception
+                    p, _, c, new_data, _ = res
+                    if p == pair and c == candle_type:
+                        data.extend(new_data)
+        # Sort data again after extending the result - above calls return in "async order"
+        data = sorted(data, key=lambda x: x[0])
+        return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
+
+    async def _async_get_historic_trades(self, pair: str, timeframe: str,
                                          since_ms: int, candle_type: CandleType,
                                          is_new_pair: bool = False, raise_: bool = False,
                                          until_ms: Optional[int] = None
-                                         ) -> OHLCVResponse:
+                                         ) -> Tuple[str, str, CandleType, List, bool]:
         """
-        Download historic ohlcv
+        Download historic trades
         :param is_new_pair: used by binance subclass to allow "fast" new pair downloading
         :param candle_type: Any of the enum CandleType (must match trading mode!)
         """

-        one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
+        one_call = timeframe_to_msecs(timeframe) * self.trades_candle_limit(
             timeframe, candle_type, since_ms)
         logger.debug(
             "one_call: %s msecs (%s)",
             one_call,
             dt_humanize(dt_now() - timedelta(milliseconds=one_call), only_distance=True)
         )
-        input_coroutines = [self._async_get_candle_history(
+        until_ms = until_ms if until_ms else (arrow.utcnow().int_timestamp * 1000)
+        input_coroutines = [self._async_get_trades_history(
             pair, timeframe, candle_type, since) for since in
-            range(since_ms, until_ms or dt_ts(), one_call)]
+            range(since_ms, until_ms, one_call)]

@@ -1963,18 +2040,49 @@ class Exchange:
                     # Deconstruct tuple if it's not an exception
                     p, _, c, new_data, _ = res
                     if p == pair and c == candle_type:
                         data.extend(new_data)
         # Sort data again after extending the result - above calls return in "async order"
-        data = sorted(data, key=lambda x: x[0])
+        data = sorted(data, key=lambda x: x['timestamp'])  # TODO: sort by 'timestamp' or 'id'?
         return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
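
The windowing above splits the requested range into one fetch per candle-limit span; a standalone sketch of the same arithmetic (numbers illustrative):

    timeframe_msecs = 300_000            # 5m in milliseconds
    candle_limit = 1000                  # assumed exchange limit per call
    one_call = timeframe_msecs * candle_limit
    since_ms, until_ms = 1_700_000_000_000, 1_701_500_000_000
    windows = list(range(since_ms, until_ms, one_call))
    # one coroutine is created per window start; results are gathered in chunks of 100
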
""" - one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( + one_call = timeframe_to_msecs(timeframe) * self.trades_candle_limit( timeframe, candle_type, since_ms) logger.debug( "one_call: %s msecs (%s)", one_call, dt_humanize(dt_now() - timedelta(milliseconds=one_call), only_distance=True) ) - input_coroutines = [self._async_get_candle_history( + until_ms = until_ms if until_ms else (arrow.utcnow().int_timestamp * 1000) + input_coroutines = [self._async_get_trades_history( pair, timeframe, candle_type, since) for since in range(since_ms, until_ms or dt_ts(), one_call)] @@ -1963,18 +2040,49 @@ class Exchange: # Deconstruct tuple if it's not an exception p, _, c, new_data, _ = res if p == pair and c == candle_type: - data.extend(new_data) + data.extend(new_data) # Sort data again after extending the result - above calls return in "async order" - data = sorted(data, key=lambda x: x[0]) + data = sorted(data, key=lambda x: x['timestamp'])# TODO: sort via 'timestamp' or 'id'? return pair, timeframe, candle_type, data, self._ohlcv_partial_candle - def _build_coroutine( + def _build_coroutine_get_ohlcv( + self, pair: str, timeframe: str, candle_type: CandleType, + since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]: + not_all_data = cache and self.required_candle_call_count > 1 + if cache and (pair, timeframe, candle_type) in self._klines: + candle_limit = self.ohlcv_candle_limit(timeframe, candle_type) + min_date = date_minus_candles(timeframe, candle_limit - self._required_candle_call_count_max).timestamp() + # Check if 1 call can get us updated candles without hole in the data. + if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0): + # Cache can be used - do one-off call. + not_all_data = False + else: + # Time jump detected, evict cache + logger.info( + f"Time jump detected. Evicting ohlcv cache for {pair}, {timeframe}, {candle_type}") + del self._klines[(pair, timeframe, candle_type)] + + if (not since_ms and (self._ft_has["ohlcv_require_since"] or not_all_data)): + # Multiple calls for one pair - to get more history + since_ms = self.needed_candle_ms(timeframe,candle_type) + + # TODO: fetch_trades and return as results + if since_ms: + return self._async_get_historic_ohlcv( + pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type) + else: + # One call ... "regular" refresh + return self._async_get_candle_history( + pair, timeframe, since_ms=since_ms, candle_type=candle_type) + + + def _build_coroutine_get_trades( self, pair: str, timeframe: str, candle_type: CandleType, since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]: not_all_data = cache and self.required_candle_call_count > 1 - if cache and (pair, timeframe, candle_type) in self._klines: - candle_limit = self.ohlcv_candle_limit(timeframe, candle_type) - min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp() + if cache and (pair, timeframe, candle_type) in self._trades: + candle_limit = self.trades_candle_limit(timeframe, candle_type) + min_date = date_minus_candles(timeframe, candle_limit - self._required_candle_call_count_max).timestamp() # Check if 1 call can get us updated candles without hole in the data. if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0): # Cache can be used - do one-off call. @@ -1982,10 +2090,11 @@ class Exchange: else: # Time jump detected, evict cache logger.info( - f"Time jump detected. 

+    def _build_ohlcv_dl_jobs(
+            self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
+            cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
+        """
+        Build Coroutines to execute as part of refresh_latest_ohlcv
+        """
+        input_coroutines: List[Coroutine[Any, Any, OHLCVResponse]] = []
+        cached_pairs = []
+        for pair, timeframe, candle_type in set(pair_list):
+            if (timeframe not in self.timeframes
+                    and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
+                logger.warning(
+                    f"Cannot download ({pair}, {timeframe}) combination as this timeframe is "
+                    f"not available on {self.name}. Available timeframes are "
+                    f"{', '.join(self.timeframes)}.")
+                continue
+
+            if ((pair, timeframe, candle_type) not in self._klines or not cache
+                    or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
+
+                input_coroutines.append(
+                    self._build_coroutine_get_ohlcv(pair, timeframe, candle_type,
+                                                    since_ms, cache))
+
+            else:
+                logger.debug(
+                    f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..."
+                )
+                cached_pairs.append((pair, timeframe, candle_type))
+
+        return input_coroutines, cached_pairs
+
-    def _build_ohlcv_dl_jobs(
+    def _build_trades_dl_jobs(
             self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
             cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
         """
-        Build Coroutines to execute as part of refresh_latest_ohlcv
+        Build Coroutines to execute as part of refresh_latest_trades
         """
-        input_coroutines: List[Coroutine[Any, Any, OHLCVResponse]] = []
+        input_coroutines: List[Coroutine[Any, Any, Tuple]] = []
         cached_pairs = []
         for pair, timeframe, candle_type in set(pair_list):
+            # NOTE: use a per-pair copy, so one pair's computed start date
+            # does not leak into the next loop iteration
+            since_ms_pair = since_ms
+            if not since_ms_pair:
+                plr = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0)
+                # If we don't have a last refresh time, we need to download all trades.
+                # This is the case when the bot is started.
+                if not plr:
+                    # using ohlcv_candle_limit here, because we calculate the distance
+                    # to the first required candle
+                    one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
+                        timeframe, candle_type, since_ms_pair)
+                    target_candle = one_call * self.required_candle_call_count
+                    now = timeframe_to_next_date(timeframe)
+                    since_ms_pair = int(
+                        (now - timedelta(seconds=target_candle // 1000)).timestamp() * 1000)
+                else:
+                    since_ms_pair = plr
+
         if (timeframe not in self.timeframes
                 and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
             logger.warning(
@@ -2018,94 +2175,222 @@ class Exchange:
                 f"{', '.join(self.timeframes)}.")
             continue

-            if ((pair, timeframe, candle_type) not in self._klines or not cache
+            if ((pair, timeframe, candle_type) not in self._trades or not cache
                     or self._now_is_time_to_refresh(pair, timeframe, candle_type)):

                 input_coroutines.append(
-                    self._build_coroutine(pair, timeframe, candle_type, since_ms, cache))
+                    self._build_coroutine_get_trades(pair, timeframe, candle_type,
+                                                     since_ms_pair, cache))

             else:
                 logger.debug(
-                    f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..."
+                    f"Using cached candle (TRADES) data for {pair}, {timeframe}, {candle_type} ..."
                 )
                 cached_pairs.append((pair, timeframe, candle_type))

         return input_coroutines, cached_pairs
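
As a sanity check on the startup lookback computed above (values illustrative): with a 5m timeframe, a 1000-candle limit and required_candle_call_count = 1, the first download window starts roughly 3.5 days before the next candle boundary:

    one_call = 300_000 * 1000      # 5m in ms x assumed candle limit
    target_candle = one_call * 1   # x required_candle_call_count
    # 300_000_000 ms ~ 83.3 hours ~ 3.5 days of trade history
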

     def _process_ohlcv_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
                           cache: bool, drop_incomplete: bool) -> DataFrame:
+        # keeping last candle time as last refreshed time of the pair
+        if ticks and cache:
+            self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000
+        # keeping parsed dataframe in cache
+        ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
+                                      drop_incomplete=drop_incomplete)
+        if cache:
+            if (pair, timeframe, c_type) in self._klines:
+                old = self._klines[(pair, timeframe, c_type)]
+                # Reassign so we return the updated, combined df
+                ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair,
+                                                 fill_missing=True, drop_incomplete=False)
+                candle_limit = self.ohlcv_candle_limit(timeframe, self._config['candle_type_def'])
+                # Age out old candles
+                ohlcv_df = ohlcv_df.tail(candle_limit + self._startup_candle_count)
+                ohlcv_df = ohlcv_df.reset_index(drop=True)
+                self._klines[(pair, timeframe, c_type)] = ohlcv_df
+            else:
+                self._klines[(pair, timeframe, c_type)] = ohlcv_df
+        return ohlcv_df
+
+    def _process_trades_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
+                           cache: bool, drop_incomplete: bool,
+                           first_required_candle_date: Optional[int]) -> DataFrame:
+        # keeping parsed dataframe in cache
+        # TODO: pass last_full_candle_date to drop as incomplete
+        trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False,
+                                               drop_incomplete=drop_incomplete)
         # keeping last candle time as last refreshed time of the pair
         if ticks and cache:
             idx = -2 if drop_incomplete and len(ticks) > 1 else -1
-            self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[idx][0] // 1000
-        # keeping parsed dataframe in cache
-        ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
-                                      drop_incomplete=drop_incomplete)
+            # NOTE: // is integer (floor) division
+            self._trades_last_refresh_time[(pair, timeframe, c_type)] = \
+                trades_df['timestamp'].iat[idx] // 1000
         if cache:
-            if (pair, timeframe, c_type) in self._klines:
-                old = self._klines[(pair, timeframe, c_type)]
+            if (pair, timeframe, c_type) in self._trades:
+                old = self._trades[(pair, timeframe, c_type)]
                 # Reassign so we return the updated, combined df
-                ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair,
-                                                 fill_missing=True, drop_incomplete=False)
-                candle_limit = self.ohlcv_candle_limit(timeframe, self._config['candle_type_def'])
+                trades_df = clean_duplicate_trades(concat([old, trades_df], axis=0),
+                                                   timeframe, pair,
+                                                   fill_missing=False, drop_incomplete=False)
+                # warn_of_tick_duplicates(trades_df, pair)
             # Age out old candles
-                ohlcv_df = ohlcv_df.tail(candle_limit + self._startup_candle_count)
-                ohlcv_df = ohlcv_df.reset_index(drop=True)
-                self._klines[(pair, timeframe, c_type)] = ohlcv_df
-            else:
-                self._klines[(pair, timeframe, c_type)] = ohlcv_df
-        return ohlcv_df
+            if first_required_candle_date:
+                # slice off older dates
+                trades_df = trades_df[first_required_candle_date < trades_df['timestamp']]
+            trades_df = trades_df.reset_index(drop=True)
+            self._trades[(pair, timeframe, c_type)] = trades_df
+        return trades_df

     def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
-                             since_ms: Optional[int] = None, cache: bool = True,
-                             drop_incomplete: Optional[bool] = None
-                             ) -> Dict[PairWithTimeframe, DataFrame]:
+                             since_ms: Optional[int] = None, cache: bool = True,
+                             drop_incomplete: Optional[bool] = None
+                             ) -> Dict[PairWithTimeframe, DataFrame]:
+        """
+        Refresh in-memory OHLCV asynchronously and set `_klines` with the result
+        Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
+        Only used in the dataprovider.refresh() method.
+        :param pair_list: List of 2 element tuples containing pair, interval to refresh
+        :param since_ms: time since when to download, in milliseconds
+        :param cache: Assign result to _klines. Useful for one-off downloads like for pairlists
+        :param drop_incomplete: Control candle dropping.
+            Specifying None defaults to _ohlcv_partial_candle
+        :return: Dict of [{(pair, timeframe): Dataframe}]
+        """
+        logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
+
+        # Gather coroutines to run
+        input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
+
+        results_df = {}
+        # Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling
+        for input_coro in chunks(input_coroutines, 100):
+
+            async def gather_stuff():
+                return await asyncio.gather(*input_coro, return_exceptions=True)
+
+            with self._loop_lock:
+                results = self.loop.run_until_complete(gather_stuff())
+
+            for res in results:
+                if isinstance(res, Exception):
+                    logger.warning(f"Async code raised an exception: {repr(res)}")
+                    continue
+                # Deconstruct tuple (has 5 elements)
+                pair, timeframe, c_type, ticks, drop_hint = res
+                # use a separate name, so the parameter is not mutated across iterations
+                drop_incomplete_ = drop_hint if drop_incomplete is None else drop_incomplete
+                ohlcv_df = self._process_ohlcv_df(
+                    pair, timeframe, c_type, ticks, cache, drop_incomplete_)
+
+                results_df[(pair, timeframe, c_type)] = ohlcv_df
+
+        # Return cached klines
+        for pair, timeframe, c_type in cached_pairs:
+            results_df[(pair, timeframe, c_type)] = self.klines(
+                (pair, timeframe, c_type),
+                copy=False
+            )
+
+        return results_df
+
+    def needed_candle_ms(self, timeframe: str, candle_type: CandleType) -> int:
+        one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
+            timeframe, candle_type)
+        move_to = one_call * self.required_candle_call_count
+        now = timeframe_to_next_date(timeframe)
+        return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
+
+    def refresh_latest_trades(self,
+                              pair_list: ListPairsWithTimeframes,
+                              data_handler: Callable,  # an IDataHandler instance
+                              *,
+                              cache: bool = True,
+                              ) -> Dict[PairWithTimeframe, DataFrame]:
+        use_public_trades = self._config.get('exchange', {}).get('use_public_trades', False)
+        if use_public_trades:
+            return self._refresh_latest_trades(pair_list, data_handler, cache=cache)
+        return {}
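
The gate above keys off a new exchange-level flag; a hypothetical configuration excerpt (keys as consumed by the code in this diff):

    config = {
        "exchange": {"use_public_trades": True},
        "orderflow": {
            "scale": 0.5,                  # price bin size
            "imbalance_ratio": 300,        # percent; compared as ratio > imbalance_ratio / 100
            "imbalance_volume": 3,         # minimum volume for an imbalance to count
            "stacked_imbalance_range": 3,  # consecutive imbalances required
        },
    }
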
+    def _refresh_latest_trades(self,
+                               pair_list: ListPairsWithTimeframes,
+                               data_handler: Callable,  # an IDataHandler instance
+                               *,
+                               cache: bool = True,
+                               ) -> Dict[PairWithTimeframe, DataFrame]:
         """
-        Refresh in-memory OHLCV asynchronously and set `_klines` with the result
+        Refresh in-memory TRADES asynchronously and set `_trades` with the result
         Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
         Only used in the dataprovider.refresh() method.
-        :param pair_list: List of 2 element tuples containing pair, interval to refresh
-        :param since_ms: time since when to download, in milliseconds
-        :param cache: Assign result to _klines. Usefull for one-off downloads like for pairlists
-        :param drop_incomplete: Control candle dropping.
-            Specifying None defaults to _ohlcv_partial_candle
+        :param pair_list: List of 3 element tuples containing (pair, timeframe, candle_type)
+        :param cache: Assign result to _trades. Useful for one-off downloads like for pairlists
         :return: Dict of [{(pair, timeframe): Dataframe}]
         """
-        logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
-
-        # Gather coroutines to run
-        input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
-
-        results_df = {}
-        # Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
-        for input_coro in chunks(input_coroutines, 100):
-
-            async def gather_stuff():
-                return await asyncio.gather(*input_coro, return_exceptions=True)
-
-            with self._loop_lock:
-                results = self.loop.run_until_complete(gather_stuff())
-
-            for res in results:
-                if isinstance(res, Exception):
-                    logger.warning(f"Async code raised an exception: {repr(res)}")
-                    continue
-                # Deconstruct tuple (has 5 elements)
-                pair, timeframe, c_type, ticks, drop_hint = res
-                drop_incomplete_ = drop_hint if drop_incomplete is None else drop_incomplete
-                ohlcv_df = self._process_ohlcv_df(
-                    pair, timeframe, c_type, ticks, cache, drop_incomplete_)
-
-                results_df[(pair, timeframe, c_type)] = ohlcv_df
-
-        # Return cached klines
-        for pair, timeframe, c_type in cached_pairs:
-            results_df[(pair, timeframe, c_type)] = self.klines(
-                (pair, timeframe, c_type),
-                copy=False
-            )
-
-        return results_df
+        logger.debug("Refreshing TRADES data for %d pairs", len(pair_list))
+        since_ms = None
+        results_df = {}
+
+        for pair, timeframe, candle_type in set(pair_list):
+            new_ticks = []
+            all_stored_ticks = []
+            first_candle_ms = self.needed_candle_ms(timeframe, candle_type)
+            # refresh, if
+            # a. not in _trades
+            # b. no cache is used
+            # c. new data is needed
+            is_in_cache = (pair, timeframe, candle_type) in self._trades
+            if (not is_in_cache or not cache
+                    or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)):
+                logger.debug(f"Refreshing TRADES data for {pair}")
+                # fetch trades since latest _trades and
+                # store them together with existing trades
+                try:
+                    until = None
+                    from_id = None
+                    if is_in_cache:
+                        trades = self._trades[(pair, timeframe, candle_type)]
+                        from_id = trades.iloc[-1]['id']
+                        last_candle_refresh = self._pairs_last_refresh_time.get(
+                            (pair, timeframe, candle_type), 0)
+                        until = last_candle_refresh * 1000 if last_candle_refresh \
+                            else arrow.now('UTC').int_timestamp * 1000
+                    else:
+                        next_closed_candle_time = timeframe_to_next_date(timeframe)
+                        until = int(next_closed_candle_time.timestamp()) * 1000
+                        all_stored_ticks = data_handler.trades_load(f"{pair}-cached")
+                        if all_stored_ticks:
+                            if all_stored_ticks[0][0] <= first_candle_ms:
+                                from_id = all_stored_ticks[-1][1]
+                                # from_id overrides since_ms
+                                since_ms = all_stored_ticks[-1][0]
+                            else:
+                                # stored trades don't go back far enough - start over
+                                all_stored_ticks = []
+
+                    # from_id overrules since_ms on exchanges that paginate by id
+                    ticks_pair, new_ticks = self._download_trades_history(
+                        pair,
+                        since=since_ms if since_ms else first_candle_ms,
+                        until=until,
+                        from_id=from_id)
+
+                except Exception as e:
+                    logger.error(f"Refreshing TRADES data for {pair} failed: {e}")
+
+                if new_ticks:
+                    drop_incomplete = False  # TODO: remove, there are no incomplete trades
+                    all_stored_ticks.extend(new_ticks)
+                    # NOTE: only new trades are processed:
+                    # self._trades = until_first_candle(stored_trades) + fetched trades
+                    trades_df = self._process_trades_df(pair, timeframe, candle_type,
+                                                        all_stored_ticks, cache,
+                                                        drop_incomplete, first_candle_ms)
+                    results_df[(pair, timeframe, candle_type)] = trades_df
+                    data_handler.trades_store(
+                        f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS].values.tolist())
+
+        return results_df

     def _now_is_time_to_refresh(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
         # Timeframe in seconds
         interval_in_sec = timeframe_to_seconds(timeframe)
@@ -2114,6 +2399,13 @@ class Exchange:
         now = int(timeframe_to_prev_date(timeframe).timestamp())
         return plr < now

+    def _now_is_time_to_refresh_trades(self, pair: str, timeframe: str,
+                                       candle_type: CandleType) -> bool:
+        # Timeframe in seconds
+        interval_in_sec = timeframe_to_seconds(timeframe)
+        plr = self._trades_last_refresh_time.get(
+            (pair, timeframe, candle_type), 0) + interval_in_sec
+        REFRESH_EARLIER_SECONDS = 5
+        return plr < arrow.utcnow().int_timestamp - REFRESH_EARLIER_SECONDS
+
     @retrier_async
     async def _async_get_candle_history(
         self,
@@ -2179,6 +2471,75 @@ class Exchange:
             raise OperationalException(f'Could not fetch historical candle (OHLCV) data '
                                        f'for pair {pair}. Message: {e}') from e

+    @retrier_async
+    async def _async_get_trades_history(
+        self,
+        pair: str,
+        timeframe: str,
+        candle_type: CandleType,
+        since_ms: Optional[int] = None,
+    ) -> Tuple[str, str, CandleType, List, bool]:
+        """
+        Asynchronously get trade history data using fetch_trades
+        :param candle_type: '', mark, index, premiumIndex, or funding_rate
+        returns tuple: (pair, timeframe, candle_type, trades, partial-candle flag)
+        """
+        try:
+            # Fetch TRADES asynchronously
+            s = '(' + arrow.get(since_ms // 1000).isoformat() + ') ' \
+                if since_ms is not None else ''
+            logger.debug(
+                "Fetching pair %s, %s, interval %s, since %s %s...",
+                pair, candle_type, timeframe, since_ms, s
+            )
+            params = deepcopy(self._ft_has.get('trades_params', {}))
+            candle_limit = self.trades_candle_limit(
+                timeframe, candle_type=candle_type, since_ms=since_ms)
+
+            if candle_type and candle_type != CandleType.SPOT:
+                params.update({'price': candle_type.value})
+            if candle_type != CandleType.FUNDING_RATE:
+                # NOTE: with since=None there seems to be no response
+                assert since_ms is not None
+                data = await self._api_async.fetch_trades(
+                    pair, since=since_ms,
+                    limit=candle_limit, params=params)
+            else:
+                # TODO: probably not reachable for trades; kept for symmetry
+                # Funding rate
+                data = await self._fetch_funding_rate_history(
+                    pair=pair,
+                    timeframe=timeframe,
+                    limit=candle_limit,
+                    since_ms=since_ms,
+                )
+            # Some exchanges sort TRADES in ASC order and others in DESC.
+            # Ex: Bittrex returns the list of TRADES in ASC order (oldest first, newest last)
+            # while GDAX returns the list of TRADES in DESC order (newest first, oldest last)
+            # Only sort if necessary to save computing time
+            try:
+                # TODO: check if this is even needed?
+                if data and data[0]['timestamp'] > data[-1]['timestamp']:
+                    data = sorted(data, key=lambda x: x['timestamp'])
+            except KeyError:
+                logger.exception("Error loading %s. Result was %s.", pair, data)
+                return pair, timeframe, candle_type, [], True
+            logger.debug("Done fetching pair %s, interval %s ...", pair, timeframe)
+            return pair, timeframe, candle_type, data, True
+
+        except ccxt.NotSupported as e:
+            raise OperationalException(
+                f'Exchange {self._api.name} does not support fetching historical '
+                f'trades data. Message: {e}') from e
+        except ccxt.DDoSProtection as e:
+            raise DDosProtection(e) from e
+        except (ccxt.NetworkError, ccxt.ExchangeError) as e:
+            raise TemporaryError(f'Could not fetch historical trades data '
+                                 f'for pair {pair} due to {e.__class__.__name__}. '
+                                 f'Message: {e}') from e
+        except ccxt.BaseError as e:
+            raise OperationalException(f'Could not fetch historical trades data '
+                                       f'for pair {pair}. Message: {e}') from e
+
     async def _fetch_funding_rate_history(
         self,
         pair: str,
@@ -2211,18 +2572,21 @@ class Exchange:
         returns: List of dicts containing trades
         """
         try:
+            # TODO: timeframe and candle_type are hardcoded here; the limit only
+            # depends on the exchange default unless per-timeframe limits exist
+            candle_limit = self.trades_candle_limit(
+                "1m", candle_type=CandleType.FUTURES, since_ms=since)
             # fetch trades asynchronously
             if params:
                 logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
-                trades = await self._api_async.fetch_trades(pair, params=params, limit=1000)
+                trades = await self._api_async.fetch_trades(pair, params=params,
+                                                            limit=candle_limit)
             else:
                 logger.debug(
                     "Fetching trades for pair %s, since %s %s...",
                     pair, since,
                     '(' + dt_from_ts(since).isoformat() + ') ' if since is not None else ''
                 )
-                trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
+                trades = await self._api_async.fetch_trades(pair, since=since,
+                                                            limit=candle_limit)
             trades = self._trades_contracts_to_amount(trades)
+            if trades:
+                logger.debug("Fetched trades for pair %s, datetime: %s (%d).",
+                             pair, trades[0]['datetime'], trades[0]['timestamp'])
             return trades_dict_to_list(trades)
         except ccxt.NotSupported as e:
             raise OperationalException(
@@ -2239,7 +2603,8 @@ class Exchange:
     async def _async_get_trade_history_id(self, pair: str,
                                           until: int,
                                           since: Optional[int] = None,
-                                          from_id: Optional[str] = None) -> Tuple[str, List[List]]:
+                                          from_id: Optional[str] = None,
+                                          stop_on_from_id: bool = True
+                                          ) -> Tuple[str, List[List]]:
         """
         Asyncronously gets trade history using fetch_trades
         use this when exchange uses id-based iteration (check `self._trades_pagination`)
@@ -2252,17 +2617,20 @@ class Exchange:

         trades: List[List] = []

+        if not until and not stop_on_from_id:
+            raise OperationalException(
+                "'stop_on_from_id' must be set if 'until' is not set")
+
         if not from_id:
             # Fetch first elements using timebased method to get an ID to paginate on
             # Depending on the Exchange, this can introduce a drift at the start of the interval
             # of up to an hour.
             # e.g. Binance returns the "last 1000" candles within a 1h time interval
             # - so we will miss the first trades.
-            t = await self._async_fetch_trades(pair, since=since)
+            first_batch = await self._async_fetch_trades(pair, since=since)
             # DEFAULT_TRADES_COLUMNS: 0 -> timestamp
             # DEFAULT_TRADES_COLUMNS: 1 -> id
-            from_id = t[-1][1]
-            trades.extend(t[:-1])
+            from_id = first_batch[-1][1]
+            trades.extend(first_batch[:-1])
         while True:
             try:
                 t = await self._async_fetch_trades(pair,
@@ -2322,7 +2690,9 @@ class Exchange:
     async def _async_get_trade_history(self, pair: str,
                                        since: Optional[int] = None,
                                        until: Optional[int] = None,
-                                       from_id: Optional[str] = None) -> Tuple[str, List[List]]:
+                                       from_id: Optional[str] = None,
+                                       stop_on_from_id: bool = True,
+                                       ) -> Tuple[str, List[List]]:
         """
         Async wrapper handling downloading trades using either time or id based methods.
""" @@ -2330,16 +2700,17 @@ class Exchange: logger.debug(f"_async_get_trade_history(), pair: {pair}, " f"since: {since}, until: {until}, from_id: {from_id}") - if until is None: - until = ccxt.Exchange.milliseconds() + if self._trades_pagination == 'time': + if until is None: + until = ccxt.Exchange.milliseconds() logger.debug(f"Exchange milliseconds: {until}") - if self._trades_pagination == 'time': return await self._async_get_trade_history_time( pair=pair, since=since, until=until) elif self._trades_pagination == 'id': return await self._async_get_trade_history_id( - pair=pair, since=since, until=until, from_id=from_id + pair=pair, since=since, until=until, from_id=from_id, + stop_on_from_id=stop_on_from_id ) else: raise OperationalException(f"Exchange {self.name} does use neither time, " @@ -2348,7 +2719,9 @@ class Exchange: def get_historic_trades(self, pair: str, since: Optional[int] = None, until: Optional[int] = None, - from_id: Optional[str] = None) -> Tuple[str, List]: + from_id: Optional[str] = None, + stop_on_from_id: Optional[bool] = True + ) -> Tuple[str, List]: """ Get trade history data using asyncio. Handles all async work and returns the list of candles. @@ -2374,6 +2747,33 @@ class Exchange: pass return self.loop.run_until_complete(task) + def _download_trades_history(self, + pair: str, + *, + new_pairs_days: int = 30, + since: Optional[int] = None, + until: Optional[int] = None, + from_id: Optional[int] = None, + stop_on_from_id: Optional[bool] = False + ) -> bool: + + """ + Download trade history from the exchange. + Appends to previously downloaded trades data. + :param until: is in msecs + :param since: is in msecs + :return Boolean of success + """ + + # if not until: + # until = arrow.utcnow().int_timestamp * 1000 + new_trades = self.get_historic_trades(pair=pair, + since=since, + until=until, + from_id=from_id, + stop_on_from_id=stop_on_from_id) + return new_trades + @retrier def _get_funding_fees_from_exchange(self, pair: str, since: Union[datetime, int]) -> float: """ diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 5cdbb6bf6..c5a0d5871 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -14,7 +14,8 @@ from freqtrade.data.dataprovider import DataProvider from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, MarketDirection, RunMode, SignalDirection, SignalTagType, SignalType, TradingMode) from freqtrade.exceptions import OperationalException, StrategyError -from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds +from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds, timeframe_to_msecs +from freqtrade.data import converter from freqtrade.misc import remove_entry_exit_signals from freqtrade.persistence import Order, PairLocks, Trade from freqtrade.strategy.hyper import HyperStrategyMixin @@ -840,6 +841,7 @@ class IStrategy(ABC, HyperStrategyMixin): dataframe = self.advise_indicators(dataframe, metadata) dataframe = self.advise_entry(dataframe, metadata) dataframe = self.advise_exit(dataframe, metadata) + logger.debug("TA Analysis Ended") return dataframe def _analyze_ticker_internal(self, dataframe: DataFrame, metadata: dict) -> DataFrame: @@ -1364,6 +1366,23 @@ class IStrategy(ABC, HyperStrategyMixin): dataframe = _create_and_merge_informative_pair( self, dataframe, metadata, inf_data, populate_fn) + # TODO: extract this into a separate method e.g. 
diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py
index 5cdbb6bf6..c5a0d5871 100644
--- a/freqtrade/strategy/interface.py
+++ b/freqtrade/strategy/interface.py
@@ -14,7 +14,8 @@ from freqtrade.data.dataprovider import DataProvider
 from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, MarketDirection, RunMode,
                              SignalDirection, SignalTagType, SignalType, TradingMode)
 from freqtrade.exceptions import OperationalException, StrategyError
-from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds
+from freqtrade.exchange import (timeframe_to_minutes, timeframe_to_msecs,
+                                timeframe_to_next_date, timeframe_to_seconds)
+from freqtrade.data import converter
 from freqtrade.misc import remove_entry_exit_signals
 from freqtrade.persistence import Order, PairLocks, Trade
 from freqtrade.strategy.hyper import HyperStrategyMixin
@@ -840,6 +841,7 @@ class IStrategy(ABC, HyperStrategyMixin):
         dataframe = self.advise_indicators(dataframe, metadata)
         dataframe = self.advise_entry(dataframe, metadata)
         dataframe = self.advise_exit(dataframe, metadata)
+        logger.debug("TA Analysis Ended")
         return dataframe

     def _analyze_ticker_internal(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
@@ -1364,6 +1366,23 @@ class IStrategy(ABC, HyperStrategyMixin):
             dataframe = _create_and_merge_informative_pair(
                 self, dataframe, metadata, inf_data, populate_fn)

+        # TODO: extract this into a separate method, e.g. if_enabled_populate_trades()
+        use_public_trades = self.config.get('exchange', {}).get('use_public_trades', False)
+        if use_public_trades:
+            trades = self.dp.trades(pair=metadata['pair'], copy=False)
+
+            config = self.config
+            config['timeframe'] = self.timeframe
+            # TODO: slice trades to the size of the dataframe for faster backtesting
+            dataframe = converter.populate_dataframe_with_trades(
+                config,
+                dataframe,
+                trades,
+                pair=metadata['pair'])
+
+            logger.debug("Populated dataframe with trades.")
+
         return self.populate_indicators(dataframe, metadata)

     def advise_entry(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
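
Downstream, the populated columns behave like any other dataframe column; a hypothetical populate_indicators sketch (column names as created by _init_dataframe_with_trades_columns above):

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        # with use_public_trades enabled, delta/bid/ask are already filled in
        dataframe['cum_delta'] = dataframe['delta'].fillna(0).cumsum()
        dataframe['buy_ratio'] = dataframe['ask'] / (dataframe['ask'] + dataframe['bid'])
        return dataframe
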