feat: add caching to populate_dataframe_with_trades

This commit is contained in:
Joe Schr
2024-06-21 15:28:11 +02:00
parent ffda564f05
commit d23c33a47f
2 changed files with 90 additions and 79 deletions

View File

@@ -10,7 +10,7 @@ import pandas as pd
from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, Config
from freqtrade.exceptions import DependencyException
from collections import OrderedDict
logger = logging.getLogger(__name__)
@@ -20,16 +20,25 @@ def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
Populates a dataframe with trades columns
:param dataframe: Dataframe to populate
"""
dataframe["trades"] = dataframe.apply(lambda _: [], axis=1)
dataframe["orderflow"] = dataframe.apply(lambda _: {}, axis=1)
# Initialize columns with appropriate dtypes
dataframe["trades"] = np.nan
dataframe["orderflow"] = np.nan
dataframe["imbalances"] = np.nan
dataframe["stacked_imbalances_bid"] = np.nan
dataframe["stacked_imbalances_ask"] = np.nan
dataframe["max_delta"] = np.nan
dataframe["min_delta"] = np.nan
dataframe["bid"] = np.nan
dataframe["ask"] = np.nan
dataframe["delta"] = np.nan
dataframe["min_delta"] = np.nan
dataframe["max_delta"] = np.nan
dataframe["total_trades"] = np.nan
dataframe["stacked_imbalances_bid"] = np.nan
dataframe["stacked_imbalances_ask"] = np.nan
# Ensure the columns that hold complex objects are of object type
dataframe["trades"] = dataframe["trades"].astype(object)
dataframe["orderflow"] = dataframe["orderflow"].astype(object)
dataframe["imbalances"] = dataframe["imbalances"].astype(object)
dataframe["stacked_imbalances_bid"] = dataframe["stacked_imbalances_bid"].astype(object)
dataframe["stacked_imbalances_ask"] = dataframe["stacked_imbalances_ask"].astype(object)
def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
@@ -47,9 +56,12 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
df.drop(columns=["datetime"], inplace=True)
def populate_dataframe_with_trades(
config: Config, dataframe: pd.DataFrame, trades: pd.DataFrame
) -> pd.DataFrame:
# Global cache dictionary
cache_size = 1000 # TODO move that in config
cached_grouped_trades = OrderedDict() # TODO move that where?
def populate_dataframe_with_trades(config, dataframe, trades):
"""
Populates a dataframe with trades
:param dataframe: Dataframe to populate
@@ -61,7 +73,6 @@ def populate_dataframe_with_trades(
# create columns for trades
_init_dataframe_with_trades_columns(dataframe)
df = dataframe.copy()
try:
start_time = time.time()
@@ -70,95 +81,94 @@ def populate_dataframe_with_trades(
# get date of earliest max_candles candle
max_candles = config_orderflow["max_candles"]
start_date = df.tail(max_candles).date.iat[0]
start_date = dataframe.tail(max_candles).date.iat[0]
# slice off trades that are before the current ohlcv candles to make groupby faster
trades = trades.loc[trades.candle_start >= start_date]
trades.reset_index(inplace=True, drop=True)
# group trades by candle start
trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False)
trades_grouped_by_candle_start = trades.groupby("candle_start")
for candle_start in trades_grouped_by_candle_start.groups:
trades_grouped_df = trades[candle_start == trades["candle_start"]]
is_between = candle_start == df["date"]
if np.any(is_between == True): # noqa: E712
# Create Series to hold complex data
trades_series = pd.Series(index=dataframe.index, dtype=object)
orderflow_series = pd.Series(index=dataframe.index, dtype=object)
imbalances_series = pd.Series(index=dataframe.index, dtype=object)
stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object)
stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object)
for candle_start, trades_grouped_df in trades_grouped_by_candle_start:
is_between = candle_start == dataframe["date"]
if is_between.any():
from freqtrade.exchange import timeframe_to_next_date
candle_next = timeframe_to_next_date(timeframe, candle_start)
# warn if there are no trades at the next candle,
# because this candle might not be finished yet
if candle_next not in trades_grouped_by_candle_start.groups:
logger.warning(
f"candle at {candle_start} with {len(trades_grouped_df)} trades "
f"might be unfinished, because no finished trades at {candle_next}"
)
# add trades to each candle
df.loc[is_between, "trades"] = df.loc[is_between, "trades"].apply(
lambda _: trades_grouped_df
)
# calculate orderflow for each candle
df.loc[is_between, "orderflow"] = df.loc[is_between, "orderflow"].apply(
lambda _: trades_to_volumeprofile_with_total_delta_bid_ask(
trades_grouped_df, scale=config_orderflow["scale"]
)
)
# calculate imbalances for each candle's orderflow
df.loc[is_between, "imbalances"] = df.loc[is_between, "orderflow"].apply(
lambda x: trades_orderflow_to_imbalances(
x,
imbalance_ratio=config_orderflow["imbalance_ratio"],
imbalance_volume=config_orderflow["imbalance_volume"],
)
)
# Use caching mechanism
if (candle_start, candle_next) in cached_grouped_trades:
cache_entry = cached_grouped_trades[(candle_start, candle_next)]
dataframe.loc[is_between] = cache_entry
continue
_stacked_imb = config_orderflow["stacked_imbalance_range"]
df.loc[is_between, "stacked_imbalances_bid"] = df.loc[
is_between, "imbalances"
].apply(lambda x: stacked_imbalance_bid(x, stacked_imbalance_range=_stacked_imb))
df.loc[is_between, "stacked_imbalances_ask"] = df.loc[
is_between, "imbalances"
].apply(lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb))
# Store trades in Series using integer indices
indices = dataframe.index[is_between].tolist()
trades_series.loc[indices] = [trades_grouped_df] * len(indices)
bid = np.where(
trades_grouped_df["side"].str.contains("sell"),
trades_grouped_df["amount"],
0,
# Calculate orderflow for each candle
orderflow = trades_to_volumeprofile_with_total_delta_bid_ask(
trades_grouped_df, scale=config_orderflow["scale"]
)
ask = np.where(
trades_grouped_df["side"].str.contains("buy"),
trades_grouped_df["amount"],
0,
orderflow_series.loc[indices] = [orderflow] * len(indices)
# Calculate imbalances for each candle's orderflow
imbalances = trades_orderflow_to_imbalances(
orderflow,
imbalance_ratio=config_orderflow["imbalance_ratio"],
imbalance_volume=config_orderflow["imbalance_volume"],
)
imbalances_series.loc[indices] = [imbalances] * len(indices)
stacked_imb = config_orderflow["stacked_imbalance_range"]
stacked_imbalances_bid_series.loc[indices] = [stacked_imbalance_bid(imbalances, stacked_imbalance_range=stacked_imb)] * len(indices)
stacked_imbalances_ask_series.loc[indices] = [stacked_imbalance_ask(imbalances, stacked_imbalance_range=stacked_imb)] * len(indices)
bid = trades_grouped_df["side"].str.contains("sell").astype(int) * trades_grouped_df["amount"]
ask = trades_grouped_df["side"].str.contains("buy").astype(int) * trades_grouped_df["amount"]
deltas_per_trade = ask - bid
min_delta = 0
max_delta = 0
delta = 0
for d in deltas_per_trade:
delta += d
if delta > max_delta:
max_delta = delta
if delta < min_delta:
min_delta = delta
df.loc[is_between, "max_delta"] = max_delta
df.loc[is_between, "min_delta"] = min_delta
df.loc[is_between, "bid"] = np.where(
trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0
).sum()
df.loc[is_between, "ask"] = np.where(
trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0
).sum()
df.loc[is_between, "delta"] = df.loc[is_between, "ask"] - df.loc[is_between, "bid"]
df.loc[is_between, "total_trades"] = len(trades_grouped_df)
# copy to avoid memory leaks
dataframe.loc[is_between] = df.loc[is_between].copy()
min_delta = deltas_per_trade.cumsum().min()
max_delta = deltas_per_trade.cumsum().max()
dataframe.loc[indices, "max_delta"] = max_delta
dataframe.loc[indices, "min_delta"] = min_delta
dataframe.loc[indices, "bid"] = bid.sum()
dataframe.loc[indices, "ask"] = ask.sum()
dataframe.loc[indices, "delta"] = dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"]
dataframe.loc[indices, "total_trades"] = len(trades_grouped_df)
# Cache the result
cached_grouped_trades[(candle_start, candle_next)] = dataframe.loc[is_between].copy()
# Maintain cache size
if len(cached_grouped_trades) > cache_size:
cached_grouped_trades.popitem(last=False)
else:
logger.debug(f"Found NO candles for trades starting with {candle_start}")
logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds")
# Merge the complex data Series back into the DataFrame
dataframe["trades"] = trades_series
dataframe["orderflow"] = orderflow_series
dataframe["imbalances"] = imbalances_series
dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series
dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series
except Exception as e:
logger.exception("Error populating dataframe with trades:", e)
logger.exception("Error populating dataframe with trades")
raise DependencyException(e)
return dataframe

View File

@@ -227,14 +227,15 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
"volume",
"trades",
"orderflow",
"imbalances",
"stacked_imbalances_bid",
"stacked_imbalances_ask",
"max_delta",
"min_delta",
"bid",
"ask",
"delta",
"min_delta",
"max_delta",
"total_trades",
"stacked_imbalances_bid",
"stacked_imbalances_ask",
]
# Assert delta, bid, and ask values
assert -50.519 == pytest.approx(row["delta"])