Mirror of https://github.com/freqtrade/freqtrade.git
Merge pull request #12151 from mpagnoulle/feat/trades-timerange-filtering
Trades TimeRange filtering for FeatherDataHandler and _if_enabled_populate_trades
@@ -498,7 +498,12 @@ class DataProvider:
         return DataFrame()
 
     def trades(
-        self, pair: str, timeframe: str | None = None, copy: bool = True, candle_type: str = ""
+        self,
+        pair: str,
+        timeframe: str | None = None,
+        copy: bool = True,
+        candle_type: str = "",
+        timerange: TimeRange | None = None,
     ) -> DataFrame:
         """
         Get candle (TRADES) data for the given pair as DataFrame
@@ -526,7 +531,7 @@ class DataProvider:
                 self._config["datadir"], data_format=self._config["dataformat_trades"]
             )
             trades_df = data_handler.trades_load(
-                pair, self._config.get("trading_mode", TradingMode.SPOT)
+                pair, self._config.get("trading_mode", TradingMode.SPOT), timerange=timerange
             )
         return trades_df
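With the new parameter, callers of DataProvider.trades can bound how much trade data is read from disk. A minimal usage sketch (editorial illustration, not part of the diff), assuming a DataProvider instance named `dp` in backtest mode; the pair and the millisecond bounds are made up, and the bounds use the same unit as the trades `timestamp` column:

    from freqtrade.configuration import TimeRange

    # Illustrative bounds in milliseconds; 0 on either side means "unbounded".
    timerange = TimeRange("date", "date", startts=1712000000000, stopts=1713000000000)
    trades = dp.trades(pair="XRP/ETH", copy=False, timerange=timerange)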
@@ -1,6 +1,7 @@
 import logging
 
 from pandas import DataFrame, read_feather, to_datetime
+from pyarrow import dataset
 
 from freqtrade.configuration import TimeRange
 from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, DEFAULT_TRADES_COLUMNS
@@ -111,22 +112,71 @@ class FeatherDataHandler(IDataHandler):
         """
         raise NotImplementedError()
 
+    def _build_arrow_time_filter(self, timerange: TimeRange | None):
+        """
+        Build Arrow predicate filter for timerange filtering.
+        Treats 0 as unbounded (no filter on that side).
+        :param timerange: TimeRange object with start/stop timestamps
+        :return: Arrow filter expression or None if fully unbounded
+        """
+        if not timerange:
+            return None
+
+        # Treat 0 as unbounded
+        start_set = bool(timerange.startts and timerange.startts > 0)
+        stop_set = bool(timerange.stopts and timerange.stopts > 0)
+
+        if not (start_set or stop_set):
+            return None
+
+        ts_field = dataset.field("timestamp")
+        exprs = []
+
+        if start_set:
+            exprs.append(ts_field >= timerange.startts)
+        if stop_set:
+            exprs.append(ts_field <= timerange.stopts)
+
+        if len(exprs) == 1:
+            return exprs[0]
+        else:
+            return exprs[0] & exprs[1]
+
     def _trades_load(
         self, pair: str, trading_mode: TradingMode, timerange: TimeRange | None = None
     ) -> DataFrame:
         """
         Load a pair from file, either .json.gz or .json
-        # TODO: respect timerange ...
         :param pair: Load trades for this pair
         :param trading_mode: Trading mode to use (used to determine the filename)
-        :param timerange: Timerange to load trades for - currently not implemented
+        :param timerange: Timerange to load trades for - filters data to this range if provided
        :return: Dataframe containing trades
         """
         filename = self._pair_trades_filename(self._datadir, pair, trading_mode)
         if not filename.exists():
             return DataFrame(columns=DEFAULT_TRADES_COLUMNS)
 
-        tradesdata = read_feather(filename)
+        # Use Arrow dataset with optional timerange filtering, fallback to read_feather
+        try:
+            dataset_reader = dataset.dataset(filename, format="feather")
+            time_filter = self._build_arrow_time_filter(timerange)
+
+            if time_filter is not None and timerange is not None:
+                tradesdata = dataset_reader.to_table(filter=time_filter).to_pandas()
+                start_desc = timerange.startts if timerange.startts > 0 else "unbounded"
+                stop_desc = timerange.stopts if timerange.stopts > 0 else "unbounded"
+                logger.debug(
+                    f"Loaded {len(tradesdata)} trades for {pair} "
+                    f"(filtered start={start_desc}, stop={stop_desc})"
+                )
+            else:
+                tradesdata = dataset_reader.to_table().to_pandas()
+                logger.debug(f"Loaded {len(tradesdata)} trades for {pair} (unfiltered)")
+
+        except (ImportError, AttributeError, ValueError) as e:
+            # Fallback: load entire file
+            logger.warning(f"Unable to use Arrow filtering, loading entire trades file: {e}")
+            tradesdata = read_feather(filename)
 
         return tradesdata
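The handler's filtering builds on pyarrow's dataset API: comparisons against dataset.field(...) produce expression objects, and passing one as filter= to to_table() prunes rows during the scan instead of loading the whole file and slicing afterwards. A self-contained sketch of the same idea outside freqtrade (file name and values are illustrative only):

    import pandas as pd
    from pyarrow import dataset

    # Write a tiny trades-like Feather file with a millisecond "timestamp" column.
    df = pd.DataFrame({"timestamp": [1000, 2000, 3000, 4000], "price": [1.0, 1.1, 1.2, 1.3]})
    df.to_feather("trades_demo.feather")

    # Build a predicate on the timestamp column and push it down into the scan.
    ts_field = dataset.field("timestamp")
    expr = (ts_field >= 2000) & (ts_field <= 3000)
    dset = dataset.dataset("trades_demo.feather", format="feather")
    filtered = dset.to_table(filter=expr).to_pandas()
    print(filtered)  # keeps only the rows with timestamp 2000 and 3000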
@@ -11,6 +11,7 @@ from math import isinf, isnan
 from pandas import DataFrame
 from pydantic import ValidationError
 
+from freqtrade.configuration import TimeRange
 from freqtrade.constants import CUSTOM_TAG_MAX_LENGTH, Config, IntOrInf, ListPairsWithTimeframes
 from freqtrade.data.converter import populate_dataframe_with_trades
 from freqtrade.data.converter.converter import reduce_dataframe_footprint
@@ -40,7 +41,7 @@ from freqtrade.strategy.informative_decorator import (
 )
 from freqtrade.strategy.strategy_validation import StrategyResultValidator
 from freqtrade.strategy.strategy_wrapper import strategy_safe_wrapper
-from freqtrade.util import dt_now
+from freqtrade.util import dt_now, dt_ts
 from freqtrade.wallets import Wallets
 
 
@@ -1767,9 +1768,16 @@ class IStrategy(ABC, HyperStrategyMixin):
         use_public_trades = self.config.get("exchange", {}).get("use_public_trades", False)
         if use_public_trades:
             pair = metadata["pair"]
-            trades = self.dp.trades(pair=pair, copy=False)
+            # Build timerange from dataframe date column
+            if not dataframe.empty:
+                start_ts = dt_ts(dataframe["date"].iloc[0])
+                end_ts = dt_ts(dataframe["date"].iloc[-1])
+                timerange = TimeRange("date", "date", startts=start_ts, stopts=end_ts)
+            else:
+                timerange = None
+
+            trades = self.dp.trades(pair=pair, copy=False, timerange=timerange)
 
-            # TODO: slice trades to size of dataframe for faster backtesting
             cached_grouped_trades: DataFrame | None = self._cached_grouped_trades_per_pair.get(pair)
             dataframe, cached_grouped_trades = populate_dataframe_with_trades(
                 cached_grouped_trades, self.config, dataframe, trades
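dt_ts from freqtrade.util converts a datetime to a millisecond epoch timestamp, so the bounds taken from the candle `date` column end up in the same unit as the trades `timestamp` column the Feather handler filters on. A rough equivalent of one bound computation, using an illustrative date in place of dataframe["date"].iloc[0]:

    from datetime import datetime, timezone

    from freqtrade.util import dt_ts

    first_candle = datetime(2024, 4, 1, tzinfo=timezone.utc)  # stand-in for the first candle date
    print(dt_ts(first_candle))  # 1711929600000, i.e. milliseconds since epoch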
@@ -506,3 +506,137 @@ def test_get_datahandler(testdatadir):
     assert isinstance(dh, JsonGzDataHandler)
     dh1 = get_datahandler(testdatadir, "jsongz", dh)
     assert id(dh1) == id(dh)
+
+
+@pytest.fixture
+def feather_dh(testdatadir):
+    return FeatherDataHandler(testdatadir)
+
+
+@pytest.fixture
+def trades_full(feather_dh):
+    df = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT)
+    assert not df.empty
+    return df
+
+
+@pytest.fixture
+def timerange_mid(trades_full):
+    # Pick a mid-range window using actual timestamps
+    mid_start = int(trades_full["timestamp"].iloc[len(trades_full) // 3])
+    mid_end = int(trades_full["timestamp"].iloc[(2 * len(trades_full)) // 3])
+    return TimeRange("date", "date", startts=mid_start, stopts=mid_end)
+
+
+def test_feather_trades_timerange_filter_fullspan(feather_dh, trades_full):
+    timerange_full = TimeRange(
+        "date",
+        "date",
+        startts=int(trades_full["timestamp"].min()),
+        stopts=int(trades_full["timestamp"].max()),
+    )
+    # Full-span filter should equal unfiltered
+    filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange_full)
+    assert_frame_equal(
+        trades_full.reset_index(drop=True), filtered.reset_index(drop=True), check_exact=True
+    )
+
+
+def test_feather_trades_timerange_filter_subset(feather_dh, trades_full, timerange_mid):
+    # Subset filter should be a subset of the full-span filter
+    subset = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange_mid)
+    assert not subset.empty
+    assert subset["timestamp"].min() >= timerange_mid.startts
+    assert subset["timestamp"].max() <= timerange_mid.stopts
+    assert len(subset) < len(trades_full)
+
+
+def test_feather_trades_timerange_pushdown_fallback(
+    feather_dh, trades_full, timerange_mid, monkeypatch, caplog
+):
+    # Pushdown filter should fail, so fallback should load the entire file
+    import freqtrade.data.history.datahandlers.featherdatahandler as fdh
+
+    def raise_err(*args, **kwargs):
+        raise ValueError("fail")
+
+    # Mock the dataset loading to raise an error
+    monkeypatch.setattr(fdh.dataset, "dataset", raise_err)
+
+    with caplog.at_level("WARNING"):
+        out = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange_mid)
+
+    assert len(out) == len(trades_full)
+    assert any(
+        "Unable to use Arrow filtering, loading entire trades file" in r.message
+        for r in caplog.records
+    )
+
+
+def test_feather_trades_timerange_open_start(feather_dh, trades_full):
+    # Open start: stop timestamp but no start (startts=0)
+    stop_ts = int(trades_full["timestamp"].iloc[(2 * len(trades_full)) // 3])
+    tr = TimeRange(None, "date", startts=0, stopts=stop_ts)
+
+    filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=tr)
+    assert 0 < len(filtered) < len(trades_full)
+    assert filtered["timestamp"].max() <= stop_ts
+    # First row should match full's first row
+    assert filtered.iloc[0]["timestamp"] == trades_full.iloc[0]["timestamp"]
+
+
+def test_feather_trades_timerange_open_end(feather_dh, trades_full):
+    # Open end: start timestamp but no stop (stopts=0)
+    start_ts = int(trades_full["timestamp"].iloc[len(trades_full) // 3])
+    tr = TimeRange("date", None, startts=start_ts, stopts=0)
+
+    filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=tr)
+    assert 0 < len(filtered) < len(trades_full)
+    assert filtered["timestamp"].min() >= start_ts
+    # Last row should match full's last row
+    assert filtered.iloc[-1]["timestamp"] == trades_full.iloc[-1]["timestamp"]
+
+
+def test_feather_trades_timerange_fully_open(feather_dh, trades_full):
+    # Fully open: no start or stop bounds (both 0)
+    tr = TimeRange(None, None, startts=0, stopts=0)
+
+    filtered = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=tr)
+    # Should equal unfiltered load
+    assert_frame_equal(
+        trades_full.reset_index(drop=True), filtered.reset_index(drop=True), check_exact=True
+    )
+
+
+def test_feather_build_arrow_time_filter(feather_dh):
+    # None timerange should return None
+    assert feather_dh._build_arrow_time_filter(None) is None
+
+    # Fully open (both bounds 0) should return None
+    tr_fully_open = TimeRange(None, None, startts=0, stopts=0)
+    assert feather_dh._build_arrow_time_filter(tr_fully_open) is None
+
+    # Open start (startts=0) should return stop filter only
+    tr_open_start = TimeRange(None, "date", startts=0, stopts=1000)
+    filter_open_start = feather_dh._build_arrow_time_filter(tr_open_start)
+    assert filter_open_start is not None
+    # Should be a single expression (timestamp <= stopts)
+    assert str(filter_open_start).count("<=") == 1
+    assert str(filter_open_start).count(">=") == 0
+
+    # Open end (stopts=0) should return start filter only
+    tr_open_end = TimeRange("date", None, startts=500, stopts=0)
+    filter_open_end = feather_dh._build_arrow_time_filter(tr_open_end)
+    assert filter_open_end is not None
+    # Should be a single expression (timestamp >= startts)
+    assert str(filter_open_end).count(">=") == 1
+    assert str(filter_open_end).count("<=") == 0
+
+    # Closed range should return combined filter
+    tr_closed = TimeRange("date", "date", startts=500, stopts=1000)
+    filter_closed = feather_dh._build_arrow_time_filter(tr_closed)
+    assert filter_closed is not None
+    # Should contain both >= and <= (combined with &)
+    filter_str = str(filter_closed)
+    assert ">=" in filter_str
+    assert "<=" in filter_str
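The string-based assertions in test_feather_build_arrow_time_filter rely on pyarrow's textual rendering of expressions, which includes the comparison operators. An illustrative check (the exact formatting can vary between pyarrow versions):

    from pyarrow import dataset

    expr = (dataset.field("timestamp") >= 500) & (dataset.field("timestamp") <= 1000)
    print(str(expr))  # something like "((timestamp >= 500) and (timestamp <= 1000))"
    assert ">=" in str(expr) and "<=" in str(expr)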