diff --git a/docs/advanced-orderflow.md b/docs/advanced-orderflow.md index 54655d488..c1091d8ca 100644 --- a/docs/advanced-orderflow.md +++ b/docs/advanced-orderflow.md @@ -21,6 +21,7 @@ This guide walks you through utilizing public trade data for advanced orderflow 2. **Configure Orderflow Processing:** Define your desired settings for orderflow processing within the orderflow section of config.json. Here, you can adjust factors like: +- `cache_size`: Number of previous orderflow candles kept in the cache so that they are not recalculated on every new candle - `max_candles`: Filter how many candles get processed from the tail - `scale`: This controls the price bin size for the footprint chart. - `stacked_imbalance_range`: Defines the minimum consecutive imbalanced price levels required for consideration. @@ -29,6 +30,7 @@ This guide walks you through utilizing public trade data for advanced orderflow ```json "orderflow": { + "cache_size": 1000, "max_candles": 1500, "scale": 0.5, "stacked_imbalance_range": 3, // needs at least this amount of imbalance next to each other diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 1105f4ffe..92f55bd6b 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -537,6 +537,7 @@ CONF_SCHEMA = { "orderflow": { "type": "object", "properties": { + "cache_size": {"type": "number", "minimum": 1, "default": 1000}, "max_candles": {"type": "number", "minimum": 1, "default": 1500}, "scale": {"type": "number", "minimum": 0.0}, "stacked_imbalance_range": {"type": "number", "minimum": 0}, diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py index 77acfe902..bd9e7553b 100644 --- a/freqtrade/data/converter/orderflow.py +++ b/freqtrade/data/converter/orderflow.py @@ -4,16 +4,20 @@ Functions to convert orderflow data from public_trades import logging import time +from collections import OrderedDict import numpy as np import pandas as pd -from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, 
Config +from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS from freqtrade.exceptions import DependencyException logger = logging.getLogger(__name__) +# Global cache dictionary +cached_grouped_trades: OrderedDict[pd.Timestamp, pd.DataFrame] = OrderedDict() + def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame): """ @@ -56,17 +60,16 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str): df.drop(columns=["datetime"], inplace=True) -def populate_dataframe_with_trades( - config: Config, dataframe: pd.DataFrame, trades: pd.DataFrame -) -> pd.DataFrame: +def populate_dataframe_with_trades(config, dataframe, trades): """ Populates a dataframe with trades :param dataframe: Dataframe to populate :param trades: Trades to populate with :return: Dataframe with trades populated """ - config_orderflow = config["orderflow"] timeframe = config["timeframe"] + config_orderflow = config["orderflow"] + cache_size = config_orderflow["cache_size"] # create columns for trades _init_dataframe_with_trades_columns(dataframe) @@ -92,14 +95,13 @@ def populate_dataframe_with_trades( stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object) stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object) + trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False) for candle_start, trades_grouped_df in trades_grouped_by_candle_start: is_between = candle_start == dataframe["date"] if is_between.any(): from freqtrade.exchange import timeframe_to_next_date candle_next = timeframe_to_next_date(timeframe, candle_start) - # skip if there are no trades at next candle - # because that this candle isn't finished yet if candle_next not in trades_grouped_by_candle_start.groups: logger.warning( f"candle at {candle_start} with {len(trades_grouped_df)} trades " @@ -109,6 +111,17 @@ def populate_dataframe_with_trades( indices = dataframe.index[is_between].tolist() # Add trades to each candle 
trades_series.loc[indices] = [trades_grouped_df] * len(indices) + # Use caching mechanism + if (candle_start, candle_next) in cached_grouped_trades: + cache_entry = cached_grouped_trades[(candle_start, candle_next)] + # dataframe.loc[is_between] = cache_entry # doesn't take, so we need workaround: + # Create a dictionary of the column values to be assigned + update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns} + # Assign the values using the update_dict + dataframe.loc[is_between, update_dict.keys()] = pd.DataFrame( + [update_dict], index=dataframe.loc[is_between].index + ) + continue # Calculate orderflow for each candle orderflow = trades_to_volumeprofile_with_total_delta_bid_ask( @@ -136,14 +149,11 @@ def populate_dataframe_with_trades( ] * len(indices) bid = np.where( - trades_grouped_df["side"].str.contains("sell"), - trades_grouped_df["amount"], - 0, + trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0 ) + ask = np.where( - trades_grouped_df["side"].str.contains("buy"), - trades_grouped_df["amount"], - 0, + trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0 ) deltas_per_trade = ask - bid min_delta = deltas_per_trade.cumsum().min() @@ -157,6 +167,15 @@ def populate_dataframe_with_trades( dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"] ) dataframe.loc[indices, "total_trades"] = len(trades_grouped_df) + + # Cache the result + cached_grouped_trades[(candle_start, candle_next)] = dataframe.loc[ + is_between + ].copy() + + # Maintain cache size + if len(cached_grouped_trades) > cache_size: + cached_grouped_trades.popitem(last=False) else: logger.debug(f"Found NO candles for trades starting with {candle_start}") logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds") diff --git a/tests/data/test_converter_public_trades.py b/tests/data/test_converter_public_trades.py index cab4a738d..1cb43bc9a 100644 --- a/tests/data/test_converter_public_trades.py +++ 
b/tests/data/test_converter_public_trades.py @@ -1,3 +1,5 @@ +from collections import OrderedDict + import numpy as np import pandas as pd import pytest @@ -47,6 +49,15 @@ def public_trades_list_simple(testdatadir): return read_csv(testdatadir / "orderflow/public_trades_list_simple_example.csv").copy() +@pytest.fixture +def reset_cache(request): + import freqtrade.data.converter.orderflow as orderflow + + global orderflow # noqa F811 + orderflow.cached_grouped_trades = OrderedDict() + yield + + def test_public_trades_columns_before_change( populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades ): @@ -71,7 +82,7 @@ def test_public_trades_columns_before_change( def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow( - populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades + reset_cache, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades ): """ Tests the `populate_dataframe_with_trades` function's order flow calculation. 
@@ -90,6 +101,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow( config = { "timeframe": "5m", "orderflow": { + "cache_size": 1000, "max_candles": 1500, "scale": 0.005, "imbalance_volume": 0, @@ -169,7 +181,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow( def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades( - populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades + reset_cache, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades ): """ Tests the `populate_dataframe_with_trades` function's handling of trades, @@ -201,6 +213,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades( config = { "timeframe": "5m", "orderflow": { + "cache_size": 1000, "max_candles": 1500, "scale": 0.5, "imbalance_volume": 0, @@ -243,7 +256,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades( assert 169.442 == row["ask"] # Assert the number of trades - assert 151 == len(row.trades) + assert 151 == len(row["trades"]) # Assert specific details of the first trade t = row["trades"].iloc[0] @@ -367,6 +380,7 @@ def test_public_trades_config_max_trades( orderflow_config = { "timeframe": "5m", "orderflow": { + "cache_size": 1000, "max_candles": 1, "scale": 0.005, "imbalance_volume": 0,