diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py index 1b1d7a565..77acfe902 100644 --- a/freqtrade/data/converter/orderflow.py +++ b/freqtrade/data/converter/orderflow.py @@ -20,16 +20,25 @@ def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame): Populates a dataframe with trades columns :param dataframe: Dataframe to populate """ - dataframe["trades"] = dataframe.apply(lambda _: [], axis=1) - dataframe["orderflow"] = dataframe.apply(lambda _: {}, axis=1) + # Initialize columns with appropriate dtypes + dataframe["trades"] = np.nan + dataframe["orderflow"] = np.nan + dataframe["imbalances"] = np.nan + dataframe["stacked_imbalances_bid"] = np.nan + dataframe["stacked_imbalances_ask"] = np.nan + dataframe["max_delta"] = np.nan + dataframe["min_delta"] = np.nan dataframe["bid"] = np.nan dataframe["ask"] = np.nan dataframe["delta"] = np.nan - dataframe["min_delta"] = np.nan - dataframe["max_delta"] = np.nan dataframe["total_trades"] = np.nan - dataframe["stacked_imbalances_bid"] = np.nan - dataframe["stacked_imbalances_ask"] = np.nan + + # Ensure the 'trades' column is of object type + dataframe["trades"] = dataframe["trades"].astype(object) + dataframe["orderflow"] = dataframe["orderflow"].astype(object) + dataframe["imbalances"] = dataframe["imbalances"].astype(object) + dataframe["stacked_imbalances_bid"] = dataframe["stacked_imbalances_bid"].astype(object) + dataframe["stacked_imbalances_ask"] = dataframe["stacked_imbalances_ask"].astype(object) def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str): @@ -61,7 +70,6 @@ def populate_dataframe_with_trades( # create columns for trades _init_dataframe_with_trades_columns(dataframe) - df = dataframe.copy() try: start_time = time.time() @@ -70,18 +78,23 @@ def populate_dataframe_with_trades( # get date of earliest max_candles candle max_candles = config_orderflow["max_candles"] - start_date = df.tail(max_candles).date.iat[0] + start_date = dataframe.tail(max_candles).date.iat[0] # slice of trades that are before current ohlcv candles to make groupby faster trades = trades.loc[trades.candle_start >= start_date] trades.reset_index(inplace=True, drop=True) # group trades by candle start trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False) + # Create Series to hold complex data + trades_series = pd.Series(index=dataframe.index, dtype=object) + orderflow_series = pd.Series(index=dataframe.index, dtype=object) + imbalances_series = pd.Series(index=dataframe.index, dtype=object) + stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object) + stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object) - for candle_start in trades_grouped_by_candle_start.groups: - trades_grouped_df = trades[candle_start == trades["candle_start"]] - is_between = candle_start == df["date"] - if np.any(is_between == True): # noqa: E712 + for candle_start, trades_grouped_df in trades_grouped_by_candle_start: + is_between = candle_start == dataframe["date"] + if is_between.any(): from freqtrade.exchange import timeframe_to_next_date candle_next = timeframe_to_next_date(timeframe, candle_start) @@ -93,32 +106,34 @@ def populate_dataframe_with_trades( f"might be unfinished, because no finished trades at {candle_next}" ) - # add trades to each candle - df.loc[is_between, "trades"] = df.loc[is_between, "trades"].apply( - lambda _: trades_grouped_df - ) - # calculate orderflow for each candle - df.loc[is_between, "orderflow"] = df.loc[is_between, "orderflow"].apply( - lambda _: trades_to_volumeprofile_with_total_delta_bid_ask( - trades_grouped_df, scale=config_orderflow["scale"] - ) - ) - # calculate imbalances for each candle's orderflow - df.loc[is_between, "imbalances"] = df.loc[is_between, "orderflow"].apply( - lambda x: trades_orderflow_to_imbalances( - x, - imbalance_ratio=config_orderflow["imbalance_ratio"], - imbalance_volume=config_orderflow["imbalance_volume"], - ) - ) + indices = dataframe.index[is_between].tolist() + # Add trades to each candle + trades_series.loc[indices] = [trades_grouped_df] * len(indices) - _stacked_imb = config_orderflow["stacked_imbalance_range"] - df.loc[is_between, "stacked_imbalances_bid"] = df.loc[ - is_between, "imbalances" - ].apply(lambda x: stacked_imbalance_bid(x, stacked_imbalance_range=_stacked_imb)) - df.loc[is_between, "stacked_imbalances_ask"] = df.loc[ - is_between, "imbalances" - ].apply(lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb)) + # Calculate orderflow for each candle + orderflow = trades_to_volumeprofile_with_total_delta_bid_ask( + trades_grouped_df, scale=config_orderflow["scale"] + ) + orderflow_series.loc[indices] = [orderflow] * len(indices) + # Calculate imbalances for each candle's orderflow + imbalances = trades_orderflow_to_imbalances( + orderflow, + imbalance_ratio=config_orderflow["imbalance_ratio"], + imbalance_volume=config_orderflow["imbalance_volume"], + ) + imbalances_series.loc[indices] = [imbalances] * len(indices) + + stacked_imbalance_range = config_orderflow["stacked_imbalance_range"] + stacked_imbalances_bid_series.loc[indices] = [ + stacked_imbalance_bid( + imbalances, stacked_imbalance_range=stacked_imbalance_range + ) + ] * len(indices) + stacked_imbalances_ask_series.loc[indices] = [ + stacked_imbalance_ask( + imbalances, stacked_imbalance_range=stacked_imbalance_range + ) + ] * len(indices) bid = np.where( trades_grouped_df["side"].str.contains("sell"), @@ -131,34 +146,30 @@ def populate_dataframe_with_trades( 0, ) deltas_per_trade = ask - bid - min_delta = 0 - max_delta = 0 - delta = 0 - for d in deltas_per_trade: - delta += d - if delta > max_delta: - max_delta = delta - if delta < min_delta: - min_delta = delta - df.loc[is_between, "max_delta"] = max_delta - df.loc[is_between, "min_delta"] = min_delta + min_delta = deltas_per_trade.cumsum().min() + max_delta = deltas_per_trade.cumsum().max() + dataframe.loc[indices, "max_delta"] = max_delta + dataframe.loc[indices, "min_delta"] = min_delta - df.loc[is_between, "bid"] = np.where( - trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0 - ).sum() - df.loc[is_between, "ask"] = np.where( - trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0 - ).sum() - df.loc[is_between, "delta"] = df.loc[is_between, "ask"] - df.loc[is_between, "bid"] - df.loc[is_between, "total_trades"] = len(trades_grouped_df) - # copy to avoid memory leaks - dataframe.loc[is_between] = df.loc[is_between].copy() + dataframe.loc[indices, "bid"] = bid.sum() + dataframe.loc[indices, "ask"] = ask.sum() + dataframe.loc[indices, "delta"] = ( + dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"] + ) + dataframe.loc[indices, "total_trades"] = len(trades_grouped_df) else: logger.debug(f"Found NO candles for trades starting with {candle_start}") logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds") + # Merge the complex data Series back into the DataFrame + dataframe["trades"] = trades_series + dataframe["orderflow"] = orderflow_series + dataframe["imbalances"] = imbalances_series + dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series + dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series + except Exception as e: - logger.exception("Error populating dataframe with trades:", e) + logger.exception("Error populating dataframe with trades") raise DependencyException(e) return dataframe diff --git a/tests/data/test_converter_public_trades.py b/tests/data/test_converter_public_trades.py index 9baadb868..cab4a738d 100644 --- a/tests/data/test_converter_public_trades.py +++ b/tests/data/test_converter_public_trades.py @@ -227,14 +227,15 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades( "volume", "trades", "orderflow", + "imbalances", + "stacked_imbalances_bid", + "stacked_imbalances_ask", + "max_delta", + "min_delta", "bid", "ask", "delta", - "min_delta", - "max_delta", "total_trades", - "stacked_imbalances_bid", - "stacked_imbalances_ask", ] # Assert delta, bid, and ask values assert -50.519 == pytest.approx(row["delta"])