From d7b88194e024a9457184f120efb4971e7e84087e Mon Sep 17 00:00:00 2001 From: Joe Schr <8218910+TheJoeSchr@users.noreply.github.com> Date: Mon, 12 Feb 2024 13:09:28 +0100 Subject: [PATCH] Resolve various issues and comments --- freqtrade/data/converter/converter.py | 108 ++++++++++++++------------ freqtrade/exchange/exchange.py | 1 - 2 files changed, 59 insertions(+), 50 deletions(-) diff --git a/freqtrade/data/converter/converter.py b/freqtrade/data/converter/converter.py index 38a05558f..13193f6ae 100644 --- a/freqtrade/data/converter/converter.py +++ b/freqtrade/data/converter/converter.py @@ -3,15 +3,16 @@ Functions to convert data from one format to another """ import logging import time -from typing import Dict, List +from typing import Dict import numpy as np import pandas as pd from pandas import DataFrame, to_datetime from freqtrade.constants import (DEFAULT_DATAFRAME_COLUMNS, DEFAULT_ORDERFLOW_COLUMNS, - DEFAULT_TRADES_COLUMNS, Config) + Config) from freqtrade.enums import CandleType, TradingMode +from freqtrade.exchange.exchange_utils import timeframe_to_resample_freq logger = logging.getLogger(__name__) @@ -30,7 +31,8 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *, :param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete :return: DataFrame """ - logger.debug(f"Converting candle (OHLCV) data to dataframe for pair {pair}.") + logger.debug( + f"Converting candle (OHLCV) data to dataframe for pair {pair}.") cols = DEFAULT_DATAFRAME_COLUMNS df = DataFrame(ohlcv, columns=cols) @@ -72,13 +74,16 @@ def _convert_timeframe_to_pandas_frequency(timeframe: str): def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str): - timeframe_frequency, timeframe_minutes = _convert_timeframe_to_pandas_frequency( + _, timeframe_minutes = _convert_timeframe_to_pandas_frequency( timeframe) + + timeframe_frequency = timeframe_to_resample_freq(timeframe) # calculate ohlcv candle start and end if df is not None and 
not df.empty: df['datetime'] = pd.to_datetime(df['date'], unit='ms') df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency) - df['candle_end'] = df['candle_start'] + pd.Timedelta(timeframe_minutes) + df['candle_end'] = df['candle_start'] + \ + pd.Timedelta(minutes=timeframe_minutes) df.drop(columns=['datetime'], inplace=True) @@ -103,25 +108,26 @@ def populate_dataframe_with_trades(config: Config, try: start_time = time.time() # calculate ohlcv candle start and end + # TODO: check if call is necessary for df. _calculate_ohlcv_candle_start_and_end(df, timeframe) _calculate_ohlcv_candle_start_and_end(trades, timeframe) # slice of trades that are before current ohlcv candles to make groupby faster + # TODO: maybe use df.date instead of df.candle_start at comparison below trades = trades.loc[trades.candle_start >= df.candle_start[0]] trades.reset_index(inplace=True, drop=True) # group trades by candle start - trades_grouped_by_candle_start = trades.groupby('candle_start', group_keys=False) - # repair 'date' datetime type (otherwise crashes on each compare) - if "date" in dataframe.columns: - dataframe['date'] = pd.to_datetime(dataframe['date']) + trades_grouped_by_candle_start = trades.groupby( + 'candle_start', group_keys=False) for candle_start in trades_grouped_by_candle_start.groups: trades_grouped_df = trades[candle_start == trades['candle_start']] is_between = (candle_start == df['candle_start']) if np.any(is_between == True): # noqa: E712 (_, timeframe_minutes) = _convert_timeframe_to_pandas_frequency(timeframe) - candle_next = candle_start + pd.Timedelta(minutes=timeframe_minutes) + candle_next = candle_start + \ + pd.Timedelta(minutes=timeframe_minutes) # skip if there are no trades at next candle # because that this candle isn't finished yet if candle_next not in trades_grouped_by_candle_start.groups: @@ -153,6 +159,7 @@ def populate_dataframe_with_trades(config: Config, is_between, 'imbalances'].apply( lambda x: stacked_imbalance_ask(x, 
stacked_imbalance_range=_stacked_imb)) + # TODO: maybe use simple np.where instead buy = df.loc[is_between, 'bid'].apply(lambda _: np.where( trades_grouped_df['side'].str.contains('buy'), 0, trades_grouped_df['amount'])) sell = df.loc[is_between, 'ask'].apply(lambda _: np.where( @@ -184,10 +191,13 @@ def populate_dataframe_with_trades(config: Config, # copy to avoid memory leaks dataframe.loc[is_between] = df.loc[is_between].copy() else: - logger.debug(f"Found NO candles for trades starting with {candle_start}") - logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds") + logger.debug( + f"Found NO candles for trades starting with {candle_start}") + logger.debug( + f"trades.groups_keys in {time.time() - start_time} seconds") - logger.debug(f"trades.singleton_iterate in {time.time() - start_time} seconds") + logger.debug( + f"trades.singleton_iterate in {time.time() - start_time} seconds") except Exception as e: logger.exception("Error populating dataframe with trades:", e) @@ -203,13 +213,16 @@ def trades_to_volumeprofile_with_total_delta_bid_ask(trades: DataFrame, scale: f """ df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS) # create bid, ask where side is sell or buy - df['bid_amount'] = np.where(trades['side'].str.contains('buy'), 0, trades['amount']) - df['ask_amount'] = np.where(trades['side'].str.contains('sell'), 0, trades['amount']) + df['bid_amount'] = np.where( + trades['side'].str.contains('buy'), 0, trades['amount']) + df['ask_amount'] = np.where( + trades['side'].str.contains('sell'), 0, trades['amount']) df['bid'] = np.where(trades['side'].str.contains('buy'), 0, 1) df['ask'] = np.where(trades['side'].str.contains('sell'), 0, 1) # round the prices to the nearest multiple of the scale - df['price'] = ((trades['price'] / scale).round() * scale).astype('float64').values + df['price'] = ((trades['price'] / scale).round() + * scale).astype('float64').values if df.empty: df['total'] = np.nan df['delta'] = np.nan @@ -235,23 +248,25 
@@ def trades_orderflow_to_imbalances(df: DataFrame, imbalance_ratio: int, imbalanc ask = df.ask.shift(-1) bid_imbalance = (bid / ask) > (imbalance_ratio / 100) # overwrite bid_imbalance with False if volume is not big enough - bid_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, bid_imbalance) + bid_imbalance_filtered = np.where( + df.total_volume < imbalance_volume, False, bid_imbalance) ask_imbalance = (ask / bid) > (imbalance_ratio / 100) # overwrite ask_imbalance with False if volume is not big enough - ask_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, ask_imbalance) + ask_imbalance_filtered = np.where( + df.total_volume < imbalance_volume, False, ask_imbalance) dataframe = DataFrame({ "bid_imbalance": bid_imbalance_filtered, "ask_imbalance": ask_imbalance_filtered - }, index=df.index, + }, index=df.index, ) return dataframe def stacked_imbalance(df: DataFrame, - label: str = "bid", - stacked_imbalance_range: int = 3, - should_reverse: bool = False): + label: str, + stacked_imbalance_range: int, + should_reverse: bool): """ y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1) https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array @@ -260,11 +275,13 @@ def stacked_imbalance(df: DataFrame, int_series = pd.Series(np.where(imbalance, 1, 0)) stacked = ( int_series * ( - int_series.groupby((int_series != int_series.shift()).cumsum()).cumcount() + 1 - ) + int_series.groupby( + (int_series != int_series.shift()).cumsum()).cumcount() + 1 + ) ) - max_stacked_imbalance_idx = stacked.index[stacked >= stacked_imbalance_range] + max_stacked_imbalance_idx = stacked.index[stacked >= + stacked_imbalance_range] stacked_imbalance_price = np.nan if not max_stacked_imbalance_idx.empty: idx = max_stacked_imbalance_idx[0] if not should_reverse else np.flipud( @@ -273,11 +290,11 @@ def stacked_imbalance(df: DataFrame, return stacked_imbalance_price -def stacked_imbalance_bid(df: 
DataFrame, stacked_imbalance_range: int = 3): - return stacked_imbalance(df, 'bid', stacked_imbalance_range) +def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int): + return stacked_imbalance(df, 'bid', stacked_imbalance_range, should_reverse=False) -def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int = 3): +def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int): return stacked_imbalance(df, 'ask', stacked_imbalance_range, should_reverse=True) @@ -328,20 +345,6 @@ def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *, return data -def drop_incomplete_and_fill_missing_trades(data: DataFrame, timeframe: str, pair: str, *, - fill_missing: bool, drop_incomplete: bool) -> DataFrame: - - # eliminate partial candle - if drop_incomplete: - # TODO: this is not correct, as it drops the last trade only - # but we need to drop the last candle until closed - pass - data.drop(data.tail(1).index, inplace=True) - logger.debug('Dropping last trade') - - return data - - def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) -> DataFrame: """ Fills up missing data with 0 volume rows, @@ -372,7 +375,8 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) df.reset_index(inplace=True) len_before = len(dataframe) len_after = len(df) - pct_missing = (len_after - len_before) / len_before if len_before > 0 else 0 + pct_missing = (len_after - len_before) / \ + len_before if len_before > 0 else 0 if len_before != len_after: message = (f"Missing data fillup for {pair}, {timeframe}: " f"before: {len_before} - after: {len_after} - {pct_missing:.2%}") @@ -417,7 +421,8 @@ def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange, processed: Dict[str, DataFrame] = {} for pair, df in preprocessed.items(): - trimed_df = trim_dataframe(df, timerange, startup_candles=startup_candles) + trimed_df = trim_dataframe( + df, timerange, startup_candles=startup_candles) if not 
trimed_df.empty: processed[pair] = trimed_df else: @@ -473,15 +478,18 @@ def convert_ohlcv_format( candle_types = [CandleType.from_string(ct) for ct in config.get('candle_types', [ c.value for c in CandleType])] logger.info(candle_types) - paircombs = src.ohlcv_get_available_data(config['datadir'], TradingMode.SPOT) - paircombs.extend(src.ohlcv_get_available_data(config['datadir'], TradingMode.FUTURES)) + paircombs = src.ohlcv_get_available_data( + config['datadir'], TradingMode.SPOT) + paircombs.extend(src.ohlcv_get_available_data( + config['datadir'], TradingMode.FUTURES)) if 'pairs' in config: # Filter pairs paircombs = [comb for comb in paircombs if comb[0] in config['pairs']] if 'timeframes' in config: - paircombs = [comb for comb in paircombs if comb[1] in config['timeframes']] + paircombs = [comb for comb in paircombs if comb[1] + in config['timeframes']] paircombs = [comb for comb in paircombs if comb[2] in candle_types] paircombs = sorted(paircombs, key=lambda x: (x[0], x[1], x[2].value)) @@ -498,7 +506,8 @@ def convert_ohlcv_format( drop_incomplete=False, startup_candles=0, candle_type=candle_type) - logger.info(f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}") + logger.info( + f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}") if len(data) > 0: trg.ohlcv_store( pair=pair, @@ -508,7 +517,8 @@ def convert_ohlcv_format( ) if erase and convert_from != convert_to: logger.info(f"Deleting source data for {pair} / {timeframe}") - src.ohlcv_purge(pair=pair, timeframe=timeframe, candle_type=candle_type) + src.ohlcv_purge(pair=pair, timeframe=timeframe, + candle_type=candle_type) def reduce_dataframe_footprint(df: DataFrame) -> DataFrame: diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index a0b6aaa15..833b16fee 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -2257,7 +2257,6 @@ class Exchange: candle_type, all_stored_ticks_list, cache, - 
drop_incomplete=False, first_required_candle_date=first_candle_ms) results_df[(pair, timeframe, candle_type)] = trades_df data_handler.trades_store(f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS])