From 0233c38711a42d94dd97b6210e9098d864676aff Mon Sep 17 00:00:00 2001
From: Maxime Pagnoulle
Date: Sat, 23 Aug 2025 20:24:10 +0200
Subject: [PATCH 1/5] feat: Only load trades needed for specified timerange

---
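Note (illustration only, not part of the commit): the timerange filtering below
relies on pyarrow dataset predicate pushdown over a feather file. A minimal,
self-contained sketch of that mechanism, using a throwaway file path and made-up
values:

    # Sketch: filter rows of a feather file by timestamp without loading it all.
    import pandas as pd
    from pyarrow import dataset

    trades = pd.DataFrame(
        {"timestamp": [1_000, 2_000, 3_000, 4_000], "price": [1.0, 1.1, 1.2, 1.3]}
    )
    trades.to_feather("/tmp/example-trades.feather")

    ds = dataset.dataset("/tmp/example-trades.feather", format="feather")
    time_filter = (dataset.field("timestamp") >= 2_000) & (
        dataset.field("timestamp") <= 3_000
    )
    print(ds.to_table(filter=time_filter).to_pandas())  # rows at 2000 and 3000 only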
 freqtrade/data/dataprovider.py         |  9 +++++--
 .../datahandlers/featherdatahandler.py | 24 ++++++++++++++++---
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py
index ed1da4ca2..a30e4cf27 100644
--- a/freqtrade/data/dataprovider.py
+++ b/freqtrade/data/dataprovider.py
@@ -498,7 +498,12 @@ class DataProvider:
         return DataFrame()

     def trades(
-        self, pair: str, timeframe: str | None = None, copy: bool = True, candle_type: str = ""
+        self,
+        pair: str,
+        timeframe: str | None = None,
+        copy: bool = True,
+        candle_type: str = "",
+        timerange: TimeRange | None = None,
     ) -> DataFrame:
         """
         Get candle (TRADES) data for the given pair as DataFrame
@@ -526,7 +531,7 @@ class DataProvider:
                 self._config["datadir"], data_format=self._config["dataformat_trades"]
             )
             trades_df = data_handler.trades_load(
-                pair, self._config.get("trading_mode", TradingMode.SPOT)
+                pair, self._config.get("trading_mode", TradingMode.SPOT), timerange=timerange
             )
             return trades_df

diff --git a/freqtrade/data/history/datahandlers/featherdatahandler.py b/freqtrade/data/history/datahandlers/featherdatahandler.py
index 46fd7e3ae..41978bb13 100644
--- a/freqtrade/data/history/datahandlers/featherdatahandler.py
+++ b/freqtrade/data/history/datahandlers/featherdatahandler.py
@@ -1,6 +1,7 @@
 import logging

 from pandas import DataFrame, read_feather, to_datetime
+from pyarrow import dataset

 from freqtrade.configuration import TimeRange
 from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, DEFAULT_TRADES_COLUMNS
@@ -116,17 +117,34 @@
     ) -> DataFrame:
         """
         Load a pair from file, either .json.gz or .json
-        # TODO: respect timerange ...
         :param pair: Load trades for this pair
         :param trading_mode: Trading mode to use (used to determine the filename)
-        :param timerange: Timerange to load trades for - currently not implemented
+        :param timerange: Timerange to load trades for - filters data to this range if provided
        :return: Dataframe containing trades
         """
         filename = self._pair_trades_filename(self._datadir, pair, trading_mode)
         if not filename.exists():
             return DataFrame(columns=DEFAULT_TRADES_COLUMNS)

-        tradesdata = read_feather(filename)
+        # Load trades data with optional timerange filtering
+        if timerange is None:
+            # No timerange filter - load entire file
+            logger.debug(f"Loading entire trades file for {pair}")
+            tradesdata = read_feather(filename)
+        else:
+            # Use Arrow dataset with predicate pushdown for efficient filtering
+            try:
+                dataset_reader = dataset.dataset(filename, format="feather")
+                time_filter = (dataset.field("timestamp") >= timerange.startts) & (
+                    dataset.field("timestamp") <= timerange.stopts
+                )
+                tradesdata = dataset_reader.to_table(filter=time_filter).to_pandas()
+                logger.debug(f"Loaded {len(tradesdata)} trades for {pair}")
+
+            except (ImportError, AttributeError, ValueError) as e:
+                # Fallback: load entire file
+                logger.debug(f"Unable to use Arrow filtering, loading entire trades file: {e}")
+                tradesdata = read_feather(filename)

         return tradesdata


From 82903cc56719c4fe463552bab7d8f2b548656ff3 Mon Sep 17 00:00:00 2001
From: Maxime Pagnoulle
Date: Sat, 23 Aug 2025 20:25:26 +0200
Subject: [PATCH 2/5] feat: Filter trades based on timerange

---
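Note (illustration only, not part of the commit): the hunk below derives the
trade window from the first and last candle in the strategy dataframe. A small
sketch of that date-to-millisecond conversion, with made-up dates:

    # Sketch: millisecond epoch bounds for a TimeRange, as built in the hunk below.
    import pandas as pd
    from freqtrade.configuration import TimeRange

    dates = pd.to_datetime(["2024-01-01 00:00:00", "2024-01-01 00:05:00"], utc=True)
    start_ts = int(dates[0].timestamp() * 1000)   # 1704067200000
    end_ts = int(dates[-1].timestamp() * 1000)    # 1704067500000
    timerange = TimeRange("date", "date", startts=start_ts, stopts=end_ts)
    print(timerange.startts, timerange.stopts)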
 freqtrade/strategy/interface.py | 27 +++++++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py
index c863a8392..140bd7e0c 100644
--- a/freqtrade/strategy/interface.py
+++ b/freqtrade/strategy/interface.py
@@ -11,6 +11,7 @@ from math import isinf, isnan
 from pandas import DataFrame
 from pydantic import ValidationError

+from freqtrade.configuration import TimeRange
 from freqtrade.constants import CUSTOM_TAG_MAX_LENGTH, Config, IntOrInf, ListPairsWithTimeframes
 from freqtrade.data.converter import populate_dataframe_with_trades
 from freqtrade.data.converter.converter import reduce_dataframe_footprint
@@ -1767,9 +1768,31 @@
         use_public_trades = self.config.get("exchange", {}).get("use_public_trades", False)
         if use_public_trades:
             pair = metadata["pair"]
-            trades = self.dp.trades(pair=pair, copy=False)
+            # Build timerange from dataframe date column
+            if not dataframe.empty:
+                start_ts = int(dataframe["date"].iloc[0].timestamp() * 1000)
+                end_ts = int(dataframe["date"].iloc[-1].timestamp() * 1000)
+                timerange = TimeRange("date", "date", startts=start_ts, stopts=end_ts)
+            else:
+                timerange = None
+
+            trades = self.dp.trades(pair=pair, copy=False, timerange=timerange)
+
+            # Apply additional filtering with buffer for faster backtesting
+            if not trades.empty and not dataframe.empty and "timestamp" in trades.columns:
+                # Add timeframe buffer to ensure complete candle coverage
+                timeframe_buffer = timeframe_to_seconds(self.config["timeframe"]) * 1000
+
+                # Create time bounds with buffer
+                time_start = start_ts - timeframe_buffer
+                time_end = end_ts + timeframe_buffer
+
+                # Filter trades within buffered timerange
+                trades_mask = (trades["timestamp"] >= time_start) & (
+                    trades["timestamp"] <= time_end
+                )
+                trades = trades.loc[trades_mask].reset_index(drop=True)

-            # TODO: slice trades to size of dataframe for faster backtesting
             cached_grouped_trades: DataFrame | None = self._cached_grouped_trades_per_pair.get(pair)
             dataframe, cached_grouped_trades = populate_dataframe_with_trades(
                 cached_grouped_trades, self.config, dataframe, trades

From f21c5ea88a7479757e841523d5dab28595f77dd1 Mon Sep 17 00:00:00 2001
From: Maxime Pagnoulle
Date: Sun, 24 Aug 2025 11:59:56 +0200
Subject: [PATCH 3/5] feat: Remove redundant filtering, add tests for pyarrow
 trade filtering, use date utils for date to ts conversion

---
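Note (illustration only, not part of the commit): dt_ts() from freqtrade.util
returns the epoch timestamp in milliseconds, so it replaces the manual
int(dt.timestamp() * 1000) conversion removed below one-for-one:

    # Sketch: dt_ts() vs the manual millisecond conversion it replaces.
    from datetime import datetime, timezone

    from freqtrade.util import dt_ts

    dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
    assert dt_ts(dt) == int(dt.timestamp() * 1000) == 1704067200000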
 .../datahandlers/featherdatahandler.py |  2 +-
 freqtrade/strategy/interface.py        | 21 +-----
 tests/data/test_datahandler.py         | 67 +++++++++++++++++++
 3 files changed, 71 insertions(+), 19 deletions(-)

diff --git a/freqtrade/data/history/datahandlers/featherdatahandler.py b/freqtrade/data/history/datahandlers/featherdatahandler.py
index 41978bb13..6e813c046 100644
--- a/freqtrade/data/history/datahandlers/featherdatahandler.py
+++ b/freqtrade/data/history/datahandlers/featherdatahandler.py
@@ -143,7 +143,7 @@

             except (ImportError, AttributeError, ValueError) as e:
                 # Fallback: load entire file
-                logger.debug(f"Unable to use Arrow filtering, loading entire trades file: {e}")
+                logger.warning(f"Unable to use Arrow filtering, loading entire trades file: {e}")
                 tradesdata = read_feather(filename)

         return tradesdata
diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py
index 140bd7e0c..30c5f0590 100644
--- a/freqtrade/strategy/interface.py
+++ b/freqtrade/strategy/interface.py
@@ -41,7 +41,7 @@ from freqtrade.strategy.informative_decorator import (
 )
 from freqtrade.strategy.strategy_validation import StrategyResultValidator
 from freqtrade.strategy.strategy_wrapper import strategy_safe_wrapper
-from freqtrade.util import dt_now
+from freqtrade.util import dt_now, dt_ts
 from freqtrade.wallets import Wallets


@@ -1770,29 +1770,14 @@
             pair = metadata["pair"]
             # Build timerange from dataframe date column
             if not dataframe.empty:
-                start_ts = int(dataframe["date"].iloc[0].timestamp() * 1000)
-                end_ts = int(dataframe["date"].iloc[-1].timestamp() * 1000)
+                start_ts = dt_ts(dataframe["date"].iloc[0])
+                end_ts = dt_ts(dataframe["date"].iloc[-1])
                 timerange = TimeRange("date", "date", startts=start_ts, stopts=end_ts)
             else:
                 timerange = None

             trades = self.dp.trades(pair=pair, copy=False, timerange=timerange)

-            # Apply additional filtering with buffer for faster backtesting
-            if not trades.empty and not dataframe.empty and "timestamp" in trades.columns:
-                # Add timeframe buffer to ensure complete candle coverage
-                timeframe_buffer = timeframe_to_seconds(self.config["timeframe"]) * 1000
-
-                # Create time bounds with buffer
-                time_start = start_ts - timeframe_buffer
-                time_end = end_ts + timeframe_buffer
-
-                # Filter trades within buffered timerange
-                trades_mask = (trades["timestamp"] >= time_start) & (
-                    trades["timestamp"] <= time_end
-                )
-                trades = trades.loc[trades_mask].reset_index(drop=True)
-
             cached_grouped_trades: DataFrame | None = self._cached_grouped_trades_per_pair.get(pair)
             dataframe, cached_grouped_trades = populate_dataframe_with_trades(
                 cached_grouped_trades, self.config, dataframe, trades
diff --git a/tests/data/test_datahandler.py b/tests/data/test_datahandler.py
index 98c7c65ba..6d6cc40ae 100644
--- a/tests/data/test_datahandler.py
+++ b/tests/data/test_datahandler.py
@@ -506,3 +506,70 @@ def test_get_datahandler(testdatadir):
     assert isinstance(dh, JsonGzDataHandler)
     dh1 = get_datahandler(testdatadir, "jsongz", dh)
     assert id(dh1) == id(dh)
+
+
+@pytest.fixture
+def feather_dh(testdatadir):
+    return FeatherDataHandler(testdatadir)
+
+
+@pytest.fixture
+def trades_full(feather_dh):
+    df = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT)
+    assert not df.empty
+    return df
+
+
+@pytest.fixture
+def timerange_full(trades_full):
+    # Pick a full-span window using actual timestamps
+    startts = int(trades_full["timestamp"].min())
+    stopts = int(trades_full["timestamp"].max())
+    return TimeRange("date", "date", startts=startts, stopts=stopts)
+
+
+@pytest.fixture
+def timerange_mid(trades_full):
+    # Pick a mid-range window using actual timestamps
+    mid_start = int(trades_full["timestamp"].iloc[len(trades_full) // 3])
+    mid_end = int(trades_full["timestamp"].iloc[(2 * len(trades_full)) // 3])
+    return TimeRange("date", "date", startts=mid_start, stopts=mid_end)
+
+
+def test_feather_trades_timerange_filter_fullspan(feather_dh, trades_full, timerange_full):
+    # Full-span filter should equal unfiltered
+    filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange_full)
+    assert_frame_equal(
+        trades_full.reset_index(drop=True), filtered.reset_index(drop=True), check_exact=True
+    )
+
+
+def test_feather_trades_timerange_filter_subset(feather_dh, trades_full, timerange_mid):
+    # Subset filter should be a subset of the full-span filter
+    subset = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange_mid)
+    assert not subset.empty
+    assert subset["timestamp"].min() >= timerange_mid.startts
+    assert subset["timestamp"].max() <= timerange_mid.stopts
+    assert len(subset) < len(trades_full)
+
+
+def test_feather_trades_timerange_pushdown_fallback(
+    feather_dh, trades_full, timerange_mid, monkeypatch, caplog
+):
+    # Pushdown filter should fail, so fallback should load the entire file
+    import freqtrade.data.history.datahandlers.featherdatahandler as fdh
+
+    def raise_err(*args, **kwargs):
+        raise ValueError("fail")
+
+    # Mock the dataset loading to raise an error
+    monkeypatch.setattr(fdh.dataset, "dataset", raise_err)
+
+    with caplog.at_level("WARNING"):
+        out = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange_mid)
+
+    assert len(out) == len(trades_full)
+    assert any(
+        "Unable to use Arrow filtering, loading entire trades file" in r.message
+        for r in caplog.records
+    )

From 3ff1e31a81ef36b763f4c02f2d4770e402bf3540 Mon Sep 17 00:00:00 2001
From: Matthias
Date: Wed, 27 Aug 2025 08:31:14 +0200
Subject: [PATCH 4/5] test: don't use fixture if it's only used once

---
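Note (illustration only, not part of the commit): the change below inlines a
fixture that only one test used. A generic sketch of the pattern with
hypothetical names:

    # Sketch: a single-use pytest fixture vs. the same setup inlined.
    import pytest


    @pytest.fixture
    def numbers():
        return [1, 2, 3]


    def test_sum_with_fixture(numbers):
        # setup reached through fixture indirection
        assert sum(numbers) == 6


    def test_sum_inlined():
        # same test with the setup inlined - easier to follow when used only once
        numbers = [1, 2, 3]
        assert sum(numbers) == 6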
 tests/data/test_datahandler.py | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/tests/data/test_datahandler.py b/tests/data/test_datahandler.py
index 6d6cc40ae..df96751da 100644
--- a/tests/data/test_datahandler.py
+++ b/tests/data/test_datahandler.py
@@ -520,14 +520,6 @@ def trades_full(feather_dh):
     return df


-@pytest.fixture
-def timerange_full(trades_full):
-    # Pick a full-span window using actual timestamps
-    startts = int(trades_full["timestamp"].min())
-    stopts = int(trades_full["timestamp"].max())
-    return TimeRange("date", "date", startts=startts, stopts=stopts)
-
-
 @pytest.fixture
 def timerange_mid(trades_full):
     # Pick a mid-range window using actual timestamps
@@ -536,7 +528,13 @@ def timerange_mid(trades_full):
     return TimeRange("date", "date", startts=mid_start, stopts=mid_end)


-def test_feather_trades_timerange_filter_fullspan(feather_dh, trades_full, timerange_full):
+def test_feather_trades_timerange_filter_fullspan(feather_dh, trades_full):
+    timerange_full = TimeRange(
+        "date",
+        "date",
+        startts=int(trades_full["timestamp"].min()),
+        stopts=int(trades_full["timestamp"].max()),
+    )
     # Full-span filter should equal unfiltered
     filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange_full)
     assert_frame_equal(

From 9d5295fdb429dcad50c9c7ec1621ae510aa32183 Mon Sep 17 00:00:00 2001
From: Maxime Pagnoulle
Date: Sat, 30 Aug 2025 17:38:23 +0200
Subject: [PATCH 5/5] feat: Support for unbounded start and/or end in
 timerange filtering, added associated tests

---
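Note (illustration only, not part of the commit): the new helper below treats a
0 bound as "unbounded" and combines whatever bounds remain into a single pyarrow
expression. A standalone sketch of that composition, with made-up values:

    # Sketch: compose a one- or two-sided pyarrow filter, 0 meaning "no bound".
    from pyarrow import dataset


    def build_time_filter(startts: int, stopts: int):
        ts_field = dataset.field("timestamp")
        exprs = []
        if startts > 0:
            exprs.append(ts_field >= startts)
        if stopts > 0:
            exprs.append(ts_field <= stopts)
        if not exprs:
            return None  # fully unbounded - read everything
        result = exprs[0]
        for expr in exprs[1:]:
            result = result & expr
        return result


    print(build_time_filter(0, 1000))    # one-sided: timestamp <= 1000
    print(build_time_filter(500, 1000))  # two-sided: both bounds combined with &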
 .../datahandlers/featherdatahandler.py | 68 +++++++++++++-----
 tests/data/test_datahandler.py         | 69 +++++++++++++++++++
 2 files changed, 119 insertions(+), 18 deletions(-)

diff --git a/freqtrade/data/history/datahandlers/featherdatahandler.py b/freqtrade/data/history/datahandlers/featherdatahandler.py
index 6e813c046..ef293d6b2 100644
--- a/freqtrade/data/history/datahandlers/featherdatahandler.py
+++ b/freqtrade/data/history/datahandlers/featherdatahandler.py
@@ -112,6 +112,36 @@
         """
         raise NotImplementedError()

+    def _build_arrow_time_filter(self, timerange: TimeRange | None):
+        """
+        Build Arrow predicate filter for timerange filtering.
+        Treats 0 as unbounded (no filter on that side).
+        :param timerange: TimeRange object with start/stop timestamps
+        :return: Arrow filter expression or None if fully unbounded
+        """
+        if not timerange:
+            return None
+
+        # Treat 0 as unbounded
+        start_set = bool(timerange.startts and timerange.startts > 0)
+        stop_set = bool(timerange.stopts and timerange.stopts > 0)
+
+        if not (start_set or stop_set):
+            return None
+
+        ts_field = dataset.field("timestamp")
+        exprs = []
+
+        if start_set:
+            exprs.append(ts_field >= timerange.startts)
+        if stop_set:
+            exprs.append(ts_field <= timerange.stopts)
+
+        if len(exprs) == 1:
+            return exprs[0]
+        else:
+            return exprs[0] & exprs[1]
+
     def _trades_load(
         self, pair: str, trading_mode: TradingMode, timerange: TimeRange | None = None
     ) -> DataFrame:
@@ -126,25 +156,27 @@
         if not filename.exists():
             return DataFrame(columns=DEFAULT_TRADES_COLUMNS)

-        # Load trades data with optional timerange filtering
-        if timerange is None:
-            # No timerange filter - load entire file
-            logger.debug(f"Loading entire trades file for {pair}")
-            tradesdata = read_feather(filename)
-        else:
-            # Use Arrow dataset with predicate pushdown for efficient filtering
-            try:
-                dataset_reader = dataset.dataset(filename, format="feather")
-                time_filter = (dataset.field("timestamp") >= timerange.startts) & (
-                    dataset.field("timestamp") <= timerange.stopts
-                )
-                tradesdata = dataset_reader.to_table(filter=time_filter).to_pandas()
-                logger.debug(f"Loaded {len(tradesdata)} trades for {pair}")
+        # Use Arrow dataset with optional timerange filtering, fallback to read_feather
+        try:
+            dataset_reader = dataset.dataset(filename, format="feather")
+            time_filter = self._build_arrow_time_filter(timerange)

-            except (ImportError, AttributeError, ValueError) as e:
-                # Fallback: load entire file
-                logger.warning(f"Unable to use Arrow filtering, loading entire trades file: {e}")
-                tradesdata = read_feather(filename)
+            if time_filter is not None and timerange is not None:
+                tradesdata = dataset_reader.to_table(filter=time_filter).to_pandas()
+                start_desc = timerange.startts if timerange.startts > 0 else "unbounded"
+                stop_desc = timerange.stopts if timerange.stopts > 0 else "unbounded"
+                logger.debug(
+                    f"Loaded {len(tradesdata)} trades for {pair} "
+                    f"(filtered start={start_desc}, stop={stop_desc})"
+                )
+            else:
+                tradesdata = dataset_reader.to_table().to_pandas()
+                logger.debug(f"Loaded {len(tradesdata)} trades for {pair} (unfiltered)")
+
+        except (ImportError, AttributeError, ValueError) as e:
+            # Fallback: load entire file
+            logger.warning(f"Unable to use Arrow filtering, loading entire trades file: {e}")
+            tradesdata = read_feather(filename)

         return tradesdata

diff --git a/tests/data/test_datahandler.py b/tests/data/test_datahandler.py
index df96751da..bc665535f 100644
--- a/tests/data/test_datahandler.py
+++ b/tests/data/test_datahandler.py
@@ -571,3 +571,72 @@ def test_feather_trades_timerange_pushdown_fallback(
         "Unable to use Arrow filtering, loading entire trades file" in r.message
         for r in caplog.records
     )
+
+
+def test_feather_trades_timerange_open_start(feather_dh, trades_full):
+    # Open start: stop timestamp but no start (startts=0)
+    stop_ts = int(trades_full["timestamp"].iloc[(2 * len(trades_full)) // 3])
+    tr = TimeRange(None, "date", startts=0, stopts=stop_ts)
+
+    filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=tr)
+    assert 0 < len(filtered) < len(trades_full)
+    assert filtered["timestamp"].max() <= stop_ts
+    # First row should match full's first row
+    assert filtered.iloc[0]["timestamp"] == trades_full.iloc[0]["timestamp"]
+
+
+def test_feather_trades_timerange_open_end(feather_dh, trades_full):
+    # Open end: start timestamp but no stop (stopts=0)
+    start_ts = int(trades_full["timestamp"].iloc[len(trades_full) // 3])
+    tr = TimeRange("date", None, startts=start_ts, stopts=0)
+
+    filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=tr)
+    assert 0 < len(filtered) < len(trades_full)
+    assert filtered["timestamp"].min() >= start_ts
+    # Last row should match full's last row
+    assert filtered.iloc[-1]["timestamp"] == trades_full.iloc[-1]["timestamp"]
+
+
+def test_feather_trades_timerange_fully_open(feather_dh, trades_full):
+    # Fully open: no start or stop bounds (both 0)
+    tr = TimeRange(None, None, startts=0, stopts=0)
+
+    filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=tr)
+    # Should equal unfiltered load
+    assert_frame_equal(
+        trades_full.reset_index(drop=True), filtered.reset_index(drop=True), check_exact=True
+    )
+
+
+def test_feather_build_arrow_time_filter(feather_dh):
+    # None timerange should return None
+    assert feather_dh._build_arrow_time_filter(None) is None
+
+    # Fully open (both bounds 0) should return None
+    tr_fully_open = TimeRange(None, None, startts=0, stopts=0)
+    assert feather_dh._build_arrow_time_filter(tr_fully_open) is None
+
+    # Open start (startts=0) should return stop filter only
+    tr_open_start = TimeRange(None, "date", startts=0, stopts=1000)
+    filter_open_start = feather_dh._build_arrow_time_filter(tr_open_start)
+    assert filter_open_start is not None
+    # Should be a single expression (timestamp <= stopts)
+    assert str(filter_open_start).count("<=") == 1
+    assert str(filter_open_start).count(">=") == 0
+
+    # Open end (stopts=0) should return start filter only
+    tr_open_end = TimeRange("date", None, startts=500, stopts=0)
+    filter_open_end = feather_dh._build_arrow_time_filter(tr_open_end)
+    assert filter_open_end is not None
+    # Should be a single expression (timestamp >= startts)
+    assert str(filter_open_end).count(">=") == 1
+    assert str(filter_open_end).count("<=") == 0
+
+    # Closed range should return combined filter
+    tr_closed = TimeRange("date", "date", startts=500, stopts=1000)
+    filter_closed = feather_dh._build_arrow_time_filter(tr_closed)
+    assert filter_closed is not None
+    # Should contain both >= and <= (combined with &)
+    filter_str = str(filter_closed)
+    assert ">=" in filter_str
+    assert "<=" in filter_str