diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py index 909fd0c74..b460bea59 100644 --- a/freqtrade/data/converter/orderflow.py +++ b/freqtrade/data/converter/orderflow.py @@ -7,20 +7,18 @@ import time import typing from collections import OrderedDict from datetime import datetime -from typing import Dict, Tuple +from typing import Tuple import numpy as np import pandas as pd from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS +from freqtrade.enums import RunMode from freqtrade.exceptions import DependencyException logger = logging.getLogger(__name__) -# Global cache dictionary -cached_grouped_trades_per_pair: Dict[str, OrderedDict[Tuple[datetime, datetime], pd.DataFrame]] = {} - def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame): """ @@ -64,8 +62,11 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str): def populate_dataframe_with_trades( - pair: str, config, dataframe: pd.DataFrame, trades: pd.DataFrame -): + cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], pd.DataFrame], + config, + dataframe: pd.DataFrame, + trades: pd.DataFrame, +) -> Tuple[pd.DataFrame, OrderedDict[Tuple[datetime, datetime], pd.DataFrame]]: """ Populates a dataframe with trades :param dataframe: Dataframe to populate @@ -74,15 +75,11 @@ def populate_dataframe_with_trades( """ timeframe = config["timeframe"] config_orderflow = config["orderflow"] - cache_size = config_orderflow["cache_size"] # create columns for trades _init_dataframe_with_trades_columns(dataframe) try: - cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], pd.DataFrame] = ( - cached_grouped_trades_per_pair.get(pair, OrderedDict()) - ) start_time = time.time() # calculate ohlcv candle start and end _calculate_ohlcv_candle_start_and_end(trades, timeframe) @@ -184,7 +181,10 @@ def populate_dataframe_with_trades( ) # Maintain cache size - if len(cached_grouped_trades) > cache_size: + if ( + config.get("runmode") in (RunMode.DRY_RUN, RunMode.LIVE) + and len(cached_grouped_trades) > config_orderflow["cache_size"] + ): cached_grouped_trades.popitem(last=False) else: logger.debug(f"Found NO candles for trades starting with {candle_start}") @@ -196,16 +196,12 @@ def populate_dataframe_with_trades( dataframe["imbalances"] = imbalances_series dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series - # dereference old cache - if pair in cached_grouped_trades_per_pair: - del cached_grouped_trades_per_pair[pair] - cached_grouped_trades_per_pair[pair] = cached_grouped_trades except Exception as e: logger.exception("Error populating dataframe with trades") raise DependencyException(e) - return dataframe + return dataframe, cached_grouped_trades def trades_to_volumeprofile_with_total_delta_bid_ask( diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index c3c970c25..3893ee42b 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -5,6 +5,7 @@ This module defines the interface to apply for strategies import logging from abc import ABC, abstractmethod +from collections import OrderedDict from datetime import datetime, timedelta, timezone from math import isinf, isnan from typing import Dict, List, Optional, Tuple, Union @@ -140,6 +141,11 @@ class IStrategy(ABC, HyperStrategyMixin): # A self set parameter that represents the market direction. filled from configuration market_direction: MarketDirection = MarketDirection.NONE + # Global cache dictionary + _cached_grouped_trades_per_pair: Dict[ + str, OrderedDict[Tuple[datetime, datetime], DataFrame] + ] = {} + def __init__(self, config: Config) -> None: self.config = config # Dict to determine if analysis is necessary @@ -1603,8 +1609,19 @@ class IStrategy(ABC, HyperStrategyMixin): config = self.config config["timeframe"] = self.timeframe + pair = metadata["pair"] # TODO: slice trades to size of dataframe for faster backtesting - dataframe = populate_dataframe_with_trades(metadata["pair"], config, dataframe, trades) + cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], DataFrame] = ( + self._cached_grouped_trades_per_pair.get(pair, OrderedDict()) + ) + dataframe, cached_grouped_trades = populate_dataframe_with_trades( + cached_grouped_trades, config, dataframe, trades + ) + + # dereference old cache + if pair in self._cached_grouped_trades_per_pair: + del self._cached_grouped_trades_per_pair[pair] + self._cached_grouped_trades_per_pair[pair] = cached_grouped_trades logger.debug("Populated dataframe with trades.") diff --git a/tests/data/test_converter_public_trades.py b/tests/data/test_converter_public_trades.py index dd08294bc..d3af0f2c1 100644 --- a/tests/data/test_converter_public_trades.py +++ b/tests/data/test_converter_public_trades.py @@ -1,3 +1,5 @@ +from collections import OrderedDict + import numpy as np import pandas as pd import pytest @@ -47,15 +49,6 @@ def public_trades_list_simple(testdatadir): return read_csv(testdatadir / "orderflow/public_trades_list_simple_example.csv").copy() -@pytest.fixture -def reset_cache(request): - import freqtrade.data.converter.orderflow as orderflow - - global orderflow # noqa F811 - orderflow.cached_grouped_trades_per_pair = {} - yield - - def test_public_trades_columns_before_change( populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades ): @@ -80,7 +73,7 @@ def test_public_trades_columns_before_change( def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow( - reset_cache, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades + populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades ): """ Tests the `populate_dataframe_with_trades` function's order flow calculation. @@ -108,7 +101,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow( }, } # Apply the function to populate the data frame with order flow data - df = populate_dataframe_with_trades("BTC/UDST", config, dataframe, trades) + df, _ = populate_dataframe_with_trades(OrderedDict(), config, dataframe, trades) # Extract results from the first row of the DataFrame results = df.iloc[0] t = results["trades"] @@ -179,7 +172,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow( def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades( - reset_cache, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades + populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades ): """ Tests the `populate_dataframe_with_trades` function's handling of trades, @@ -221,7 +214,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades( } # Populate the DataFrame with trades and order flow data - df = populate_dataframe_with_trades("BTC/UDST", config, dataframe, trades) + df, _ = populate_dataframe_with_trades(OrderedDict(), config, dataframe, trades) # --- DataFrame and Trade Data Validation --- @@ -387,8 +380,8 @@ def test_public_trades_config_max_trades( }, } - df = populate_dataframe_with_trades( - "BTC/UDST", default_conf | orderflow_config, dataframe, trades + df, _ = populate_dataframe_with_trades( + OrderedDict(), default_conf | orderflow_config, dataframe, trades ) assert df.delta.count() == 1