diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py index 52256e7b4..50b298c75 100644 --- a/freqtrade/data/converter/orderflow.py +++ b/freqtrade/data/converter/orderflow.py @@ -4,19 +4,31 @@ Functions to convert orderflow data from public_trades import logging import time -from collections import OrderedDict from datetime import datetime import numpy as np import pandas as pd from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, Config -from freqtrade.enums import RunMode from freqtrade.exceptions import DependencyException logger = logging.getLogger(__name__) +ORDERFLOW_ADDED_COLUMNS = [ + "trades", + "orderflow", + "imbalances", + "stacked_imbalances_bid", + "stacked_imbalances_ask", + "max_delta", + "min_delta", + "bid", + "ask", + "delta", + "total_trades", +] + def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame): """ @@ -24,24 +36,18 @@ def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame): :param dataframe: Dataframe to populate """ # Initialize columns with appropriate dtypes - dataframe["trades"] = np.nan - dataframe["orderflow"] = np.nan - dataframe["imbalances"] = np.nan - dataframe["stacked_imbalances_bid"] = np.nan - dataframe["stacked_imbalances_ask"] = np.nan - dataframe["max_delta"] = np.nan - dataframe["min_delta"] = np.nan - dataframe["bid"] = np.nan - dataframe["ask"] = np.nan - dataframe["delta"] = np.nan - dataframe["total_trades"] = np.nan + for column in ORDERFLOW_ADDED_COLUMNS: + dataframe[column] = np.nan - # Ensure the 'trades' column is of object type - dataframe["trades"] = dataframe["trades"].astype(object) - dataframe["orderflow"] = dataframe["orderflow"].astype(object) - dataframe["imbalances"] = dataframe["imbalances"].astype(object) - dataframe["stacked_imbalances_bid"] = dataframe["stacked_imbalances_bid"].astype(object) - dataframe["stacked_imbalances_ask"] = dataframe["stacked_imbalances_ask"].astype(object) + # Set columns to object type + for column in ( + "trades", + "orderflow", + "imbalances", + "stacked_imbalances_bid", + "stacked_imbalances_ask", + ): + dataframe[column] = dataframe[column].astype(object) def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str): @@ -60,17 +66,18 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str): def populate_dataframe_with_trades( - cached_grouped_trades: OrderedDict[tuple[datetime, datetime], pd.DataFrame], + cached_grouped_trades: pd.DataFrame | None, config: Config, dataframe: pd.DataFrame, trades: pd.DataFrame, -) -> tuple[pd.DataFrame, OrderedDict[tuple[datetime, datetime], pd.DataFrame]]: +) -> tuple[pd.DataFrame, pd.DataFrame]: """ Populates a dataframe with trades :param dataframe: Dataframe to populate :param trades: Trades to populate with :return: Dataframe with trades populated """ + timeframe = config["timeframe"] config_orderflow = config["orderflow"] @@ -91,13 +98,6 @@ def populate_dataframe_with_trades( trades = trades.loc[trades["candle_start"] >= start_date] trades.reset_index(inplace=True, drop=True) - # Create Series to hold complex data - trades_series = pd.Series(index=dataframe.index, dtype=object) - orderflow_series = pd.Series(index=dataframe.index, dtype=object) - imbalances_series = pd.Series(index=dataframe.index, dtype=object) - stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object) - stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object) - # group trades by candle start trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False) @@ -105,58 +105,47 @@ def populate_dataframe_with_trades( for candle_start, trades_grouped_df in trades_grouped_by_candle_start: is_between = candle_start == dataframe["date"] if is_between.any(): - from freqtrade.exchange import timeframe_to_next_date + # there can only be one row with the same date + index = dataframe.index[is_between][0] - candle_next = timeframe_to_next_date(timeframe, candle_start) - if candle_next not in trades_grouped_by_candle_start.groups: - logger.warning( - f"candle at {candle_start} with {len(trades_grouped_df)} trades " - f"might be unfinished, because no finished trades at {candle_next}" - ) - - # Use caching mechanism - if (candle_start, candle_next) in cached_grouped_trades: - cache_entry = cached_grouped_trades[(candle_start, candle_next)] - # dataframe.loc[is_between] = cache_entry # doesn't take, so we need workaround: - # Create a dictionary of the column values to be assigned - update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns} - # Assign the values using the update_dict - dataframe.loc[is_between, update_dict.keys()] = pd.DataFrame( - [update_dict], index=dataframe.loc[is_between].index - ) + if ( + cached_grouped_trades is not None + and (candle_start == cached_grouped_trades["date"]).any() + ): + # Check if the trades are already in the cache + cache_idx = cached_grouped_trades.index[ + cached_grouped_trades["date"] == candle_start + ][0] + for col in ORDERFLOW_ADDED_COLUMNS: + dataframe.at[index, col] = cached_grouped_trades.at[cache_idx, col] continue - indices = dataframe.index[is_between].tolist() - # Add trades to each candle - trades_series.loc[indices] = [ - trades_grouped_df.drop(columns=["candle_start", "candle_end"]).to_dict( - orient="records" - ) - ] + dataframe.at[index, "trades"] = trades_grouped_df.drop( + columns=["candle_start", "candle_end"] + ).to_dict(orient="records") + # Calculate orderflow for each candle orderflow = trades_to_volumeprofile_with_total_delta_bid_ask( trades_grouped_df, scale=config_orderflow["scale"] ) - orderflow_series.loc[indices] = [orderflow.to_dict(orient="index")] + dataframe.at[index, "orderflow"] = orderflow.to_dict(orient="index") + # orderflow_series.loc[[index]] = [orderflow.to_dict(orient="index")] # Calculate imbalances for each candle's orderflow imbalances = trades_orderflow_to_imbalances( orderflow, imbalance_ratio=config_orderflow["imbalance_ratio"], imbalance_volume=config_orderflow["imbalance_volume"], ) - imbalances_series.loc[indices] = [imbalances.to_dict(orient="index")] + dataframe.at[index, "imbalances"] = imbalances.to_dict(orient="index") stacked_imbalance_range = config_orderflow["stacked_imbalance_range"] - stacked_imbalances_bid_series.loc[indices] = [ - stacked_imbalance_bid( - imbalances, stacked_imbalance_range=stacked_imbalance_range - ) - ] - stacked_imbalances_ask_series.loc[indices] = [ - stacked_imbalance_ask( - imbalances, stacked_imbalance_range=stacked_imbalance_range - ) - ] + dataframe.at[index, "stacked_imbalances_bid"] = stacked_imbalance_bid( + imbalances, stacked_imbalance_range=stacked_imbalance_range + ) + + dataframe.at[index, "stacked_imbalances_ask"] = stacked_imbalance_ask( + imbalances, stacked_imbalance_range=stacked_imbalance_range + ) bid = np.where( trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0 @@ -166,39 +155,20 @@ def populate_dataframe_with_trades( trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0 ) deltas_per_trade = ask - bid - min_delta = deltas_per_trade.cumsum().min() - max_delta = deltas_per_trade.cumsum().max() - dataframe.loc[indices, "max_delta"] = max_delta - dataframe.loc[indices, "min_delta"] = min_delta + dataframe.at[index, "max_delta"] = deltas_per_trade.cumsum().max() + dataframe.at[index, "min_delta"] = deltas_per_trade.cumsum().min() - dataframe.loc[indices, "bid"] = bid.sum() - dataframe.loc[indices, "ask"] = ask.sum() - dataframe.loc[indices, "delta"] = ( - dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"] + dataframe.at[index, "bid"] = bid.sum() + dataframe.at[index, "ask"] = ask.sum() + dataframe.at[index, "delta"] = ( + dataframe.at[index, "ask"] - dataframe.at[index, "bid"] ) - dataframe.loc[indices, "total_trades"] = len(trades_grouped_df) + dataframe.at[index, "total_trades"] = len(trades_grouped_df) - # Cache the result - cached_grouped_trades[(candle_start, candle_next)] = dataframe.loc[ - is_between - ].copy() - - # Maintain cache size - if ( - config.get("runmode") in (RunMode.DRY_RUN, RunMode.LIVE) - and len(cached_grouped_trades) > config_orderflow["cache_size"] - ): - cached_grouped_trades.popitem(last=False) - else: - logger.debug(f"Found NO candles for trades starting with {candle_start}") logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds") - # Merge the complex data Series back into the DataFrame - dataframe["trades"] = trades_series - dataframe["orderflow"] = orderflow_series - dataframe["imbalances"] = imbalances_series - dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series - dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series + # Cache the entire dataframe + cached_grouped_trades = dataframe.tail(config_orderflow["cache_size"]).copy() except Exception as e: logger.exception("Error populating dataframe with trades") diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 1a032f3a2..73d3acfa9 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -5,7 +5,6 @@ This module defines the interface to apply for strategies import logging from abc import ABC, abstractmethod -from collections import OrderedDict from datetime import datetime, timedelta, timezone from math import isinf, isnan @@ -141,9 +140,7 @@ class IStrategy(ABC, HyperStrategyMixin): market_direction: MarketDirection = MarketDirection.NONE # Global cache dictionary - _cached_grouped_trades_per_pair: dict[ - str, OrderedDict[tuple[datetime, datetime], DataFrame] - ] = {} + _cached_grouped_trades_per_pair: dict[str, DataFrame] = {} def __init__(self, config: Config) -> None: self.config = config @@ -1604,15 +1601,11 @@ class IStrategy(ABC, HyperStrategyMixin): if use_public_trades: trades = self.dp.trades(pair=metadata["pair"], copy=False) - config = self.config - config["timeframe"] = self.timeframe pair = metadata["pair"] # TODO: slice trades to size of dataframe for faster backtesting - cached_grouped_trades: OrderedDict[tuple[datetime, datetime], DataFrame] = ( - self._cached_grouped_trades_per_pair.get(pair, OrderedDict()) - ) + cached_grouped_trades: DataFrame | None = self._cached_grouped_trades_per_pair.get(pair) dataframe, cached_grouped_trades = populate_dataframe_with_trades( - cached_grouped_trades, config, dataframe, trades + cached_grouped_trades, self.config, dataframe, trades ) # dereference old cache diff --git a/tests/data/test_converter_orderflow.py b/tests/data/test_converter_orderflow.py index 9a337da91..96f550020 100644 --- a/tests/data/test_converter_orderflow.py +++ b/tests/data/test_converter_orderflow.py @@ -1,13 +1,16 @@ -from collections import OrderedDict - import numpy as np import pandas as pd import pytest from freqtrade.constants import DEFAULT_TRADES_COLUMNS from freqtrade.data.converter import populate_dataframe_with_trades -from freqtrade.data.converter.orderflow import trades_to_volumeprofile_with_total_delta_bid_ask +from freqtrade.data.converter.orderflow import ( + ORDERFLOW_ADDED_COLUMNS, + trades_to_volumeprofile_with_total_delta_bid_ask, +) from freqtrade.data.converter.trade_converter import trades_list_to_df +from freqtrade.data.dataprovider import DataProvider +from tests.strategy.strats.strategy_test_v3 import StrategyTestV3 BIN_SIZE_SCALE = 0.5 @@ -37,6 +40,7 @@ def populate_dataframe_with_trades_trades(testdatadir): @pytest.fixture def candles(testdatadir): + # TODO: this fixture isn't really necessary and could be removed return pd.read_json(testdatadir / "orderflow/candles.json").copy() @@ -102,7 +106,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow( }, } # Apply the function to populate the data frame with order flow data - df, _ = populate_dataframe_with_trades(OrderedDict(), config, dataframe, trades) + df, _ = populate_dataframe_with_trades(None, config, dataframe, trades) # Extract results from the first row of the DataFrame results = df.iloc[0] t = results["trades"] @@ -243,7 +247,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades( } # Populate the DataFrame with trades and order flow data - df, _ = populate_dataframe_with_trades(OrderedDict(), config, dataframe, trades) + df, _ = populate_dataframe_with_trades(None, config, dataframe, trades) # --- DataFrame and Trade Data Validation --- @@ -401,9 +405,7 @@ def test_public_trades_config_max_trades( }, } - df, _ = populate_dataframe_with_trades( - OrderedDict(), default_conf | orderflow_config, dataframe, trades - ) + df, _ = populate_dataframe_with_trades(None, default_conf | orderflow_config, dataframe, trades) assert df.delta.count() == 1 @@ -482,3 +484,83 @@ def test_public_trades_testdata_sanity( "cost", "date", ] + + +def test_analyze_with_orderflow( + default_conf_usdt, + mocker, + populate_dataframe_with_trades_dataframe, + populate_dataframe_with_trades_trades, +): + ohlcv_history = populate_dataframe_with_trades_dataframe + # call without orderflow + strategy = StrategyTestV3(config=default_conf_usdt) + strategy.dp = DataProvider(default_conf_usdt, None, None) + + mocker.patch.object(strategy.dp, "trades", return_value=populate_dataframe_with_trades_trades) + import freqtrade.data.converter.orderflow as orderflow_module + + spy = mocker.spy(orderflow_module, "trades_to_volumeprofile_with_total_delta_bid_ask") + + pair = "ETH/BTC" + df = strategy.advise_indicators(ohlcv_history, {"pair:": pair}) + assert len(df) == len(ohlcv_history) + assert "open" in df.columns + assert spy.call_count == 0 + + # Not expected to run - shouldn't have added orderflow columns + for col in ORDERFLOW_ADDED_COLUMNS: + assert col not in df.columns, f"Column {col} found in df.columns" + + default_conf_usdt["exchange"]["use_public_trades"] = True + default_conf_usdt["orderflow"] = { + "cache_size": 5, + "max_candles": 5, + "scale": 0.005, + "imbalance_volume": 0, + "imbalance_ratio": 3, + "stacked_imbalance_range": 3, + } + + strategy.config = default_conf_usdt + # First round - builds cache + df1 = strategy.advise_indicators(ohlcv_history, {"pair": pair}) + assert len(df1) == len(ohlcv_history) + assert "open" in df1.columns + assert spy.call_count == 5 + + for col in ORDERFLOW_ADDED_COLUMNS: + assert col in df1.columns, f"Column {col} not found in df.columns" + + if col not in ("stacked_imbalances_bid", "stacked_imbalances_ask"): + assert df1[col].count() == 5, f"Column {col} has {df1[col].count()} non-NaN values" + + assert len(strategy._cached_grouped_trades_per_pair[pair]) == 5 + + lastval_trades = df1.at[len(df1) - 1, "trades"] + assert isinstance(lastval_trades, list) + assert len(lastval_trades) == 122 + + lastval_of = df1.at[len(df1) - 1, "orderflow"] + assert isinstance(lastval_of, dict) + + spy.reset_mock() + # Ensure caching works - call the same logic again. + df2 = strategy.advise_indicators(ohlcv_history, {"pair": pair}) + assert len(df2) == len(ohlcv_history) + assert "open" in df2.columns + assert spy.call_count == 0 + for col in ORDERFLOW_ADDED_COLUMNS: + assert col in df2.columns, f"Round2: Column {col} not found in df.columns" + + if col not in ("stacked_imbalances_bid", "stacked_imbalances_ask"): + assert ( + df2[col].count() == 5 + ), f"Round2: Column {col} has {df2[col].count()} non-NaN values" + + lastval_trade2 = df2.at[len(df2) - 1, "trades"] + assert isinstance(lastval_trade2, list) + assert len(lastval_trade2) == 122 + + lastval_of2 = df2.at[len(df2) - 1, "orderflow"] + assert isinstance(lastval_of2, dict)