From d23c33a47f79f722babe127c2674c04995eccae3 Mon Sep 17 00:00:00 2001
From: Joe Schr <8218910+TheJoeSchr@users.noreply.github.com>
Date: Fri, 21 Jun 2024 15:28:11 +0200
Subject: [PATCH 1/5] feat: add caching to populate_dataframe_with_trades

---
 freqtrade/data/converter/orderflow.py      | 160 +++++++++++----------
 tests/data/test_converter_public_trades.py |   9 +-
 2 files changed, 90 insertions(+), 79 deletions(-)

diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py
index 1b1d7a565..af6976e47 100644
--- a/freqtrade/data/converter/orderflow.py
+++ b/freqtrade/data/converter/orderflow.py
@@ -10,7 +10,7 @@ import pandas as pd
 
 from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, Config
 from freqtrade.exceptions import DependencyException
-
+from collections import OrderedDict
 
 logger = logging.getLogger(__name__)
 
@@ -20,16 +20,25 @@ def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
     Populates a dataframe with trades columns
     :param dataframe: Dataframe to populate
     """
-    dataframe["trades"] = dataframe.apply(lambda _: [], axis=1)
-    dataframe["orderflow"] = dataframe.apply(lambda _: {}, axis=1)
+    # Initialize columns with appropriate dtypes
+    dataframe["trades"] = np.nan
+    dataframe["orderflow"] = np.nan
+    dataframe["imbalances"] = np.nan
+    dataframe["stacked_imbalances_bid"] = np.nan
+    dataframe["stacked_imbalances_ask"] = np.nan
+    dataframe["max_delta"] = np.nan
+    dataframe["min_delta"] = np.nan
     dataframe["bid"] = np.nan
     dataframe["ask"] = np.nan
     dataframe["delta"] = np.nan
-    dataframe["min_delta"] = np.nan
-    dataframe["max_delta"] = np.nan
     dataframe["total_trades"] = np.nan
-    dataframe["stacked_imbalances_bid"] = np.nan
-    dataframe["stacked_imbalances_ask"] = np.nan
+
+    # Ensure the 'trades' column is of object type
+    dataframe["trades"] = dataframe["trades"].astype(object)
+    dataframe["orderflow"] = dataframe["orderflow"].astype(object)
+    dataframe["imbalances"] = dataframe["imbalances"].astype(object)
+    dataframe["stacked_imbalances_bid"] = dataframe["stacked_imbalances_bid"].astype(object)
+    dataframe["stacked_imbalances_ask"] = dataframe["stacked_imbalances_ask"].astype(object)
 
 
 def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
@@ -47,9 +56,12 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
         df.drop(columns=["datetime"], inplace=True)
 
 
-def populate_dataframe_with_trades(
-    config: Config, dataframe: pd.DataFrame, trades: pd.DataFrame
-) -> pd.DataFrame:
+# Global cache dictionary
+cache_size = 1000  # TODO move that in config
+cached_grouped_trades = OrderedDict()  # TODO move that where? 
+
+
+def populate_dataframe_with_trades(config, dataframe, trades):
     """
     Populates a dataframe with trades
     :param dataframe: Dataframe to populate
@@ -61,7 +73,6 @@ def populate_dataframe_with_trades(
 
     # create columns for trades
     _init_dataframe_with_trades_columns(dataframe)
-    df = dataframe.copy()
 
     try:
         start_time = time.time()
@@ -70,95 +81,94 @@ def populate_dataframe_with_trades(
 
         # get date of earliest max_candles candle
         max_candles = config_orderflow["max_candles"]
-        start_date = df.tail(max_candles).date.iat[0]
+        start_date = dataframe.tail(max_candles).date.iat[0]
         # slice of trades that are before current ohlcv candles to make groupby faster
         trades = trades.loc[trades.candle_start >= start_date]
         trades.reset_index(inplace=True, drop=True)
 
         # group trades by candle start
-        trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False)
+        trades_grouped_by_candle_start = trades.groupby("candle_start")
 
-        for candle_start in trades_grouped_by_candle_start.groups:
-            trades_grouped_df = trades[candle_start == trades["candle_start"]]
-            is_between = candle_start == df["date"]
-            if np.any(is_between == True):  # noqa: E712
+
+        # Create Series to hold complex data
+        trades_series = pd.Series(index=dataframe.index, dtype=object)
+        orderflow_series = pd.Series(index=dataframe.index, dtype=object)
+        imbalances_series = pd.Series(index=dataframe.index, dtype=object)
+        stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object)
+        stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object)
+
+        for candle_start, trades_grouped_df in trades_grouped_by_candle_start:
+            is_between = candle_start == dataframe["date"]
+            if is_between.any():
                 from freqtrade.exchange import timeframe_to_next_date
-
                 candle_next = timeframe_to_next_date(timeframe, candle_start)
-                # skip if there are no trades at next candle
-                # because that this candle isn't finished yet
                 if candle_next not in trades_grouped_by_candle_start.groups:
                     logger.warning(
                         f"candle at {candle_start} with {len(trades_grouped_df)} trades "
                         f"might be unfinished, because no finished trades at {candle_next}"
                     )
-                # add trades to each candle
-                df.loc[is_between, "trades"] = df.loc[is_between, "trades"].apply(
-                    lambda _: trades_grouped_df
-                )
-                # calculate orderflow for each candle
-                df.loc[is_between, "orderflow"] = df.loc[is_between, "orderflow"].apply(
-                    lambda _: trades_to_volumeprofile_with_total_delta_bid_ask(
-                        trades_grouped_df, scale=config_orderflow["scale"]
-                    )
-                )
-                # calculate imbalances for each candle's orderflow
-                df.loc[is_between, "imbalances"] = df.loc[is_between, "orderflow"].apply(
-                    lambda x: trades_orderflow_to_imbalances(
-                        x,
-                        imbalance_ratio=config_orderflow["imbalance_ratio"],
-                        imbalance_volume=config_orderflow["imbalance_volume"],
-                    )
-                )
+                # Use caching mechanism
+                if (candle_start, candle_next) in cached_grouped_trades:
+                    cache_entry = cached_grouped_trades[(candle_start, candle_next)]
+                    dataframe.loc[is_between] = cache_entry
+                    continue
 
-                _stacked_imb = config_orderflow["stacked_imbalance_range"]
-                df.loc[is_between, "stacked_imbalances_bid"] = df.loc[
-                    is_between, "imbalances"
-                ].apply(lambda x: stacked_imbalance_bid(x, stacked_imbalance_range=_stacked_imb))
-                df.loc[is_between, "stacked_imbalances_ask"] = df.loc[
-                    is_between, "imbalances"
-                ].apply(lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb))
+                # Store trades in Series using integer indices
+                indices = dataframe.index[is_between].tolist()
+                trades_series.loc[indices] = [trades_grouped_df] * len(indices)
-                bid = np.where(
-                    trades_grouped_df["side"].str.contains("sell"),
-                    trades_grouped_df["amount"],
-                    0,
+                # Calculate orderflow for each candle
+                orderflow = trades_to_volumeprofile_with_total_delta_bid_ask(
+                    trades_grouped_df, scale=config_orderflow["scale"]
                 )
-                ask = np.where(
-                    trades_grouped_df["side"].str.contains("buy"),
-                    trades_grouped_df["amount"],
-                    0,
+                orderflow_series.loc[indices] = [orderflow] * len(indices)
+
+                # Calculate imbalances for each candle's orderflow
+                imbalances = trades_orderflow_to_imbalances(
+                    orderflow,
+                    imbalance_ratio=config_orderflow["imbalance_ratio"],
+                    imbalance_volume=config_orderflow["imbalance_volume"],
                 )
+                imbalances_series.loc[indices] = [imbalances] * len(indices)
+
+                stacked_imb = config_orderflow["stacked_imbalance_range"]
+                stacked_imbalances_bid_series.loc[indices] = [stacked_imbalance_bid(imbalances, stacked_imbalance_range=stacked_imb)] * len(indices)
+                stacked_imbalances_ask_series.loc[indices] = [stacked_imbalance_ask(imbalances, stacked_imbalance_range=stacked_imb)] * len(indices)
+
+                bid = trades_grouped_df["side"].str.contains("sell").astype(int) * trades_grouped_df["amount"]
+                ask = trades_grouped_df["side"].str.contains("buy").astype(int) * trades_grouped_df["amount"]
 
                 deltas_per_trade = ask - bid
-                min_delta = 0
-                max_delta = 0
-                delta = 0
-                for d in deltas_per_trade:
-                    delta += d
-                    if delta > max_delta:
-                        max_delta = delta
-                    if delta < min_delta:
-                        min_delta = delta
-                df.loc[is_between, "max_delta"] = max_delta
-                df.loc[is_between, "min_delta"] = min_delta
-                df.loc[is_between, "bid"] = np.where(
-                    trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0
-                ).sum()
-                df.loc[is_between, "ask"] = np.where(
-                    trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0
-                ).sum()
-                df.loc[is_between, "delta"] = df.loc[is_between, "ask"] - df.loc[is_between, "bid"]
-                df.loc[is_between, "total_trades"] = len(trades_grouped_df)
-                # copy to avoid memory leaks
-                dataframe.loc[is_between] = df.loc[is_between].copy()
+                min_delta = deltas_per_trade.cumsum().min()
+                max_delta = deltas_per_trade.cumsum().max()
+                dataframe.loc[indices, "max_delta"] = max_delta
+                dataframe.loc[indices, "min_delta"] = min_delta
+
+                dataframe.loc[indices, "bid"] = bid.sum()
+                dataframe.loc[indices, "ask"] = ask.sum()
+                dataframe.loc[indices, "delta"] = dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"]
+                dataframe.loc[indices, "total_trades"] = len(trades_grouped_df)
+
+                # Cache the result
+                cached_grouped_trades[(candle_start, candle_next)] = dataframe.loc[is_between].copy()
+
+                # Maintain cache size
+                if len(cached_grouped_trades) > cache_size:
+                    cached_grouped_trades.popitem(last=False)
             else:
                 logger.debug(f"Found NO candles for trades starting with {candle_start}")
         logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds")
 
+        # Merge the complex data Series back into the DataFrame
+        dataframe["trades"] = trades_series
+        dataframe["orderflow"] = orderflow_series
+        dataframe["imbalances"] = imbalances_series
+        dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series
+        dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series
+
     except Exception as e:
-        logger.exception("Error populating dataframe with trades:", e)
+        logger.exception("Error populating dataframe with trades")
         raise DependencyException(e)
 
     return dataframe
diff --git a/tests/data/test_converter_public_trades.py b/tests/data/test_converter_public_trades.py
index 9baadb868..cab4a738d 100644
--- a/tests/data/test_converter_public_trades.py
+++ b/tests/data/test_converter_public_trades.py
@@ -227,14 +227,15 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
         "volume",
         "trades",
         "orderflow",
+        "imbalances",
+        "stacked_imbalances_bid",
+        "stacked_imbalances_ask",
+        "max_delta",
+        "min_delta",
         "bid",
         "ask",
         "delta",
-        "min_delta",
-        "max_delta",
         "total_trades",
-        "stacked_imbalances_bid",
-        "stacked_imbalances_ask",
     ]
     # Assert delta, bid, and ask values
    assert -50.519 == pytest.approx(row["delta"])
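For reference, the vectorized min/max delta introduced above can be illustrated in isolation. This is a standalone sketch with synthetic trade data (the column names mirror the patch, everything else is made up): the running delta of a candle is the cumulative sum of signed trade amounts, and its extremes replace the removed accumulation loop. One nuance worth double-checking in review: the old loop started from `delta = 0`, so its min/max were clamped to zero, while `cumsum().min()` and `cumsum().max()` are not.

```python
import pandas as pd

# Synthetic trades for one candle (illustrative values only).
trades_grouped_df = pd.DataFrame(
    {
        "side": ["buy", "sell", "buy", "sell", "sell"],
        "amount": [1.0, 0.4, 2.0, 1.5, 0.3],
    }
)

# Same construction as the patch: buys add to ask volume, sells to bid volume.
ask = trades_grouped_df["side"].str.contains("buy").astype(int) * trades_grouped_df["amount"]
bid = trades_grouped_df["side"].str.contains("sell").astype(int) * trades_grouped_df["amount"]

deltas_per_trade = ask - bid            # signed contribution of each trade
running_delta = deltas_per_trade.cumsum()

min_delta = running_delta.min()         # lowest point reached by the running delta
max_delta = running_delta.max()         # highest point reached by the running delta
print(running_delta.tolist(), min_delta, max_delta)
```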
From ecd2118941bc107b56f3de68438674e1967e9a15 Mon Sep 17 00:00:00 2001
From: Joe Schr <8218910+TheJoeSchr@users.noreply.github.com>
Date: Mon, 24 Jun 2024 17:14:22 +0200
Subject: [PATCH 2/5] fix: copying orderflow dataframe from cache doesn't work

---
 freqtrade/data/converter/orderflow.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py
index af6976e47..5ae4ad27f 100644
--- a/freqtrade/data/converter/orderflow.py
+++ b/freqtrade/data/converter/orderflow.py
@@ -111,7 +111,13 @@ def populate_dataframe_with_trades(config, dataframe, trades):
                 # Use caching mechanism
                 if (candle_start, candle_next) in cached_grouped_trades:
                     cache_entry = cached_grouped_trades[(candle_start, candle_next)]
-                    dataframe.loc[is_between] = cache_entry
+                    # dataframe.loc[is_between] = cache_entry # doesn't take, so we need workaround:
+                    # Create a dictionary of the column values to be assigned
+                    update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns}
+                    # Assign the values using the update_dict
+                    dataframe.loc[is_between, update_dict.keys()] = pd.DataFrame(
+                        [update_dict], index=dataframe.loc[is_between].index
+                    )
                     continue
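The commit message only says the cached copy "doesn't take". The most likely reason is pandas' index alignment: when the right-hand side of a `.loc` assignment is a DataFrame, values are matched by index label, and the cached slice keeps the labels of the dataframe it was copied from. A standalone sketch of that pitfall and of the dict-based workaround, using made-up frames and values:

```python
import pandas as pd

# Target frame (labels 10/11 stand in for candle rows) and a cached one-row slice
# that still carries the index label it had when it was stored.
df = pd.DataFrame({"bid": [0.0, 0.0], "ask": [0.0, 0.0]}, index=[10, 11])
cache_entry = pd.DataFrame({"bid": [5.0], "ask": [7.0]}, index=[3])

mask = df.index == 10
df.loc[mask] = cache_entry      # aligned on labels: 3 != 10, so NaN is written
print(df.loc[10].to_dict())     # {'bid': nan, 'ask': nan}, the copy did not 'take'

# Workaround in the spirit of the patch: collapse the cached row to plain values
# and rebuild it on the target index before assigning.
update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns}
df.loc[mask, list(update_dict.keys())] = pd.DataFrame([update_dict], index=df.index[mask])
print(df.loc[10].to_dict())     # {'bid': 5.0, 'ask': 7.0}
```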
From 390373cb9b42c1d6fa8a72cf2a39b5280051c29f Mon Sep 17 00:00:00 2001
From: Joe Schr <8218910+TheJoeSchr@users.noreply.github.com>
Date: Mon, 24 Jun 2024 17:15:39 +0200
Subject: [PATCH 3/5] orderflow: ruff format

---
 freqtrade/data/converter/orderflow.py | 30 ++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py
index 5ae4ad27f..4e2986b77 100644
--- a/freqtrade/data/converter/orderflow.py
+++ b/freqtrade/data/converter/orderflow.py
@@ -58,7 +58,7 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
 
 # Global cache dictionary
 cache_size = 1000  # TODO move that in config
-cached_grouped_trades = OrderedDict()  # TODO move that where? 
+cached_grouped_trades = OrderedDict()  # TODO move that where?
 
 
 def populate_dataframe_with_trades(config, dataframe, trades):
@@ -89,7 +89,6 @@ def populate_dataframe_with_trades(config, dataframe, trades):
         # group trades by candle start
         trades_grouped_by_candle_start = trades.groupby("candle_start")
 
-
         # Create Series to hold complex data
         trades_series = pd.Series(index=dataframe.index, dtype=object)
         orderflow_series = pd.Series(index=dataframe.index, dtype=object)
@@ -101,6 +100,7 @@ def populate_dataframe_with_trades(config, dataframe, trades):
             is_between = candle_start == dataframe["date"]
             if is_between.any():
                 from freqtrade.exchange import timeframe_to_next_date
+
                 candle_next = timeframe_to_next_date(timeframe, candle_start)
                 if candle_next not in trades_grouped_by_candle_start.groups:
                     logger.warning(
@@ -139,11 +139,21 @@ def populate_dataframe_with_trades(config, dataframe, trades):
                 imbalances_series.loc[indices] = [imbalances] * len(indices)
 
                 stacked_imb = config_orderflow["stacked_imbalance_range"]
-                stacked_imbalances_bid_series.loc[indices] = [stacked_imbalance_bid(imbalances, stacked_imbalance_range=stacked_imb)] * len(indices)
-                stacked_imbalances_ask_series.loc[indices] = [stacked_imbalance_ask(imbalances, stacked_imbalance_range=stacked_imb)] * len(indices)
+                stacked_imbalances_bid_series.loc[indices] = [
+                    stacked_imbalance_bid(imbalances, stacked_imbalance_range=stacked_imb)
+                ] * len(indices)
+                stacked_imbalances_ask_series.loc[indices] = [
+                    stacked_imbalance_ask(imbalances, stacked_imbalance_range=stacked_imb)
+                ] * len(indices)
 
-                bid = trades_grouped_df["side"].str.contains("sell").astype(int) * trades_grouped_df["amount"]
-                ask = trades_grouped_df["side"].str.contains("buy").astype(int) * trades_grouped_df["amount"]
+                bid = (
+                    trades_grouped_df["side"].str.contains("sell").astype(int)
+                    * trades_grouped_df["amount"]
+                )
+                ask = (
+                    trades_grouped_df["side"].str.contains("buy").astype(int)
+                    * trades_grouped_df["amount"]
+                )
 
                 deltas_per_trade = ask - bid
                 min_delta = deltas_per_trade.cumsum().min()
@@ -153,11 +163,15 @@ def populate_dataframe_with_trades(config, dataframe, trades):
 
                 dataframe.loc[indices, "bid"] = bid.sum()
                 dataframe.loc[indices, "ask"] = ask.sum()
-                dataframe.loc[indices, "delta"] = dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"]
+                dataframe.loc[indices, "delta"] = (
+                    dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"]
+                )
                 dataframe.loc[indices, "total_trades"] = len(trades_grouped_df)
 
                 # Cache the result
-                cached_grouped_trades[(candle_start, candle_next)] = dataframe.loc[is_between].copy()
+                cached_grouped_trades[(candle_start, candle_next)] = dataframe.loc[
+                    is_between
+                ].copy()
 
                 # Maintain cache size
                 if len(cached_grouped_trades) > cache_size:

From 54df6f5b9c74c777037c6861412f2297d66394a2 Mon Sep 17 00:00:00 2001
From: Joe Schr <8218910+TheJoeSchr@users.noreply.github.com>
Date: Mon, 24 Jun 2024 17:21:29 +0200
Subject: [PATCH 4/5] orderflow: adds `cache_size` to config

---
 docs/advanced-orderflow.md                 |  2 ++
 freqtrade/constants.py                     |  1 +
 freqtrade/data/converter/orderflow.py      | 11 +++++------
 tests/data/test_converter_public_trades.py |  5 ++++-
 4 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/docs/advanced-orderflow.md b/docs/advanced-orderflow.md
index 54655d488..c1091d8ca 100644
--- a/docs/advanced-orderflow.md
+++ b/docs/advanced-orderflow.md
@@ -21,6 +21,7 @@ This guide walks you through utilizing public trade data for advanced orderflow
 2. **Configure Orderflow Processing:** Define your desired settings for orderflow processing within the orderflow section of config.json. Here, you can adjust factors like:
 
+- `cache_size`: How many previous orderflow candles are saved into cache instead of calculated every new candle
 - `max_candles`: Filter how many candles get processed from the tail
 - `scale`: This controls the price bin size for the footprint chart.
 - `stacked_imbalance_range`: Defines the minimum consecutive imbalanced price levels required for consideration.
@@ -29,6 +30,7 @@ This guide walks you through utilizing public trade data for advanced orderflow
 
 ```json
 "orderflow": {
+    "cache_size": 1000,
     "max_candles": 1500,
     "scale": 0.5,
     "stacked_imbalance_range": 3, // needs at least this amount of imbalance next to each other
diff --git a/freqtrade/constants.py b/freqtrade/constants.py
index 1105f4ffe..92f55bd6b 100644
--- a/freqtrade/constants.py
+++ b/freqtrade/constants.py
@@ -537,6 +537,7 @@ CONF_SCHEMA = {
         "orderflow": {
             "type": "object",
             "properties": {
+                "cache_size": {"type": "number", "minimum": 1, "default": 1000},
                 "max_candles": {"type": "number", "minimum": 1, "default": 1500},
                 "scale": {"type": "number", "minimum": 0.0},
                 "stacked_imbalance_range": {"type": "number", "minimum": 0},
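The `default` on the new `cache_size` property is what allows the converter to read `config_orderflow["cache_size"]` even when a user config omits the key, provided defaults are filled in during schema validation. Below is a minimal, self-contained sketch of that mechanism using the stock `jsonschema` package and a trimmed-down copy of the properties above; freqtrade's own validator wiring may differ in detail.

```python
from jsonschema import Draft7Validator, validators


def extend_with_default(validator_class):
    """Return a validator class that also writes schema defaults into the instance."""
    validate_properties = validator_class.VALIDATORS["properties"]

    def set_defaults(validator, properties, instance, schema):
        for prop, subschema in properties.items():
            if isinstance(instance, dict) and "default" in subschema:
                instance.setdefault(prop, subschema["default"])
        yield from validate_properties(validator, properties, instance, schema)

    return validators.extend(validator_class, {"properties": set_defaults})


DefaultFillingValidator = extend_with_default(Draft7Validator)

# Trimmed-down copy of the "orderflow" properties added to CONF_SCHEMA above.
orderflow_schema = {
    "type": "object",
    "properties": {
        "cache_size": {"type": "number", "minimum": 1, "default": 1000},
        "max_candles": {"type": "number", "minimum": 1, "default": 1500},
    },
}

conf = {"max_candles": 500}
DefaultFillingValidator(orderflow_schema).validate(conf)
print(conf)  # {'max_candles': 500, 'cache_size': 1000}
```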
diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py
index 4e2986b77..1667db301 100644
--- a/freqtrade/data/converter/orderflow.py
+++ b/freqtrade/data/converter/orderflow.py
@@ -14,6 +14,9 @@ from collections import OrderedDict
 
 logger = logging.getLogger(__name__)
 
+# Global cache dictionary
+cached_grouped_trades = OrderedDict()
+
 
 def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
     """
@@ -56,11 +59,6 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
         df.drop(columns=["datetime"], inplace=True)
 
 
-# Global cache dictionary
-cache_size = 1000  # TODO move that in config
-cached_grouped_trades = OrderedDict()  # TODO move that where?
-
-
 def populate_dataframe_with_trades(config, dataframe, trades):
     """
     Populates a dataframe with trades
@@ -68,8 +66,9 @@ def populate_dataframe_with_trades(config, dataframe, trades):
     :param trades: Trades to populate with
     :return: Dataframe with trades populated
     """
-    config_orderflow = config["orderflow"]
     timeframe = config["timeframe"]
+    config_orderflow = config["orderflow"]
+    cache_size = config_orderflow["cache_size"]
 
     # create columns for trades
     _init_dataframe_with_trades_columns(dataframe)
diff --git a/tests/data/test_converter_public_trades.py b/tests/data/test_converter_public_trades.py
index cab4a738d..4388b584f 100644
--- a/tests/data/test_converter_public_trades.py
+++ b/tests/data/test_converter_public_trades.py
@@ -90,6 +90,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
     config = {
         "timeframe": "5m",
         "orderflow": {
+            "cache_size": 1000,
             "max_candles": 1500,
             "scale": 0.005,
             "imbalance_volume": 0,
@@ -201,6 +202,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
     config = {
         "timeframe": "5m",
         "orderflow": {
+            "cache_size": 1000,
             "max_candles": 1500,
             "scale": 0.5,
             "imbalance_volume": 0,
@@ -243,7 +245,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
     assert 169.442 == row["ask"]
 
     # Assert the number of trades
-    assert 151 == len(row.trades)
+    assert 151 == len(row["trades"])
 
     # Assert specific details of the first trade
     t = row["trades"].iloc[0]
@@ -367,6 +369,7 @@ def test_public_trades_config_max_trades(
     orderflow_config = {
         "timeframe": "5m",
         "orderflow": {
+            "cache_size": 1000,
            "max_candles": 1,
            "scale": 0.005,
            "imbalance_volume": 0,
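With `cache_size` now coming from the config, the caching layer is a module-level OrderedDict with FIFO eviction: entries are keyed by `(candle_start, candle_next)` and the oldest insertion is dropped once the limit is exceeded. A standalone sketch of that pattern follows (generic keys and values, illustrative names such as `cache_put`; the real cache stores DataFrame slices):

```python
from collections import OrderedDict

CACHE_SIZE = 3                      # stands in for config_orderflow["cache_size"]
_cache: OrderedDict = OrderedDict()


def cache_put(key, value, max_size: int = CACHE_SIZE) -> None:
    _cache[key] = value
    if len(_cache) > max_size:
        # popitem(last=False) removes the oldest insertion: FIFO eviction,
        # the same call the patch uses on cached_grouped_trades.
        _cache.popitem(last=False)


for i in range(5):
    cache_put(("candle", i), f"orderflow-{i}")

print(list(_cache))                 # only the three most recently inserted keys remain
```

Because lookups never call `move_to_end`, cache hits do not refresh an entry's position, so this is insertion-order (FIFO) eviction rather than LRU. It is also plain module state that persists across calls, which is why the test changes below reset `cached_grouped_trades` between tests.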
From 4735835aabad4b8224e0b2a46d76c07abb0dbc57 Mon Sep 17 00:00:00 2001
From: Joe Schr <8218910+TheJoeSchr@users.noreply.github.com>
Date: Mon, 24 Jun 2024 17:40:54 +0200
Subject: [PATCH 5/5] orderflow tests: reset orderflow cache between tests

---
 tests/data/test_converter_public_trades.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/tests/data/test_converter_public_trades.py b/tests/data/test_converter_public_trades.py
index 4388b584f..8e5b73d62 100644
--- a/tests/data/test_converter_public_trades.py
+++ b/tests/data/test_converter_public_trades.py
@@ -6,6 +6,8 @@ from freqtrade.constants import DEFAULT_TRADES_COLUMNS
 from freqtrade.data.converter import populate_dataframe_with_trades
 from freqtrade.data.converter.orderflow import trades_to_volumeprofile_with_total_delta_bid_ask
 from freqtrade.data.converter.trade_converter import trades_list_to_df
+from collections import OrderedDict
+from importlib import reload
 
 
 BIN_SIZE_SCALE = 0.5
@@ -47,6 +49,15 @@ def public_trades_list_simple(testdatadir):
     return read_csv(testdatadir / "orderflow/public_trades_list_simple_example.csv").copy()
 
 
+@pytest.fixture
+def reset_cache(request):
+    import freqtrade.data.converter.orderflow as orderflow
+
+    global orderflow
+    orderflow.cached_grouped_trades = OrderedDict()
+    yield
+
+
 def test_public_trades_columns_before_change(
     populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
 ):
@@ -71,7 +82,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
-    populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
+    reset_cache, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
 ):
     """
     Tests the `populate_dataframe_with_trades` function's order flow calculation.
@@ -170,7 +181,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
 
 
 def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
-    populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
+    reset_cache, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
 ):
     """
     Tests the `populate_dataframe_with_trades` function's handling of trades,