Merge pull request #11046 from freqtrade/fix/orderflow_cache

Fix orderflow cache
This commit is contained in:
Matthias
2024-12-12 06:49:58 +01:00
committed by GitHub
3 changed files with 156 additions and 111 deletions

View File

@@ -4,19 +4,31 @@ Functions to convert orderflow data from public_trades
import logging
import time
from collections import OrderedDict
from datetime import datetime
import numpy as np
import pandas as pd
from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, Config
from freqtrade.enums import RunMode
from freqtrade.exceptions import DependencyException
logger = logging.getLogger(__name__)
ORDERFLOW_ADDED_COLUMNS = [
"trades",
"orderflow",
"imbalances",
"stacked_imbalances_bid",
"stacked_imbalances_ask",
"max_delta",
"min_delta",
"bid",
"ask",
"delta",
"total_trades",
]
def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
"""
@@ -24,24 +36,18 @@ def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
:param dataframe: Dataframe to populate
"""
# Initialize columns with appropriate dtypes
dataframe["trades"] = np.nan
dataframe["orderflow"] = np.nan
dataframe["imbalances"] = np.nan
dataframe["stacked_imbalances_bid"] = np.nan
dataframe["stacked_imbalances_ask"] = np.nan
dataframe["max_delta"] = np.nan
dataframe["min_delta"] = np.nan
dataframe["bid"] = np.nan
dataframe["ask"] = np.nan
dataframe["delta"] = np.nan
dataframe["total_trades"] = np.nan
for column in ORDERFLOW_ADDED_COLUMNS:
dataframe[column] = np.nan
# Ensure the 'trades' column is of object type
dataframe["trades"] = dataframe["trades"].astype(object)
dataframe["orderflow"] = dataframe["orderflow"].astype(object)
dataframe["imbalances"] = dataframe["imbalances"].astype(object)
dataframe["stacked_imbalances_bid"] = dataframe["stacked_imbalances_bid"].astype(object)
dataframe["stacked_imbalances_ask"] = dataframe["stacked_imbalances_ask"].astype(object)
# Set columns to object type
for column in (
"trades",
"orderflow",
"imbalances",
"stacked_imbalances_bid",
"stacked_imbalances_ask",
):
dataframe[column] = dataframe[column].astype(object)
def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
@@ -60,17 +66,18 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
def populate_dataframe_with_trades(
cached_grouped_trades: OrderedDict[tuple[datetime, datetime], pd.DataFrame],
cached_grouped_trades: pd.DataFrame | None,
config: Config,
dataframe: pd.DataFrame,
trades: pd.DataFrame,
) -> tuple[pd.DataFrame, OrderedDict[tuple[datetime, datetime], pd.DataFrame]]:
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Populates a dataframe with trades
:param dataframe: Dataframe to populate
:param trades: Trades to populate with
:return: Dataframe with trades populated
"""
timeframe = config["timeframe"]
config_orderflow = config["orderflow"]
@@ -91,13 +98,6 @@ def populate_dataframe_with_trades(
trades = trades.loc[trades["candle_start"] >= start_date]
trades.reset_index(inplace=True, drop=True)
# Create Series to hold complex data
trades_series = pd.Series(index=dataframe.index, dtype=object)
orderflow_series = pd.Series(index=dataframe.index, dtype=object)
imbalances_series = pd.Series(index=dataframe.index, dtype=object)
stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object)
stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object)
# group trades by candle start
trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False)
@@ -105,58 +105,47 @@ def populate_dataframe_with_trades(
for candle_start, trades_grouped_df in trades_grouped_by_candle_start:
is_between = candle_start == dataframe["date"]
if is_between.any():
from freqtrade.exchange import timeframe_to_next_date
# there can only be one row with the same date
index = dataframe.index[is_between][0]
candle_next = timeframe_to_next_date(timeframe, candle_start)
if candle_next not in trades_grouped_by_candle_start.groups:
logger.warning(
f"candle at {candle_start} with {len(trades_grouped_df)} trades "
f"might be unfinished, because no finished trades at {candle_next}"
)
# Use caching mechanism
if (candle_start, candle_next) in cached_grouped_trades:
cache_entry = cached_grouped_trades[(candle_start, candle_next)]
# dataframe.loc[is_between] = cache_entry # doesn't take, so we need workaround:
# Create a dictionary of the column values to be assigned
update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns}
# Assign the values using the update_dict
dataframe.loc[is_between, update_dict.keys()] = pd.DataFrame(
[update_dict], index=dataframe.loc[is_between].index
)
if (
cached_grouped_trades is not None
and (candle_start == cached_grouped_trades["date"]).any()
):
# Check if the trades are already in the cache
cache_idx = cached_grouped_trades.index[
cached_grouped_trades["date"] == candle_start
][0]
for col in ORDERFLOW_ADDED_COLUMNS:
dataframe.at[index, col] = cached_grouped_trades.at[cache_idx, col]
continue
indices = dataframe.index[is_between].tolist()
# Add trades to each candle
trades_series.loc[indices] = [
trades_grouped_df.drop(columns=["candle_start", "candle_end"]).to_dict(
orient="records"
)
]
dataframe.at[index, "trades"] = trades_grouped_df.drop(
columns=["candle_start", "candle_end"]
).to_dict(orient="records")
# Calculate orderflow for each candle
orderflow = trades_to_volumeprofile_with_total_delta_bid_ask(
trades_grouped_df, scale=config_orderflow["scale"]
)
orderflow_series.loc[indices] = [orderflow.to_dict(orient="index")]
dataframe.at[index, "orderflow"] = orderflow.to_dict(orient="index")
# orderflow_series.loc[[index]] = [orderflow.to_dict(orient="index")]
# Calculate imbalances for each candle's orderflow
imbalances = trades_orderflow_to_imbalances(
orderflow,
imbalance_ratio=config_orderflow["imbalance_ratio"],
imbalance_volume=config_orderflow["imbalance_volume"],
)
imbalances_series.loc[indices] = [imbalances.to_dict(orient="index")]
dataframe.at[index, "imbalances"] = imbalances.to_dict(orient="index")
stacked_imbalance_range = config_orderflow["stacked_imbalance_range"]
stacked_imbalances_bid_series.loc[indices] = [
stacked_imbalance_bid(
imbalances, stacked_imbalance_range=stacked_imbalance_range
)
]
stacked_imbalances_ask_series.loc[indices] = [
stacked_imbalance_ask(
imbalances, stacked_imbalance_range=stacked_imbalance_range
)
]
dataframe.at[index, "stacked_imbalances_bid"] = stacked_imbalance_bid(
imbalances, stacked_imbalance_range=stacked_imbalance_range
)
dataframe.at[index, "stacked_imbalances_ask"] = stacked_imbalance_ask(
imbalances, stacked_imbalance_range=stacked_imbalance_range
)
bid = np.where(
trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0
@@ -166,39 +155,20 @@ def populate_dataframe_with_trades(
trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0
)
deltas_per_trade = ask - bid
min_delta = deltas_per_trade.cumsum().min()
max_delta = deltas_per_trade.cumsum().max()
dataframe.loc[indices, "max_delta"] = max_delta
dataframe.loc[indices, "min_delta"] = min_delta
dataframe.at[index, "max_delta"] = deltas_per_trade.cumsum().max()
dataframe.at[index, "min_delta"] = deltas_per_trade.cumsum().min()
dataframe.loc[indices, "bid"] = bid.sum()
dataframe.loc[indices, "ask"] = ask.sum()
dataframe.loc[indices, "delta"] = (
dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"]
dataframe.at[index, "bid"] = bid.sum()
dataframe.at[index, "ask"] = ask.sum()
dataframe.at[index, "delta"] = (
dataframe.at[index, "ask"] - dataframe.at[index, "bid"]
)
dataframe.loc[indices, "total_trades"] = len(trades_grouped_df)
dataframe.at[index, "total_trades"] = len(trades_grouped_df)
# Cache the result
cached_grouped_trades[(candle_start, candle_next)] = dataframe.loc[
is_between
].copy()
# Maintain cache size
if (
config.get("runmode") in (RunMode.DRY_RUN, RunMode.LIVE)
and len(cached_grouped_trades) > config_orderflow["cache_size"]
):
cached_grouped_trades.popitem(last=False)
else:
logger.debug(f"Found NO candles for trades starting with {candle_start}")
logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds")
# Merge the complex data Series back into the DataFrame
dataframe["trades"] = trades_series
dataframe["orderflow"] = orderflow_series
dataframe["imbalances"] = imbalances_series
dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series
dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series
# Cache the entire dataframe
cached_grouped_trades = dataframe.tail(config_orderflow["cache_size"]).copy()
except Exception as e:
logger.exception("Error populating dataframe with trades")

View File

@@ -5,7 +5,6 @@ This module defines the interface to apply for strategies
import logging
from abc import ABC, abstractmethod
from collections import OrderedDict
from datetime import datetime, timedelta, timezone
from math import isinf, isnan
@@ -141,9 +140,7 @@ class IStrategy(ABC, HyperStrategyMixin):
market_direction: MarketDirection = MarketDirection.NONE
# Global cache dictionary
_cached_grouped_trades_per_pair: dict[
str, OrderedDict[tuple[datetime, datetime], DataFrame]
] = {}
_cached_grouped_trades_per_pair: dict[str, DataFrame] = {}
def __init__(self, config: Config) -> None:
self.config = config
@@ -1604,15 +1601,11 @@ class IStrategy(ABC, HyperStrategyMixin):
if use_public_trades:
trades = self.dp.trades(pair=metadata["pair"], copy=False)
config = self.config
config["timeframe"] = self.timeframe
pair = metadata["pair"]
# TODO: slice trades to size of dataframe for faster backtesting
cached_grouped_trades: OrderedDict[tuple[datetime, datetime], DataFrame] = (
self._cached_grouped_trades_per_pair.get(pair, OrderedDict())
)
cached_grouped_trades: DataFrame | None = self._cached_grouped_trades_per_pair.get(pair)
dataframe, cached_grouped_trades = populate_dataframe_with_trades(
cached_grouped_trades, config, dataframe, trades
cached_grouped_trades, self.config, dataframe, trades
)
# dereference old cache

View File

@@ -1,13 +1,16 @@
from collections import OrderedDict
import numpy as np
import pandas as pd
import pytest
from freqtrade.constants import DEFAULT_TRADES_COLUMNS
from freqtrade.data.converter import populate_dataframe_with_trades
from freqtrade.data.converter.orderflow import trades_to_volumeprofile_with_total_delta_bid_ask
from freqtrade.data.converter.orderflow import (
ORDERFLOW_ADDED_COLUMNS,
trades_to_volumeprofile_with_total_delta_bid_ask,
)
from freqtrade.data.converter.trade_converter import trades_list_to_df
from freqtrade.data.dataprovider import DataProvider
from tests.strategy.strats.strategy_test_v3 import StrategyTestV3
BIN_SIZE_SCALE = 0.5
@@ -37,6 +40,7 @@ def populate_dataframe_with_trades_trades(testdatadir):
@pytest.fixture
def candles(testdatadir):
# TODO: this fixture isn't really necessary and could be removed
return pd.read_json(testdatadir / "orderflow/candles.json").copy()
@@ -102,7 +106,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
},
}
# Apply the function to populate the data frame with order flow data
df, _ = populate_dataframe_with_trades(OrderedDict(), config, dataframe, trades)
df, _ = populate_dataframe_with_trades(None, config, dataframe, trades)
# Extract results from the first row of the DataFrame
results = df.iloc[0]
t = results["trades"]
@@ -243,7 +247,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
}
# Populate the DataFrame with trades and order flow data
df, _ = populate_dataframe_with_trades(OrderedDict(), config, dataframe, trades)
df, _ = populate_dataframe_with_trades(None, config, dataframe, trades)
# --- DataFrame and Trade Data Validation ---
@@ -401,9 +405,7 @@ def test_public_trades_config_max_trades(
},
}
df, _ = populate_dataframe_with_trades(
OrderedDict(), default_conf | orderflow_config, dataframe, trades
)
df, _ = populate_dataframe_with_trades(None, default_conf | orderflow_config, dataframe, trades)
assert df.delta.count() == 1
@@ -482,3 +484,83 @@ def test_public_trades_testdata_sanity(
"cost",
"date",
]
def test_analyze_with_orderflow(
default_conf_usdt,
mocker,
populate_dataframe_with_trades_dataframe,
populate_dataframe_with_trades_trades,
):
ohlcv_history = populate_dataframe_with_trades_dataframe
# call without orderflow
strategy = StrategyTestV3(config=default_conf_usdt)
strategy.dp = DataProvider(default_conf_usdt, None, None)
mocker.patch.object(strategy.dp, "trades", return_value=populate_dataframe_with_trades_trades)
import freqtrade.data.converter.orderflow as orderflow_module
spy = mocker.spy(orderflow_module, "trades_to_volumeprofile_with_total_delta_bid_ask")
pair = "ETH/BTC"
df = strategy.advise_indicators(ohlcv_history, {"pair:": pair})
assert len(df) == len(ohlcv_history)
assert "open" in df.columns
assert spy.call_count == 0
# Not expected to run - shouldn't have added orderflow columns
for col in ORDERFLOW_ADDED_COLUMNS:
assert col not in df.columns, f"Column {col} found in df.columns"
default_conf_usdt["exchange"]["use_public_trades"] = True
default_conf_usdt["orderflow"] = {
"cache_size": 5,
"max_candles": 5,
"scale": 0.005,
"imbalance_volume": 0,
"imbalance_ratio": 3,
"stacked_imbalance_range": 3,
}
strategy.config = default_conf_usdt
# First round - builds cache
df1 = strategy.advise_indicators(ohlcv_history, {"pair": pair})
assert len(df1) == len(ohlcv_history)
assert "open" in df1.columns
assert spy.call_count == 5
for col in ORDERFLOW_ADDED_COLUMNS:
assert col in df1.columns, f"Column {col} not found in df.columns"
if col not in ("stacked_imbalances_bid", "stacked_imbalances_ask"):
assert df1[col].count() == 5, f"Column {col} has {df1[col].count()} non-NaN values"
assert len(strategy._cached_grouped_trades_per_pair[pair]) == 5
lastval_trades = df1.at[len(df1) - 1, "trades"]
assert isinstance(lastval_trades, list)
assert len(lastval_trades) == 122
lastval_of = df1.at[len(df1) - 1, "orderflow"]
assert isinstance(lastval_of, dict)
spy.reset_mock()
# Ensure caching works - call the same logic again.
df2 = strategy.advise_indicators(ohlcv_history, {"pair": pair})
assert len(df2) == len(ohlcv_history)
assert "open" in df2.columns
assert spy.call_count == 0
for col in ORDERFLOW_ADDED_COLUMNS:
assert col in df2.columns, f"Round2: Column {col} not found in df.columns"
if col not in ("stacked_imbalances_bid", "stacked_imbalances_ask"):
assert (
df2[col].count() == 5
), f"Round2: Column {col} has {df2[col].count()} non-NaN values"
lastval_trade2 = df2.at[len(df2) - 1, "trades"]
assert isinstance(lastval_trade2, list)
assert len(lastval_trade2) == 122
lastval_of2 = df2.at[len(df2) - 1, "orderflow"]
assert isinstance(lastval_of2, dict)