feat: Support unbounded start and/or end in timerange filtering; add associated tests

This commit is contained in:
Maxime Pagnoulle
2025-08-30 17:38:23 +02:00
parent 3ff1e31a81
commit 9d5295fdb4
2 changed files with 119 additions and 18 deletions

View File

@@ -112,6 +112,36 @@ class FeatherDataHandler(IDataHandler):
"""
raise NotImplementedError()
def _build_arrow_time_filter(self, timerange: TimeRange | None):
    """
    Construct a pyarrow dataset predicate for the given timerange.

    A start/stop timestamp of 0 is interpreted as unbounded, so no
    predicate is emitted for that side.
    :param timerange: TimeRange object with start/stop timestamps
    :return: Arrow filter expression, or None when no bound applies
    """
    if not timerange:
        return None

    # 0 (or unset) on either side means "no bound on that side".
    lower = timerange.startts if timerange.startts and timerange.startts > 0 else None
    upper = timerange.stopts if timerange.stopts and timerange.stopts > 0 else None
    if lower is None and upper is None:
        return None

    ts_field = dataset.field("timestamp")
    combined = None
    if lower is not None:
        combined = ts_field >= lower
    if upper is not None:
        upper_expr = ts_field <= upper
        combined = upper_expr if combined is None else combined & upper_expr
    return combined
def _trades_load(
self, pair: str, trading_mode: TradingMode, timerange: TimeRange | None = None
) -> DataFrame:
@@ -126,20 +156,22 @@ class FeatherDataHandler(IDataHandler):
if not filename.exists():
return DataFrame(columns=DEFAULT_TRADES_COLUMNS)
# Load trades data with optional timerange filtering
if timerange is None:
# No timerange filter - load entire file
logger.debug(f"Loading entire trades file for {pair}")
tradesdata = read_feather(filename)
else:
# Use Arrow dataset with predicate pushdown for efficient filtering
# Use Arrow dataset with optional timerange filtering, fallback to read_feather
try:
dataset_reader = dataset.dataset(filename, format="feather")
time_filter = (dataset.field("timestamp") >= timerange.startts) & (
dataset.field("timestamp") <= timerange.stopts
)
time_filter = self._build_arrow_time_filter(timerange)
if time_filter is not None and timerange is not None:
tradesdata = dataset_reader.to_table(filter=time_filter).to_pandas()
logger.debug(f"Loaded {len(tradesdata)} trades for {pair}")
start_desc = timerange.startts if timerange.startts > 0 else "unbounded"
stop_desc = timerange.stopts if timerange.stopts > 0 else "unbounded"
logger.debug(
f"Loaded {len(tradesdata)} trades for {pair} "
f"(filtered start={start_desc}, stop={stop_desc})"
)
else:
tradesdata = dataset_reader.to_table().to_pandas()
logger.debug(f"Loaded {len(tradesdata)} trades for {pair} (unfiltered)")
except (ImportError, AttributeError, ValueError) as e:
# Fallback: load entire file

View File

@@ -571,3 +571,72 @@ def test_feather_trades_timerange_pushdown_fallback(
"Unable to use Arrow filtering, loading entire trades file" in r.message
for r in caplog.records
)
def test_feather_trades_timerange_open_start(feather_dh, trades_full):
    # Unbounded start: only a stop timestamp is provided (startts=0).
    cutoff = int(trades_full["timestamp"].iloc[(2 * len(trades_full)) // 3])
    timerange = TimeRange(None, "date", startts=0, stopts=cutoff)
    result = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange)
    # A proper subset of the full data must be returned.
    assert len(result) > 0
    assert len(result) < len(trades_full)
    # Upper bound honored.
    assert result["timestamp"].max() <= cutoff
    # With no lower bound, the very first trade must still be included.
    assert result.iloc[0]["timestamp"] == trades_full.iloc[0]["timestamp"]
def test_feather_trades_timerange_open_end(feather_dh, trades_full):
    # Unbounded end: only a start timestamp is provided (stopts=0).
    lower_bound = int(trades_full["timestamp"].iloc[len(trades_full) // 3])
    timerange = TimeRange("date", None, startts=lower_bound, stopts=0)
    result = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange)
    # A proper subset of the full data must be returned.
    assert len(result) > 0
    assert len(result) < len(trades_full)
    # Lower bound honored.
    assert result["timestamp"].min() >= lower_bound
    # With no upper bound, the very last trade must still be included.
    assert result.iloc[-1]["timestamp"] == trades_full.iloc[-1]["timestamp"]
def test_feather_trades_timerange_fully_open(feather_dh, trades_full):
    # Both bounds set to 0 -> fully unbounded; must behave like no filter at all.
    timerange = TimeRange(None, None, startts=0, stopts=0)
    loaded = feather_dh.trades_load("XRP/ETH", TradingMode.SPOT, timerange=timerange)
    expected = trades_full.reset_index(drop=True)
    assert_frame_equal(expected, loaded.reset_index(drop=True), check_exact=True)
def test_feather_build_arrow_time_filter(feather_dh):
    # No timerange at all -> no filter.
    assert feather_dh._build_arrow_time_filter(None) is None

    # Both bounds zero (fully open) -> no filter either.
    fully_open = TimeRange(None, None, startts=0, stopts=0)
    assert feather_dh._build_arrow_time_filter(fully_open) is None

    # Only a stop bound: single expression with one "<=" and no ">=".
    open_start = feather_dh._build_arrow_time_filter(
        TimeRange(None, "date", startts=0, stopts=1000)
    )
    assert open_start is not None
    rendered = str(open_start)
    assert rendered.count("<=") == 1
    assert rendered.count(">=") == 0

    # Only a start bound: single expression with one ">=" and no "<=".
    open_end = feather_dh._build_arrow_time_filter(
        TimeRange("date", None, startts=500, stopts=0)
    )
    assert open_end is not None
    rendered = str(open_end)
    assert rendered.count(">=") == 1
    assert rendered.count("<=") == 0

    # Both bounds: the combined (&) expression carries both comparisons.
    closed = feather_dh._build_arrow_time_filter(
        TimeRange("date", "date", startts=500, stopts=1000)
    )
    assert closed is not None
    rendered = str(closed)
    assert ">=" in rendered
    assert "<=" in rendered