diff --git a/freqtrade/data/converter/converter.py b/freqtrade/data/converter/converter.py
index 81492276d..13e5b2b10 100644
--- a/freqtrade/data/converter/converter.py
+++ b/freqtrade/data/converter/converter.py
@@ -3,7 +3,7 @@ Functions to convert data from one format to another
 """
 import logging
 import time
-from typing import Dict
+from typing import Dict, List
 
 import numpy as np
 import pandas as pd
@@ -31,7 +31,8 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
     :param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
     :return: DataFrame
     """
-    logger.debug(f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
+    logger.debug(
+        f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
     cols = DEFAULT_DATAFRAME_COLUMNS
     df = DataFrame(ohlcv, columns=cols)
@@ -130,7 +131,7 @@ def populate_dataframe_with_trades(config: Config,
             # because that this candle isn't finished yet
             if candle_next not in trades_grouped_by_candle_start.groups:
                 logger.warning(
-                    f"candle at {candle_start} with {len(trades_grouped_df)} trades might be unfinished, because no finished trades at {candle_next}")  # noqa
+                    f"candle at {candle_start} with {len(trades_grouped_df)} trades might be unfinished, because no finished trades at {candle_next}")  # noqa
             # add trades to each candle
             df.loc[is_between, 'trades'] = df.loc[is_between,
@@ -200,11 +201,9 @@ def populate_dataframe_with_trades(config: Config,
     return dataframe
 
 
-def public_trades_to_dataframe(trades: list,
-                               timeframe: str,
-                               pair: str, *,
-                               fill_missing: bool = True,
-                               drop_incomplete: bool = True) -> DataFrame:
+def public_trades_to_dataframe(trades: List,
+                               pair: str,
+                               ) -> DataFrame:
     """
     Converts a list with candle (TRADES) data (in format returned by ccxt.fetch_trades)
     to a Dataframe
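A note on the hunk above: `public_trades_to_dataframe` drops its `timeframe`, `fill_missing` and `drop_incomplete` parameters. A minimal sketch of the new call shape, assuming the converter package re-exports the helper and that rows follow the DEFAULT_TRADES_COLUMNS ordering (the sample values are made up):

    # Column order per DEFAULT_TRADES_COLUMNS:
    # ['timestamp', 'id', 'type', 'side', 'price', 'amount', 'cost']
    from freqtrade.data.converter import public_trades_to_dataframe

    ticks = [
        [1690000000000, '1', None, 'buy', 27000.5, 0.010, 270.005],
        [1690000001000, '2', None, 'sell', 27001.0, 0.020, 540.020],
    ]
    # Old call shape:
    #   public_trades_to_dataframe(ticks, '5m', pair=pair,
    #                              fill_missing=False, drop_incomplete=False)
    # New call shape - only the raw tick list and the pair remain:
    trades_df = public_trades_to_dataframe(ticks, pair='BTC/USDT')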
@@ -228,15 +227,6 @@ def public_trades_to_dataframe(trades: list,
     # and fail with exception...
     df = df.astype(dtype={'amount': 'float', 'cost': 'float', 'price': 'float'})
-    #
-    # df.columns
-    # df = clean_duplicate_trades(df, timeframe, pair,
-    #                             fill_missing=fill_missing,
-    #                             drop_incomplete=drop_incomplete)
-
-    # df = drop_incomplete_and_fill_missing_trades(df, timeframe, pair,
-    #                                              fill_missing=fill_missing,
-    #                                              drop_incomplete=drop_incomplete)
     return df
@@ -459,7 +449,8 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str)
     df.reset_index(inplace=True)
     len_before = len(dataframe)
     len_after = len(df)
-    pct_missing = (len_after - len_before) / len_before if len_before > 0 else 0
+    pct_missing = (len_after - len_before) / \
+        len_before if len_before > 0 else 0
     if len_before != len_after:
         message = (f"Missing data fillup for {pair}, {timeframe}: "
                    f"before: {len_before} - after: {len_after} - {pct_missing:.2%}")
@@ -504,7 +495,8 @@ def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange,
     processed: Dict[str, DataFrame] = {}
 
     for pair, df in preprocessed.items():
-        trimed_df = trim_dataframe(df, timerange, startup_candles=startup_candles)
+        trimed_df = trim_dataframe(
+            df, timerange, startup_candles=startup_candles)
         if not trimed_df.empty:
             processed[pair] = trimed_df
         else:
@@ -560,15 +552,18 @@ def convert_ohlcv_format(
     candle_types = [CandleType.from_string(ct) for ct in config.get('candle_types', [
         c.value for c in CandleType])]
     logger.info(candle_types)
-    paircombs = src.ohlcv_get_available_data(config['datadir'], TradingMode.SPOT)
-    paircombs.extend(src.ohlcv_get_available_data(config['datadir'], TradingMode.FUTURES))
+    paircombs = src.ohlcv_get_available_data(
+        config['datadir'], TradingMode.SPOT)
+    paircombs.extend(src.ohlcv_get_available_data(
+        config['datadir'], TradingMode.FUTURES))
 
     if 'pairs' in config:
         # Filter pairs
         paircombs = [comb for comb in paircombs if comb[0] in config['pairs']]
 
     if 'timeframes' in config:
-        paircombs = [comb for comb in paircombs if comb[1] in config['timeframes']]
+        paircombs = [comb for comb in paircombs if comb[1]
+                     in config['timeframes']]
     paircombs = [comb for comb in paircombs if comb[2] in candle_types]
 
     paircombs = sorted(paircombs, key=lambda x: (x[0], x[1], x[2].value))
@@ -585,7 +580,8 @@ def convert_ohlcv_format(
                 drop_incomplete=False,
                 startup_candles=0,
                 candle_type=candle_type)
-            logger.info(f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}")
+            logger.info(
+                f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}")
             if len(data) > 0:
                 trg.ohlcv_store(
                     pair=pair,
@@ -595,7 +591,8 @@ def convert_ohlcv_format(
                 )
                 if erase and convert_from != convert_to:
                     logger.info(f"Deleting source data for {pair} / {timeframe}")
-                    src.ohlcv_purge(pair=pair, timeframe=timeframe, candle_type=candle_type)
+                    src.ohlcv_purge(pair=pair, timeframe=timeframe,
+                                    candle_type=candle_type)
 
 
 def reduce_dataframe_footprint(df: DataFrame) -> DataFrame:
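A quick worked example of the `pct_missing` figure logged by `ohlcv_fill_up_missing_data` above; note that it is computed relative to the pre-fill length, not the filled length:

    # Worked example (made-up lengths):
    len_before, len_after = 90, 100   # 10 candles were synthesized by the fill-up
    pct_missing = (len_after - len_before) / len_before if len_before > 0 else 0
    print(f"{pct_missing:.2%}")       # -> 11.11%, relative to the 90 original candles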
diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py
index 923136cbe..41d775405 100644
--- a/freqtrade/data/dataprovider.py
+++ b/freqtrade/data/dataprovider.py
@@ -46,23 +46,27 @@ class DataProvider:
         self._exchange = exchange
         self._pairlists = pairlists
         self.__rpc = rpc
-        self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
+        self.__cached_pairs: Dict[PairWithTimeframe,
+                                  Tuple[DataFrame, datetime]] = {}
         self.__slice_index: Optional[int] = None
         self.__slice_date: Optional[datetime] = None
 
-        self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
+        self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {
+        }
         self.__producer_pairs_df: Dict[str,
                                        Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]] = {}
         self.__producer_pairs: Dict[str, List[str]] = {}
         self._msg_queue: deque = deque()
 
-        self._default_candle_type = self._config.get('candle_type_def', CandleType.SPOT)
+        self._default_candle_type = self._config.get(
+            'candle_type_def', CandleType.SPOT)
         self._default_timeframe = self._config.get('timeframe', '1h')
 
         self.__msg_cache = PeriodicCache(
             maxsize=1000, ttl=timeframe_to_seconds(self._default_timeframe))
 
-        self.producers = self._config.get('external_message_consumer', {}).get('producers', [])
+        self.producers = self._config.get(
+            'external_message_consumer', {}).get('producers', [])
         self.external_data_enabled = len(self.producers) > 0
 
     def _set_dataframe_max_index(self, limit_index: int):
@@ -133,19 +137,19 @@ class DataProvider:
         """
         if self.__rpc:
             msg: RPCAnalyzedDFMsg = {
-                'type': RPCMessageType.ANALYZED_DF,
-                'data': {
-                    'key': pair_key,
-                    'df': dataframe.tail(1),
-                    'la': datetime.now(timezone.utc)
-                }
+                'type': RPCMessageType.ANALYZED_DF,
+                'data': {
+                    'key': pair_key,
+                    'df': dataframe.tail(1),
+                    'la': datetime.now(timezone.utc)
                 }
+            }
             self.__rpc.send_msg(msg)
             if new_candle:
                 self.__rpc.send_msg({
-                    'type': RPCMessageType.NEW_CANDLE,
-                    'data': pair_key,
-                })
+                    'type': RPCMessageType.NEW_CANDLE,
+                    'data': pair_key,
+                })
 
     def _replace_external_df(
         self,
@@ -168,10 +172,13 @@ class DataProvider:
         if producer_name not in self.__producer_pairs_df:
             self.__producer_pairs_df[producer_name] = {}
 
-        _last_analyzed = datetime.now(timezone.utc) if not last_analyzed else last_analyzed
+        _last_analyzed = datetime.now(
+            timezone.utc) if not last_analyzed else last_analyzed
 
-        self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed)
-        logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.")
+        self.__producer_pairs_df[producer_name][pair_key] = (
+            dataframe, _last_analyzed)
+        logger.debug(
+            f"External DataFrame for {pair_key} from {producer_name} added.")
 
     def _add_external_df(
         self,
@@ -222,7 +229,8 @@ class DataProvider:
         # CHECK FOR MISSING CANDLES
         # Convert the timeframe to a timedelta for pandas
         timeframe_delta: Timedelta = to_timedelta(timeframe)
-        local_last: Timestamp = existing_df.iloc[-1]['date']  # We want the last date from our copy
+        # We want the last date from our copy
+        local_last: Timestamp = existing_df.iloc[-1]['date']
         # We want the first date from the incoming
         incoming_first: Timestamp = dataframe.iloc[0]['date']
@@ -245,13 +253,13 @@ class DataProvider:
 
         # Everything is good, we appended
         self._replace_external_df(
-                pair,
-                appended_df,
-                last_analyzed=last_analyzed,
-                timeframe=timeframe,
-                candle_type=candle_type,
-                producer_name=producer_name
-            )
+            pair,
+            appended_df,
+            last_analyzed=last_analyzed,
+            timeframe=timeframe,
+            candle_type=candle_type,
+            producer_name=producer_name
+        )
         return (True, 0)
 
     def get_producer_df(
@@ -339,10 +347,13 @@ class DataProvider:
         startup_candles = self._config.get('startup_candle_count', 0)
         indicator_periods = freqai_config['feature_parameters']['indicator_periods_candles']
         # make sure the startupcandles is at least the set maximum indicator periods
-        self._config['startup_candle_count'] = max(startup_candles, max(indicator_periods))
+        self._config['startup_candle_count'] = max(
+            startup_candles, max(indicator_periods))
         tf_seconds = timeframe_to_seconds(timeframe)
-        train_candles = freqai_config['train_period_days'] * 86400 / tf_seconds
-        total_candles = int(self._config['startup_candle_count'] + train_candles)
+        train_candles = freqai_config['train_period_days'] * \
+            86400 / tf_seconds
+        total_candles = int(
+            self._config['startup_candle_count'] + train_candles)
         logger.info(
             f'Increasing startup_candle_count for freqai on {timeframe} to {total_candles}')
         return total_candles
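The startup-candle arithmetic in the hunk above, spelled out with assumed config values (`train_period_days=30`, 1h timeframe):

    # Worked example with assumed values:
    startup_candle_count = 400                # after max() with the indicator periods
    tf_seconds = 3600                         # timeframe_to_seconds('1h')
    train_candles = 30 * 86400 / tf_seconds   # 30 train_period_days -> 720.0 candles
    total_candles = int(startup_candle_count + train_candles)
    print(total_candles)                      # 1120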
@@ -365,18 +376,22 @@ class DataProvider:
         """
         if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
             # Get live OHLCV data.
-            data = self.ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
+            data = self.ohlcv(pair=pair, timeframe=timeframe,
+                              candle_type=candle_type)
         else:
             # Get historical OHLCV data (cached on disk).
             timeframe = timeframe or self._config['timeframe']
-            data = self.historic_ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
+            data = self.historic_ohlcv(
+                pair=pair, timeframe=timeframe, candle_type=candle_type)
             # Cut date to timeframe-specific date.
             # This is necessary to prevent lookahead bias in callbacks through informative pairs.
             if self.__slice_date:
-                cutoff_date = timeframe_to_prev_date(timeframe, self.__slice_date)
+                cutoff_date = timeframe_to_prev_date(
+                    timeframe, self.__slice_date)
                 data = data.loc[data['date'] < cutoff_date]
         if len(data) == 0:
-            logger.warning(f"No data found for ({pair}, {timeframe}, {candle_type}).")
+            logger.warning(
+                f"No data found for ({pair}, {timeframe}, {candle_type}).")
         return data
 
     def get_analyzed_dataframe(self, pair: str, timeframe: str) -> Tuple[DataFrame, datetime]:
@@ -389,7 +404,8 @@ class DataProvider:
             combination.
             Returns empty dataframe and Epoch 0 (1970-01-01) if no dataframe was cached.
         """
-        pair_key = (pair, timeframe, self._config.get('candle_type_def', CandleType.SPOT))
+        pair_key = (pair, timeframe, self._config.get(
+            'candle_type_def', CandleType.SPOT))
         if pair_key in self.__cached_pairs:
             if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
                 df, date = self.__cached_pairs[pair_key]
@@ -397,7 +413,8 @@ class DataProvider:
                 df, date = self.__cached_pairs[pair_key]
                 if self.__slice_index is not None:
                     max_index = self.__slice_index
-                    df = df.iloc[max(0, max_index - MAX_DATAFRAME_CANDLES):max_index]
+                    df = df.iloc[max(
+                        0, max_index - MAX_DATAFRAME_CANDLES):max_index]
             return df, date
         else:
             return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
@@ -422,7 +439,8 @@ class DataProvider:
         if self._pairlists:
             return self._pairlists.whitelist.copy()
         else:
-            raise OperationalException("Dataprovider was not initialized with a pairlist provider.")
+            raise OperationalException(
+                "Dataprovider was not initialized with a pairlist provider.")
 
     def clear_cache(self):
         """
@@ -461,8 +479,8 @@ class DataProvider:
         if use_public_trades:
             datahandler = get_datahandler(
                 self._config['datadir'], data_format=self._config['dataformat_trades'])
-            return self._exchange.refresh_latest_trades(pairlist, datahandler)
-        return {}
+            if self._exchange:
+                self._exchange.refresh_latest_trades(pairlist, datahandler)
 
     @property
     def available_pairs(self) -> ListPairsWithTimeframes:
@@ -533,8 +551,8 @@ class DataProvider:
             data_handler = get_datahandler(
                 self._config['datadir'], data_format=self._config['dataformat_trades'])
             ticks = data_handler.trades_load(pair)
-            trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False,
-                                                   drop_incomplete=False)
+            trades_df = public_trades_to_dataframe(
+                ticks.values.tolist(), pair=pair)
 
             return trades_df
         else:
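The last hunk above adapts the call site to the new converter signature: `trades_load` hands back a DataFrame, which is flattened to row lists before conversion. A sketch with a hypothetical stand-in for the data handler:

    import pandas as pd

    # Hypothetical stand-in for data_handler.trades_load(pair):
    ticks = pd.DataFrame(
        [[1690000000000, '1', None, 'buy', 27000.5, 0.01, 270.005]],
        columns=['timestamp', 'id', 'type', 'side', 'price', 'amount', 'cost'])

    # .values.tolist() flattens the frame back to row lists; mixed columns degrade
    # numeric dtypes to plain objects, which is why public_trades_to_dataframe
    # re-casts 'amount', 'cost' and 'price' via astype() afterwards.
    rows = ticks.values.tolist()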
diff --git a/freqtrade/data/history/history_utils.py b/freqtrade/data/history/history_utils.py
index cffcb1826..7304fa606 100644
--- a/freqtrade/data/history/history_utils.py
+++ b/freqtrade/data/history/history_utils.py
@@ -107,9 +107,11 @@ def load_data(datadir: Path,
             result[pair] = hist
         else:
             if candle_type is CandleType.FUNDING_RATE and user_futures_funding_rate is not None:
-                logger.warn(f"{pair} using user specified [{user_futures_funding_rate}]")
+                logger.warn(
+                    f"{pair} using user specified [{user_futures_funding_rate}]")
             elif candle_type not in (CandleType.SPOT, CandleType.FUTURES):
-                result[pair] = DataFrame(columns=["date", "open", "close", "high", "low", "volume"])
+                result[pair] = DataFrame(
+                    columns=["date", "open", "close", "high", "low", "volume"])
 
     if fail_without_data and not result:
         raise OperationalException("No data found. Terminating.")
@@ -217,7 +219,8 @@ def _download_pair_history(pair: str, *,
     try:
         if erase:
             if data_handler.ohlcv_purge(pair, timeframe, candle_type=candle_type):
-                logger.info(f'Deleting existing data for pair {pair}, {timeframe}, {candle_type}.')
+                logger.info(
+                    f'Deleting existing data for pair {pair}, {timeframe}, {candle_type}.')
 
         data, since_ms, until_ms = _load_cached_data_for_updating(
             pair, timeframe, timerange,
@@ -266,7 +269,8 @@ def _download_pair_history(pair: str, *,
                     f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}"
                     if not data.empty else 'None')
 
-        data_handler.ohlcv_store(pair, timeframe, data=data, candle_type=candle_type)
+        data_handler.ohlcv_store(
+            pair, timeframe, data=data, candle_type=candle_type)
         return True
 
     except Exception:
@@ -299,7 +303,8 @@ def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes
             continue
         for timeframe in timeframes:
-            logger.debug(f'Downloading pair {pair}, {candle_type}, interval {timeframe}.')
+            logger.debug(
+                f'Downloading pair {pair}, {candle_type}, interval {timeframe}.')
             process = f'{idx}/{len(pairs)}'
             _download_pair_history(pair=pair, process=process,
                                    datadir=datadir, exchange=exchange,
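The next hunk rewraps the funding-rate/mark-price download matrix. A sketch of what that matrix expands to, using assumed option values (the real ones come from `exchange.get_option()`):

    from freqtrade.enums import CandleType

    tf_mark = '8h'                                   # mark_ohlcv_timeframe (assumed)
    tf_funding_rate = '8h'                           # funding_fee_timeframe (assumed)
    fr_candle_type = CandleType.from_string('mark')  # mark_ohlcv_price (assumed)

    combs = ((CandleType.FUNDING_RATE, tf_funding_rate), (fr_candle_type, tf_mark))
    for candle_type_f, tf in combs:
        print(f'would download {candle_type_f}, interval {tf}')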
@@ -313,12 +318,15 @@ def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes
             tf_mark = exchange.get_option('mark_ohlcv_timeframe')
             tf_funding_rate = exchange.get_option('funding_fee_timeframe')
 
-            fr_candle_type = CandleType.from_string(exchange.get_option('mark_ohlcv_price'))
+            fr_candle_type = CandleType.from_string(
+                exchange.get_option('mark_ohlcv_price'))
             # All exchanges need FundingRate for futures trading.
             # The timeframe is aligned to the mark-price timeframe.
-            combs = ((CandleType.FUNDING_RATE, tf_funding_rate), (fr_candle_type, tf_mark))
+            combs = ((CandleType.FUNDING_RATE, tf_funding_rate),
+                     (fr_candle_type, tf_mark))
             for candle_type_f, tf in combs:
-                logger.debug(f'Downloading pair {pair}, {candle_type_f}, interval {tf}.')
+                logger.debug(
+                    f'Downloading pair {pair}, {candle_type_f}, interval {tf}.')
                 _download_pair_history(pair=pair, process=process,
                                        datadir=datadir, exchange=exchange,
                                        timerange=timerange, data_handler=data_handler,
@@ -368,8 +376,9 @@ def _download_trades_history(exchange: Exchange,
         # Reset since to the last available point
         # - 5 seconds (to ensure we're getting all trades)
         since = trades.iloc[-1]['timestamp'] - (5 * 1000)
-        logger.info(f"Using last trade date -5s - Downloading trades for {pair} "
-                    f"since: {format_ms_time(since)}.")
+        if since:
+            logger.info(f"Using last trade date -5s - Downloading trades for {pair} "
+                        f"since: {format_ms_time(since)}.")
 
     if not since:
         since = dt_ts(dt_now() - timedelta(days=new_pairs_days))
@@ -443,7 +452,8 @@ def get_timerange(data: Dict[str, DataFrame]) -> Tuple[datetime, datetime]:
     :return: tuple containing min_date, max_date
     """
     timeranges = [
-        (frame['date'].min().to_pydatetime(), frame['date'].max().to_pydatetime())
+        (frame['date'].min().to_pydatetime(),
+         frame['date'].max().to_pydatetime())
         for frame in data.values()
     ]
     return (min(timeranges, key=operator.itemgetter(0))[0],
@@ -462,7 +472,8 @@ def validate_backtest_data(data: DataFrame, pair: str, min_date: datetime,
     :param timeframe_min: Timeframe in minutes
     """
     # total difference in minutes / timeframe-minutes
-    expected_frames = int((max_date - min_date).total_seconds() // 60 // timeframe_min)
+    expected_frames = int(
+        (max_date - min_date).total_seconds() // 60 // timeframe_min)
     found_missing = False
     dflen = len(data)
     if dflen < expected_frames:
@@ -476,7 +487,8 @@ def download_data_main(config: Config) -> None:
     timerange = TimeRange()
     if 'days' in config:
-        time_since = (datetime.now() - timedelta(days=config['days'])).strftime("%Y%m%d")
+        time_since = (datetime.now() -
+                      timedelta(days=config['days'])).strftime("%Y%m%d")
         timerange = TimeRange.parse_timerange(f'{time_since}-')
 
     if 'timerange' in config:
@@ -493,7 +505,7 @@ def download_data_main(config: Config) -> None:
     available_pairs = [
         p for p in exchange.get_markets(
             tradable_only=True, active_only=not config.get('include_inactive')
-            ).keys()
+        ).keys()
     ]
 
     expanded_pairs = dynamic_expand_pairlist(config, available_pairs)
@@ -526,7 +538,8 @@ def download_data_main(config: Config) -> None:
             # Convert downloaded trade data to different timeframes
             convert_trades_to_ohlcv(
                 pairs=expanded_pairs, timeframes=config['timeframes'],
-                datadir=config['datadir'], timerange=timerange, erase=bool(config.get('erase')),
+                datadir=config['datadir'], timerange=timerange, erase=bool(
+                    config.get('erase')),
                 data_format_ohlcv=config['dataformat_ohlcv'],
                 data_format_trades=config['dataformat_trades'],
             )
@@ -536,7 +549,7 @@ def download_data_main(config: Config) -> None:
                 f"Historic klines not available for {exchange.name}. "
                 "Please use `--dl-trades` instead for this exchange "
                 "(will unfortunately take a long time)."
-            )
+        )
     migrate_data(config, exchange)
     pairs_not_available = refresh_backtest_ohlcv_data(
         exchange, pairs=expanded_pairs, timeframes=config['timeframes'],
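A worked example of the `expected_frames` check in `validate_backtest_data` above, for one day of 5m candles:

    from datetime import datetime, timezone

    min_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
    max_date = datetime(2024, 1, 2, tzinfo=timezone.utc)
    timeframe_min = 5
    expected_frames = int((max_date - min_date).total_seconds() // 60 // timeframe_min)
    print(expected_frames)   # 288 - fewer rows than this means candles are missing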
diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py
index bde9cc6f6..901043913 100644
--- a/freqtrade/exchange/exchange.py
+++ b/freqtrade/exchange/exchange.py
@@ -2114,8 +2114,7 @@ class Exchange:
     def _process_trades_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
                            cache: bool, drop_incomplete: bool,
                            first_required_candle_date: Optional[int]) -> DataFrame:
         # keeping parsed dataframe in cache
-        trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False,
-                                               drop_incomplete=drop_incomplete)
+        trades_df = public_trades_to_dataframe(ticks, pair=pair)
         # keeping last candle time as last refreshed time of the pair
         if ticks and cache:
             idx = -2 if drop_incomplete and len(ticks) > 1 else -1
@@ -2188,7 +2187,7 @@ class Exchange:
 
     def refresh_latest_trades(self,
                               pair_list: ListPairsWithTimeframes,
-                              data_handler: Callable,  # using IDataHandler ends with circular import,
+                              data_handler: Any,  # using IDataHandler ends with circular import
                               *,
                               cache: bool = True,
                               ) -> Dict[PairWithTimeframe, DataFrame]:
@@ -2207,7 +2206,7 @@ class Exchange:
         since_ms = None
         results_df = {}
         for pair, timeframe, candle_type in set(pair_list):
-            new_ticks = []
+            new_ticks: List = []
             all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ['date'])
             first_candle_ms = self.needed_candle_ms(timeframe, candle_type)
             # refresh, if
@@ -2273,7 +2272,7 @@ class Exchange:
                     data_handler.trades_store(f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS])
                 else:
-                    raise "no new ticks"
+                    raise OperationalException("no new ticks")
 
         return results_df
 
@@ -2442,7 +2441,7 @@ class Exchange:
         return trades[-1].get('timestamp')
 
     async def _async_get_trade_history_id(self, pair: str,
-                                          until: int,
+                                          until: Optional[int],
                                           since: Optional[int] = None,
                                           from_id: Optional[str] = None,
                                           stop_on_from_id: Optional[bool] = True) -> Tuple[str, List[List]]:
@@ -2464,7 +2463,7 @@ class Exchange:
         x = slice(None, -1) if has_overlap else slice(None)
 
         if not until and not stop_on_from_id:
-            raise "stop_on_from_id must be set if until is not set"
+            raise OperationalException("stop_on_from_id must be set if until is not set")
         if not from_id or not self._valid_trade_pagination_id(pair, from_id):
             # Fetch first elements using timebased method to get an ID to paginate on
             # Depending on the Exchange, this can introduce a drift at the start of the interval
@@ -2602,9 +2601,9 @@ class Exchange:
                              new_pairs_days: int = 30,
                              since: Optional[int] = None,
                              until: Optional[int] = None,
-                             from_id: Optional[int] = None,
+                             from_id: Optional[str] = None,
                              stop_on_from_id: Optional[bool] = False
-                             ) -> bool:
+                             ) -> Tuple[str, List]:
         """
         Download trade history from the exchange.
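Context for the two `raise` fixes in this file: raising a bare string has been illegal since Python 3, so the old code masked its own message behind a TypeError. A minimal demonstration:

    try:
        raise "no new ticks"          # old pattern
    except TypeError as err:
        print(err)                    # -> exceptions must derive from BaseException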