Merge pull request #9066 from TheJoeSchr/feature/fetch-public-trades

[Feature] fetch public trades
Matthias
2024-07-21 20:00:54 +02:00
committed by GitHub
21 changed files with 2483 additions and 13 deletions

docs/advanced-orderflow.md Normal file
View File

@@ -0,0 +1,152 @@
# Orderflow data
This guide walks you through utilizing public trade data for advanced orderflow analysis in Freqtrade.
!!! Warning "Experimental Feature"
The orderflow feature is currently in beta and may be subject to changes in future releases. Please report any issues or feedback on the [Freqtrade GitHub repository](https://github.com/freqtrade/freqtrade/issues).
!!! Warning "Performance"
Orderflow requires raw trades data. This data is rather large, and can cause a slow initial startup, when freqtrade needs to download the trades data for the last X candles. Additionally, enabling this feature will cause increased memory usage. Please ensure to have sufficient resources available.
## Getting Started
### Enable Public Trades
In your `config.json` file, set the `use_public_trades` option to true under the `exchange` section.
```json
"exchange": {
...
"use_public_trades": true,
}
```
### Configure Orderflow Processing
Define your desired settings for orderflow processing within the `orderflow` section of `config.json`. Here you can adjust factors like:
- `cache_size`: How many previous orderflow candles are kept in cache instead of being recalculated on every new candle.
- `max_candles`: How many candles to fetch and process trades data for.
- `scale`: Controls the price bin size for the footprint chart.
- `stacked_imbalance_range`: Minimum number of consecutive imbalanced price levels required for a stacked imbalance.
- `imbalance_volume`: Filters out imbalances with volume below this threshold.
- `imbalance_ratio`: Filters out imbalances whose ratio between bid and ask volume is lower than this value.
```json
"orderflow": {
"cache_size": 1000,
"max_candles": 1500,
"scale": 0.5,
"stacked_imbalance_range": 3, // needs at least this amount of imbalance next to each other
"imbalance_volume": 1, // filters out below
"imbalance_ratio": 3 // filters out ratio lower than
},
```
## Downloading Trade Data for Backtesting
To download historical trade data for backtesting, use the `--dl-trades` flag with the `freqtrade download-data` command.
```bash
freqtrade download-data -p BTC/USDT:USDT --timerange 20230101- --trading-mode futures --timeframes 5m --dl-trades
```
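The same flag also works for spot markets and multiple pairs at once; for example (pairs and timerange are illustrative):
```bash
freqtrade download-data -p BTC/USDT ETH/USDT --timerange 20240101- --timeframes 5m --dl-trades
```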
!!! Warning "Data availability"
Not all exchanges provide public trade data. For supported exchanges, freqtrade will warn you if public trade data is not available if you start downloading data with the `--dl-trades` flag.
## Accessing Orderflow Data
Once activated, several new columns become available in your dataframe:
``` python
dataframe["trades"] # Contains information about each individual trade.
dataframe["orderflow"] # Represents a footprint chart dict (see below)
dataframe["imbalances"] # Contains information about imbalances in the order flow.
dataframe["bid"] # Total bid volume
dataframe["ask"] # Total ask volume
dataframe["delta"] # Difference between ask and bid volume.
dataframe["min_delta"] # Minimum delta within the candle
dataframe["max_delta"] # Maximum delta within the candle
dataframe["total_trades"] # Total number of trades
dataframe["stacked_imbalances_bid"] # Price level of stacked bid imbalance
dataframe["stacked_imbalances_ask"] # Price level of stacked ask imbalance
```
You can access these columns in your strategy code for further analysis. Here's an example:
``` python
from pandas import DataFrame, Series


def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    # Calculate the cumulative delta
    dataframe["cum_delta"] = cumulative_delta(dataframe["delta"])
    # Access total trades
    total_trades = dataframe["total_trades"]
    ...

def cumulative_delta(delta: Series) -> Series:
    cumdelta = delta.cumsum()
    return cumdelta
```
### Footprint chart (`dataframe["orderflow"]`)
This column provides a detailed breakdown of buy and sell orders at different price levels, offering valuable insights into order flow dynamics. The `scale` parameter in your configuration determines the price bin size for this representation.
The `orderflow` column contains a dict with the following structure:
``` output
{
    "price": {
        "bid_amount": 0.0,
        "ask_amount": 0.0,
        "bid": 0,
        "ask": 0,
        "delta": 0.0,
        "total_volume": 0.0,
        "total_trades": 0
    }
}
```
#### Orderflow column explanation
- key: Price bin, binned at `scale` intervals
- `bid_amount`: Total volume sold into the bid (sell trades) at each price level.
- `ask_amount`: Total volume bought at the ask (buy trades) at each price level.
- `bid`: Number of sell trades at each price level.
- `ask`: Number of buy trades at each price level.
- `delta`: Difference between ask and bid volume at each price level.
- `total_volume`: Total volume (ask amount + bid amount) at each price level.
- `total_trades`: Total number of trades (ask + bid) at each price level.
By leveraging these features, you can gain valuable insights into market sentiment and potential trading opportunities based on order flow analysis.
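For example, the footprint dict can be inspected per candle. Below is a minimal sketch (the helper `ask_volume_above_close` is illustrative, not part of freqtrade) that sums the ask volume traded above each candle's close:
``` python
def ask_volume_above_close(candle_orderflow: dict, close: float) -> float:
    # Keys of the footprint dict are the binned price levels
    return sum(
        level["ask_amount"] for price, level in candle_orderflow.items() if price > close
    )

# Usage inside populate_indicators - the column holds NaN for candles without trades
dataframe["ask_above_close"] = [
    ask_volume_above_close(of, close) if isinstance(of, dict) else 0.0
    for of, close in zip(dataframe["orderflow"], dataframe["close"])
]
```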
### Raw trades data (`dataframe["trades"]`)
A list of the individual trades that occurred during the candle, which can be used for more granular analysis of order flow dynamics.
Each entry contains a dict with the following keys (a usage sketch follows the list):
- `timestamp`: Timestamp of the trade.
- `date`: Date of the trade.
- `price`: Price of the trade.
- `amount`: Volume of the trade.
- `side`: Buy or sell.
- `id`: Unique identifier for the trade.
- `cost`: Total cost of the trade (price * amount).
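For example, a minimal sketch (the helper `average_trade_size` is illustrative, not part of freqtrade) deriving the average trade size per candle from this list:
``` python
def average_trade_size(candle_trades: list) -> float:
    # Each entry is a dict with the keys listed above
    if not candle_trades:
        return 0.0
    return sum(t["amount"] for t in candle_trades) / len(candle_trades)

dataframe["avg_trade_size"] = [
    average_trade_size(trades) if isinstance(trades, list) else 0.0
    for trades in dataframe["trades"]
]
```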
### Imbalances (`dataframe["imbalances"]`)
This column provides a dict with information about imbalances in the order flow. An imbalance occurs when there is a significant difference between the ask and bid volume at a given price level.
Each row looks as follows, with price as index and the corresponding bid and ask imbalance values as columns:
``` output
{
    "price": {
        "bid_imbalance": False,
        "ask_imbalance": False
    }
}
```
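As an illustration, stacked imbalance levels can feed entry logic. Below is a minimal sketch (the entry condition is an assumption for demonstration, not a recommendation) that enters long when a candle trades down into a stacked bid imbalance level:
``` python
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    dataframe.loc[
        # stacked_imbalances_bid holds a price level, or NaN if none was found
        (dataframe["stacked_imbalances_bid"].notna())
        & (dataframe["low"] <= dataframe["stacked_imbalances_bid"]),
        "enter_long",
    ] = 1
    return dataframe
```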

View File

@@ -83,6 +83,7 @@ def validate_config_consistency(conf: Dict[str, Any], *, preliminary: bool = Fal
    _validate_freqai_include_timeframes(conf, preliminary=preliminary)
    _validate_consumers(conf)
    validate_migrated_strategy_settings(conf)
    _validate_orderflow(conf)

    # validate configuration before returning
    logger.info("Validating configuration ...")
@@ -421,6 +422,14 @@ def _validate_consumers(conf: Dict[str, Any]) -> None:
)
def _validate_orderflow(conf: Dict[str, Any]) -> None:
    if conf.get("exchange", {}).get("use_public_trades"):
        if "orderflow" not in conf:
            raise ConfigurationError(
                "Orderflow is a required configuration key when using public trades."
            )


def _strategy_settings(conf: Dict[str, Any]) -> None:
    process_deprecated_setting(conf, None, "use_sell_signal", None, "use_exit_signal")
    process_deprecated_setting(conf, None, "sell_profit_only", None, "exit_profit_only")

View File

@@ -4,7 +4,7 @@
bot constants
"""
from typing import Any, Dict, List, Literal, Tuple
from typing import Any, Dict, List, Literal, Optional, Tuple
from freqtrade.enums import CandleType, PriceType, RPCMessageType
@@ -68,6 +68,7 @@ DEFAULT_DATAFRAME_COLUMNS = ["date", "open", "high", "low", "close", "volume"]
# Don't modify sequence of DEFAULT_TRADES_COLUMNS
# it has wide consequences for stored trades files
DEFAULT_TRADES_COLUMNS = ["timestamp", "id", "type", "side", "price", "amount", "cost"]
DEFAULT_ORDERFLOW_COLUMNS = ["level", "bid", "ask", "delta"]
TRADES_DTYPES = {
    "timestamp": "int64",
    "id": "str",
@@ -533,6 +534,24 @@ CONF_SCHEMA = {
},
"position_adjustment_enable": {"type": "boolean"},
"max_entry_position_adjustment": {"type": ["integer", "number"], "minimum": -1},
"orderflow": {
"type": "object",
"properties": {
"cache_size": {"type": "number", "minimum": 1, "default": 1500},
"max_candles": {"type": "number", "minimum": 1, "default": 1500},
"scale": {"type": "number", "minimum": 0.0},
"stacked_imbalance_range": {"type": "number", "minimum": 0},
"imbalance_volume": {"type": "number", "minimum": 0},
"imbalance_ratio": {"type": "number", "minimum": 0.0},
},
"required": [
"max_candles",
"scale",
"stacked_imbalance_range",
"imbalance_volume",
"imbalance_ratio",
],
},
},
"definitions": {
"exchange": {
@@ -771,6 +790,9 @@ ListPairsWithTimeframes = List[PairWithTimeframe]
# Type for trades list
TradeList = List[List]
# ticks, pair, timeframe, CandleType
TickWithTimeframe = Tuple[str, str, CandleType, Optional[int], Optional[int]]
ListTicksWithTimeframes = List[TickWithTimeframe]
LongShort = Literal["long", "short"]
EntryExit = Literal["entry", "exit"]

View File

@@ -8,6 +8,7 @@ from freqtrade.data.converter.converter import (
trim_dataframe,
trim_dataframes,
)
from freqtrade.data.converter.orderflow import populate_dataframe_with_trades
from freqtrade.data.converter.trade_converter import (
convert_trades_format,
convert_trades_to_ohlcv,
@@ -30,6 +31,7 @@ __all__ = [
"trim_dataframes",
"convert_trades_format",
"convert_trades_to_ohlcv",
"populate_dataframe_with_trades",
"trades_convert_types",
"trades_df_remove_duplicates",
"trades_dict_to_list",

View File

@@ -0,0 +1,295 @@
"""
Functions to convert orderflow data from public_trades
"""
import logging
import time
import typing
from collections import OrderedDict
from datetime import datetime
from typing import Tuple
import numpy as np
import pandas as pd
from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS
from freqtrade.enums import RunMode
from freqtrade.exceptions import DependencyException
logger = logging.getLogger(__name__)
def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame):
    """
    Populates a dataframe with trades columns
    :param dataframe: Dataframe to populate
    """
    # Initialize columns with appropriate dtypes
    dataframe["trades"] = np.nan
    dataframe["orderflow"] = np.nan
    dataframe["imbalances"] = np.nan
    dataframe["stacked_imbalances_bid"] = np.nan
    dataframe["stacked_imbalances_ask"] = np.nan
    dataframe["max_delta"] = np.nan
    dataframe["min_delta"] = np.nan
    dataframe["bid"] = np.nan
    dataframe["ask"] = np.nan
    dataframe["delta"] = np.nan
    dataframe["total_trades"] = np.nan
    # Ensure the complex columns are of object dtype
    dataframe["trades"] = dataframe["trades"].astype(object)
    dataframe["orderflow"] = dataframe["orderflow"].astype(object)
    dataframe["imbalances"] = dataframe["imbalances"].astype(object)
    dataframe["stacked_imbalances_bid"] = dataframe["stacked_imbalances_bid"].astype(object)
    dataframe["stacked_imbalances_ask"] = dataframe["stacked_imbalances_ask"].astype(object)

def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
    from freqtrade.exchange import timeframe_to_next_date, timeframe_to_resample_freq

    timeframe_frequency = timeframe_to_resample_freq(timeframe)
    # calculate ohlcv candle start and end
    if df is not None and not df.empty:
        df["datetime"] = pd.to_datetime(df["date"], unit="ms")
        df["candle_start"] = df["datetime"].dt.floor(timeframe_frequency)
        # used in _now_is_time_to_refresh_trades
        df["candle_end"] = df["candle_start"].apply(
            lambda candle_start: timeframe_to_next_date(timeframe, candle_start)
        )
        df.drop(columns=["datetime"], inplace=True)

def populate_dataframe_with_trades(
    cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], pd.DataFrame],
    config,
    dataframe: pd.DataFrame,
    trades: pd.DataFrame,
) -> Tuple[pd.DataFrame, OrderedDict[Tuple[datetime, datetime], pd.DataFrame]]:
    """
    Populates a dataframe with trades
    :param dataframe: Dataframe to populate
    :param trades: Trades to populate with
    :return: Dataframe with trades populated
    """
    timeframe = config["timeframe"]
    config_orderflow = config["orderflow"]
    # create columns for trades
    _init_dataframe_with_trades_columns(dataframe)

    try:
        start_time = time.time()
        # calculate ohlcv candle start and end
        _calculate_ohlcv_candle_start_and_end(trades, timeframe)
        # get date of earliest max_candles candle
        max_candles = config_orderflow["max_candles"]
        start_date = dataframe.tail(max_candles).date.iat[0]
        # slice off trades that are before the current ohlcv candles to make groupby faster
        trades = trades.loc[trades.candle_start >= start_date]
        trades.reset_index(inplace=True, drop=True)
        # group trades by candle start
        trades_grouped_by_candle_start = trades.groupby("candle_start", group_keys=False)
        # Create Series to hold complex data
        trades_series = pd.Series(index=dataframe.index, dtype=object)
        orderflow_series = pd.Series(index=dataframe.index, dtype=object)
        imbalances_series = pd.Series(index=dataframe.index, dtype=object)
        stacked_imbalances_bid_series = pd.Series(index=dataframe.index, dtype=object)
        stacked_imbalances_ask_series = pd.Series(index=dataframe.index, dtype=object)

        for candle_start, trades_grouped_df in trades_grouped_by_candle_start:
            is_between = candle_start == dataframe["date"]
            if is_between.any():
                from freqtrade.exchange import timeframe_to_next_date

                candle_next = timeframe_to_next_date(timeframe, typing.cast(datetime, candle_start))
                if candle_next not in trades_grouped_by_candle_start.groups:
                    logger.warning(
                        f"candle at {candle_start} with {len(trades_grouped_df)} trades "
                        f"might be unfinished, because no finished trades at {candle_next}"
                    )
                indices = dataframe.index[is_between].tolist()
                # Add trades to each candle
                trades_series.loc[indices] = [
                    trades_grouped_df.drop(columns=["candle_start", "candle_end"]).to_dict(
                        orient="records"
                    )
                ]
                # Use caching mechanism
                if (candle_start, candle_next) in cached_grouped_trades:
                    cache_entry = cached_grouped_trades[
                        (typing.cast(datetime, candle_start), candle_next)
                    ]
                    # dataframe.loc[is_between] = cache_entry  # doesn't take, so we need a workaround:
                    # Create a dictionary of the column values to be assigned
                    update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns}
                    # Assign the values using the update_dict
                    dataframe.loc[is_between, update_dict.keys()] = pd.DataFrame(
                        [update_dict], index=dataframe.loc[is_between].index
                    )
                    continue

                # Calculate orderflow for each candle
                orderflow = trades_to_volumeprofile_with_total_delta_bid_ask(
                    trades_grouped_df, scale=config_orderflow["scale"]
                )
                orderflow_series.loc[indices] = [orderflow.to_dict(orient="index")]
                # Calculate imbalances for each candle's orderflow
                imbalances = trades_orderflow_to_imbalances(
                    orderflow,
                    imbalance_ratio=config_orderflow["imbalance_ratio"],
                    imbalance_volume=config_orderflow["imbalance_volume"],
                )
                imbalances_series.loc[indices] = [imbalances.to_dict(orient="index")]

                stacked_imbalance_range = config_orderflow["stacked_imbalance_range"]
                stacked_imbalances_bid_series.loc[indices] = [
                    stacked_imbalance_bid(imbalances, stacked_imbalance_range=stacked_imbalance_range)
                ]
                stacked_imbalances_ask_series.loc[indices] = [
                    stacked_imbalance_ask(imbalances, stacked_imbalance_range=stacked_imbalance_range)
                ]

                bid = np.where(
                    trades_grouped_df["side"].str.contains("sell"), trades_grouped_df["amount"], 0
                )
                ask = np.where(
                    trades_grouped_df["side"].str.contains("buy"), trades_grouped_df["amount"], 0
                )
                deltas_per_trade = ask - bid
                min_delta = deltas_per_trade.cumsum().min()
                max_delta = deltas_per_trade.cumsum().max()
                dataframe.loc[indices, "max_delta"] = max_delta
                dataframe.loc[indices, "min_delta"] = min_delta
                dataframe.loc[indices, "bid"] = bid.sum()
                dataframe.loc[indices, "ask"] = ask.sum()
                dataframe.loc[indices, "delta"] = (
                    dataframe.loc[indices, "ask"] - dataframe.loc[indices, "bid"]
                )
                dataframe.loc[indices, "total_trades"] = len(trades_grouped_df)
                # Cache the result
                cached_grouped_trades[(typing.cast(datetime, candle_start), candle_next)] = (
                    dataframe.loc[is_between].copy()
                )
                # Maintain cache size
                if (
                    config.get("runmode") in (RunMode.DRY_RUN, RunMode.LIVE)
                    and len(cached_grouped_trades) > config_orderflow["cache_size"]
                ):
                    cached_grouped_trades.popitem(last=False)
            else:
                logger.debug(f"Found NO candles for trades starting with {candle_start}")
        logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds")

        # Merge the complex data Series back into the DataFrame
        dataframe["trades"] = trades_series
        dataframe["orderflow"] = orderflow_series
        dataframe["imbalances"] = imbalances_series
        dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series
        dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series
    except Exception as e:
        logger.exception("Error populating dataframe with trades")
        raise DependencyException(e)

    return dataframe, cached_grouped_trades

def trades_to_volumeprofile_with_total_delta_bid_ask(
    trades: pd.DataFrame, scale: float
) -> pd.DataFrame:
    """
    :param trades: dataframe
    :param scale: scale aka bin size e.g. 0.5
    :return: trades binned to levels according to scale aka orderflow
    """
    df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS)
    # create bid, ask where side is sell or buy
    df["bid_amount"] = np.where(trades["side"].str.contains("sell"), trades["amount"], 0)
    df["ask_amount"] = np.where(trades["side"].str.contains("buy"), trades["amount"], 0)
    df["bid"] = np.where(trades["side"].str.contains("sell"), 1, 0)
    df["ask"] = np.where(trades["side"].str.contains("buy"), 1, 0)
    # round the prices to the nearest multiple of the scale
    df["price"] = ((trades["price"] / scale).round() * scale).astype("float64").values
    if df.empty:
        df["total"] = np.nan
        df["delta"] = np.nan
        return df
    df["delta"] = df["ask_amount"] - df["bid_amount"]
    df["total_volume"] = df["ask_amount"] + df["bid_amount"]
    df["total_trades"] = df["ask"] + df["bid"]
    # group to bins aka apply scale
    df = df.groupby("price").sum(numeric_only=True)
    return df

def trades_orderflow_to_imbalances(df: pd.DataFrame, imbalance_ratio: int, imbalance_volume: int):
    """
    :param df: dataframe with bid and ask
    :param imbalance_ratio: imbalance_ratio e.g. 3
    :param imbalance_volume: imbalance volume e.g. 10
    :return: dataframe with bid and ask imbalance
    """
    bid = df.bid
    # compares bid and ask diagonally
    ask = df.ask.shift(-1)
    bid_imbalance = (bid / ask) > (imbalance_ratio)
    # overwrite bid_imbalance with False if volume is not big enough
    bid_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, bid_imbalance)
    ask_imbalance = (ask / bid) > (imbalance_ratio)
    # overwrite ask_imbalance with False if volume is not big enough
    ask_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, ask_imbalance)
    dataframe = pd.DataFrame(
        {"bid_imbalance": bid_imbalance_filtered, "ask_imbalance": ask_imbalance_filtered},
        index=df.index,
    )
    return dataframe

def stacked_imbalance(
    df: pd.DataFrame, label: str, stacked_imbalance_range: int, should_reverse: bool
):
    """
    y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
    https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array
    """
    imbalance = df[f"{label}_imbalance"]
    int_series = pd.Series(np.where(imbalance, 1, 0))
    stacked = int_series * (
        int_series.groupby((int_series != int_series.shift()).cumsum()).cumcount() + 1
    )
    max_stacked_imbalance_idx = stacked.index[stacked >= stacked_imbalance_range]
    stacked_imbalance_price = np.nan
    if not max_stacked_imbalance_idx.empty:
        idx = (
            max_stacked_imbalance_idx[0]
            if not should_reverse
            else np.flipud(max_stacked_imbalance_idx)[0]
        )
        stacked_imbalance_price = imbalance.index[idx]
    return stacked_imbalance_price


def stacked_imbalance_ask(df: pd.DataFrame, stacked_imbalance_range: int):
    return stacked_imbalance(df, "ask", stacked_imbalance_range, should_reverse=True)


def stacked_imbalance_bid(df: pd.DataFrame, stacked_imbalance_range: int):
    return stacked_imbalance(df, "bid", stacked_imbalance_range, should_reverse=False)

View File

@@ -19,8 +19,8 @@ from freqtrade.constants import (
ListPairsWithTimeframes,
PairWithTimeframe,
)
from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType, RPCMessageType, RunMode
from freqtrade.data.history import get_datahandler, load_pair_history
from freqtrade.enums import CandleType, RPCMessageType, RunMode, TradingMode
from freqtrade.exceptions import ExchangeError, OperationalException
from freqtrade.exchange import Exchange, timeframe_to_prev_date, timeframe_to_seconds
from freqtrade.exchange.types import OrderBook
@@ -445,7 +445,20 @@ class DataProvider:
        if self._exchange is None:
            raise OperationalException(NO_EXCHANGE_EXCEPTION)
        final_pairs = (pairlist + helping_pairs) if helping_pairs else pairlist

        # refresh latest ohlcv data
        self._exchange.refresh_latest_ohlcv(final_pairs)
        # refresh latest trades data
        self.refresh_latest_trades(pairlist)

    def refresh_latest_trades(self, pairlist: ListPairsWithTimeframes) -> None:
        """
        Refresh latest trades data (if enabled in config)
        """
        use_public_trades = self._config.get("exchange", {}).get("use_public_trades", False)
        if use_public_trades:
            if self._exchange:
                self._exchange.refresh_latest_trades(pairlist)

    @property
    def available_pairs(self) -> ListPairsWithTimeframes:
@@ -483,6 +496,45 @@ class DataProvider:
        else:
            return DataFrame()

    def trades(
        self, pair: str, timeframe: Optional[str] = None, copy: bool = True, candle_type: str = ""
    ) -> DataFrame:
        """
        Get candle (TRADES) data for the given pair as DataFrame
        Please use the `available_pairs` method to verify which pairs are currently cached.
        This is not meant to be used in callbacks because of lookahead bias.
        :param pair: pair to get the data for
        :param timeframe: Timeframe to get data for
        :param candle_type: '', mark, index, premiumIndex, or funding_rate
        :param copy: copy dataframe before returning if True.
            Use False only for read-only operations (where the dataframe is not modified)
        """
        if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
            if self._exchange is None:
                raise OperationalException(NO_EXCHANGE_EXCEPTION)
            _candle_type = (
                CandleType.from_string(candle_type)
                if candle_type != ""
                else self._config["candle_type_def"]
            )
            return self._exchange.trades(
                (pair, timeframe or self._config["timeframe"], _candle_type), copy=copy
            )
        elif self.runmode in (RunMode.BACKTEST, RunMode.HYPEROPT):
            _candle_type = (
                CandleType.from_string(candle_type)
                if candle_type != ""
                else self._config["candle_type_def"]
            )
            data_handler = get_datahandler(
                self._config["datadir"], data_format=self._config["dataformat_trades"]
            )
            trades_df = data_handler.trades_load(pair, TradingMode.FUTURES)
            return trades_df
        else:
            return DataFrame()
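    # Usage illustration (editor's sketch, not part of this PR): inside a strategy,
    # the cached public trades for the current pair can be fetched via the
    # DataProvider, e.g. trades = self.dp.trades(pair=metadata["pair"], copy=False)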
    def market(self, pair: str) -> Optional[Dict[str, Any]]:
        """
        Return market data for the pair

View File

@@ -488,7 +488,7 @@ def _download_trades_history(
return True
except Exception:
logger.exception(f'Failed to download historic trades for pair: "{pair}". ')
logger.exception(f'Failed to download and store historic trades for pair: "{pair}". ')
return False

View File

@@ -22,6 +22,7 @@ from pandas import DataFrame, concat
from freqtrade.constants import (
DEFAULT_AMOUNT_RESERVE_PERCENT,
DEFAULT_TRADES_COLUMNS,
NON_OPEN_EXCHANGE_STATES,
BidAsk,
BuySell,
@@ -33,7 +34,13 @@ from freqtrade.constants import (
OBLiteral,
PairWithTimeframe,
)
from freqtrade.data.converter import clean_ohlcv_dataframe, ohlcv_to_dataframe, trades_dict_to_list
from freqtrade.data.converter import (
clean_ohlcv_dataframe,
ohlcv_to_dataframe,
trades_df_remove_duplicates,
trades_dict_to_list,
trades_list_to_df,
)
from freqtrade.enums import (
OPTIMIZE_MODES,
TRADE_MODES,
@@ -123,6 +130,7 @@ class Exchange:
"tickers_have_quoteVolume": True,
"tickers_have_bid_ask": True, # bid / ask empty for fetch_tickers
"tickers_have_price": True,
"trades_limit": 1000, # Limit for 1 call to fetch_trades
"trades_pagination": "time", # Possible are "time" or "id"
"trades_pagination_arg": "since",
"trades_has_history": False,
@@ -194,6 +202,9 @@ class Exchange:
self._klines: Dict[PairWithTimeframe, DataFrame] = {}
self._expiring_candle_cache: Dict[Tuple[str, int], PeriodicCache] = {}
# Holds public_trades
self._trades: Dict[PairWithTimeframe, DataFrame] = {}
# Holds all open sell orders for dry_run
self._dry_run_open_orders: Dict[str, Any] = {}
@@ -222,6 +233,8 @@ class Exchange:
# Assign this directly for easy access
self._ohlcv_partial_candle = self._ft_has["ohlcv_partial_candle"]
self._max_trades_limit = self._ft_has["trades_limit"]
self._trades_pagination = self._ft_has["trades_pagination"]
self._trades_pagination_arg = self._ft_has["trades_pagination_arg"]
@@ -520,6 +533,15 @@ class Exchange:
        else:
            return DataFrame()

    def trades(self, pair_interval: PairWithTimeframe, copy: bool = True) -> DataFrame:
        if pair_interval in self._trades:
            if copy:
                return self._trades[pair_interval].copy()
            else:
                return self._trades[pair_interval]
        else:
            return DataFrame()

    def get_contract_size(self, pair: str) -> Optional[float]:
        if self.trading_mode == TradingMode.FUTURES:
            market = self.markets.get(pair, {})
@@ -2600,6 +2622,171 @@ class Exchange:
data = [[x["timestamp"], x["fundingRate"], 0, 0, 0, 0] for x in data]
return data
# fetch Trade data stuff
def needed_candle_for_trades_ms(self, timeframe: str, candle_type: CandleType) -> int:
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
tf_s = timeframe_to_seconds(timeframe)
candles_fetched = candle_limit * self.required_candle_call_count
max_candles = self._config["orderflow"]["max_candles"]
required_candles = min(max_candles, candles_fetched)
move_to = (
tf_s * candle_limit * required_candles
if required_candles > candle_limit
else (max_candles + 1) * tf_s
)
now = timeframe_to_next_date(timeframe)
return int((now - timedelta(seconds=move_to)).timestamp() * 1000)
    def _process_trades_df(
        self,
        pair: str,
        timeframe: str,
        c_type: CandleType,
        ticks: List[List],
        cache: bool,
        first_required_candle_date: int,
    ) -> DataFrame:
        # keeping parsed dataframe in cache
        trades_df = trades_list_to_df(ticks, True)
        if cache:
            if (pair, timeframe, c_type) in self._trades:
                old = self._trades[(pair, timeframe, c_type)]
                # Reassign so we return the updated, combined df
                combined_df = concat([old, trades_df], axis=0)
                logger.debug(f"Clean duplicated ticks from Trades data {pair}")
                trades_df = DataFrame(
                    trades_df_remove_duplicates(combined_df), columns=combined_df.columns
                )
                # Age out old candles
                trades_df = trades_df[first_required_candle_date < trades_df["timestamp"]]
                trades_df = trades_df.reset_index(drop=True)
            self._trades[(pair, timeframe, c_type)] = trades_df
        return trades_df
    def refresh_latest_trades(
        self,
        pair_list: ListPairsWithTimeframes,
        *,
        cache: bool = True,
    ) -> Dict[PairWithTimeframe, DataFrame]:
        """
        Refresh in-memory TRADES asynchronously and set `_trades` with the result
        Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
        Only used in the dataprovider.refresh() method.
        :param pair_list: List of 3 element tuples containing (pair, timeframe, candle_type)
        :param cache: Assign result to _trades. Useful for one-off downloads like for pairlists
        :return: Dict of [{(pair, timeframe): Dataframe}]
        """
        from freqtrade.data.history import get_datahandler

        data_handler = get_datahandler(
            self._config["datadir"], data_format=self._config["dataformat_trades"]
        )
        logger.debug("Refreshing TRADES data for %d pairs", len(pair_list))
        since_ms = None
        results_df = {}
        for pair, timeframe, candle_type in set(pair_list):
            new_ticks: List = []
            all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])
            first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type)
            # refresh, if
            # a. not in _trades
            # b. no cache used
            # c. need new data
            is_in_cache = (pair, timeframe, candle_type) in self._trades
            if (
                not is_in_cache
                or not cache
                or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)
            ):
                logger.debug(f"Refreshing TRADES data for {pair}")
                # fetch trades since latest _trades and
                # store together with existing trades
                try:
                    until = None
                    from_id = None
                    if is_in_cache:
                        from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]["id"]
                        until = dt_ts()  # now
                    else:
                        until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000
                        all_stored_ticks_df = data_handler.trades_load(
                            f"{pair}-cached", self.trading_mode
                        )
                        if not all_stored_ticks_df.empty:
                            if (
                                all_stored_ticks_df.iloc[-1]["timestamp"] > first_candle_ms
                                and all_stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms
                            ):
                                # Use cache and populate further
                                last_cached_ms = all_stored_ticks_df.iloc[-1]["timestamp"]
                                from_id = all_stored_ticks_df.iloc[-1]["id"]
                                # only use cached if it's closer than first_candle_ms
                                since_ms = (
                                    last_cached_ms
                                    if last_cached_ms > first_candle_ms
                                    else first_candle_ms
                                )
                            else:
                                # Skip cache, it's too old
                                all_stored_ticks_df = DataFrame(
                                    columns=DEFAULT_TRADES_COLUMNS + ["date"]
                                )
                    # from_id overrules with exchange set to id paginate
                    [_, new_ticks] = self.get_historic_trades(
                        pair,
                        since=since_ms if since_ms else first_candle_ms,
                        until=until,
                        from_id=from_id,
                    )
                except Exception:
                    logger.exception(f"Refreshing TRADES data for {pair} failed")
                    continue

                if new_ticks:
                    all_stored_ticks_list = all_stored_ticks_df[DEFAULT_TRADES_COLUMNS].values.tolist()
                    all_stored_ticks_list.extend(new_ticks)
                    trades_df = self._process_trades_df(
                        pair,
                        timeframe,
                        candle_type,
                        all_stored_ticks_list,
                        cache,
                        first_required_candle_date=first_candle_ms,
                    )
                    results_df[(pair, timeframe, candle_type)] = trades_df
                    data_handler.trades_store(
                        f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode
                    )
                else:
                    logger.error(f"No new ticks for {pair}")
        return results_df
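    # Design note (editor's comment): trades are persisted per pair under the
    # "{pair}-cached" key via the data handler, so a restart can resume the
    # required window from disk instead of re-downloading all trades.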
    def _now_is_time_to_refresh_trades(
        self, pair: str, timeframe: str, candle_type: CandleType
    ) -> bool:  # Timeframe in seconds
        trades = self.trades((pair, timeframe, candle_type), False)
        pair_last_refreshed = int(trades.iloc[-1]["timestamp"])
        full_candle = (
            int(timeframe_to_next_date(timeframe, dt_from_ts(pair_last_refreshed)).timestamp())
            * 1000
        )
        now = dt_ts()
        return full_candle <= now

    # Fetch historic trades

    @retrier_async
@@ -2614,10 +2801,11 @@ class Exchange:
returns: List of dicts containing trades, the next iteration value (new "since" or trade_id)
"""
try:
trades_limit = self._max_trades_limit
# fetch trades asynchronously
if params:
logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
trades = await self._api_async.fetch_trades(pair, params=params, limit=1000)
trades = await self._api_async.fetch_trades(pair, params=params, limit=trades_limit)
else:
logger.debug(
"Fetching trades for pair %s, since %s %s...",
@@ -2625,7 +2813,7 @@ class Exchange:
since,
"(" + dt_from_ts(since).isoformat() + ") " if since is not None else "",
)
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
trades = await self._api_async.fetch_trades(pair, since=since, limit=trades_limit)
trades = self._trades_contracts_to_amount(trades)
pagination_value = self._get_trade_pagination_next_value(trades)
return trades_dict_to_list(trades), pagination_value

View File

@@ -5,6 +5,7 @@ This module defines the interface to apply for strategies
import logging
from abc import ABC, abstractmethod
from collections import OrderedDict
from datetime import datetime, timedelta, timezone
from math import isinf, isnan
from typing import Dict, List, Optional, Tuple, Union
@@ -12,6 +13,7 @@ from typing import Dict, List, Optional, Tuple, Union
from pandas import DataFrame
from freqtrade.constants import CUSTOM_TAG_MAX_LENGTH, Config, IntOrInf, ListPairsWithTimeframes
from freqtrade.data.converter import populate_dataframe_with_trades
from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import (
CandleType,
@@ -139,6 +141,11 @@ class IStrategy(ABC, HyperStrategyMixin):
    # A self set parameter that represents the market direction. filled from configuration
    market_direction: MarketDirection = MarketDirection.NONE

    # Global cache dictionary
    _cached_grouped_trades_per_pair: Dict[
        str, OrderedDict[Tuple[datetime, datetime], DataFrame]
    ] = {}

    def __init__(self, config: Config) -> None:
        self.config = config
        # Dict to determine if analysis is necessary
@@ -1040,6 +1047,7 @@ class IStrategy(ABC, HyperStrategyMixin):
dataframe = self.advise_indicators(dataframe, metadata)
dataframe = self.advise_entry(dataframe, metadata)
dataframe = self.advise_exit(dataframe, metadata)
logger.debug("TA Analysis Ended")
return dataframe
def _analyze_ticker_internal(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
@@ -1594,6 +1602,29 @@ class IStrategy(ABC, HyperStrategyMixin):
        dataframe = self.advise_exit(dataframe, metadata)
        return dataframe

    def _if_enabled_populate_trades(self, dataframe: DataFrame, metadata: dict):
        use_public_trades = self.config.get("exchange", {}).get("use_public_trades", False)
        if use_public_trades:
            trades = self.dp.trades(pair=metadata["pair"], copy=False)
            config = self.config
            config["timeframe"] = self.timeframe
            pair = metadata["pair"]
            # TODO: slice trades to size of dataframe for faster backtesting
            cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], DataFrame] = (
                self._cached_grouped_trades_per_pair.get(pair, OrderedDict())
            )
            dataframe, cached_grouped_trades = populate_dataframe_with_trades(
                cached_grouped_trades, config, dataframe, trades
            )
            # dereference old cache
            if pair in self._cached_grouped_trades_per_pair:
                del self._cached_grouped_trades_per_pair[pair]
            self._cached_grouped_trades_per_pair[pair] = cached_grouped_trades
            logger.debug("Populated dataframe with trades.")

    def advise_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Populate indicators that will be used in the Buy, Sell, short, exit_short strategy
@@ -1610,6 +1641,7 @@ class IStrategy(ABC, HyperStrategyMixin):
self, dataframe, metadata, inf_data, populate_fn
)
self._if_enabled_populate_trades(dataframe, metadata)
return self.populate_indicators(dataframe, metadata)
def advise_entry(self, dataframe: DataFrame, metadata: dict) -> DataFrame:

View File

@@ -48,6 +48,7 @@ nav:
- Recursive analysis: recursive-analysis.md
- Advanced Strategy: strategy-advanced.md
- Advanced Hyperopt: advanced-hyperopt.md
- Orderflow: advanced-orderflow.md
- Producer/Consumer mode: producer-consumer.md
- SQL Cheat-sheet: sql_cheatsheet.md
- Edge Positioning: edge.md

View File

@@ -614,6 +614,7 @@ def get_default_conf(testdatadir):
"internals": {},
"export": "none",
"dataformat_ohlcv": "feather",
"dataformat_trades": "feather",
"runmode": "dry_run",
"candle_type_def": CandleType.SPOT,
}

View File

@@ -0,0 +1,483 @@
from collections import OrderedDict
import numpy as np
import pandas as pd
import pytest
from freqtrade.constants import DEFAULT_TRADES_COLUMNS
from freqtrade.data.converter import populate_dataframe_with_trades
from freqtrade.data.converter.orderflow import trades_to_volumeprofile_with_total_delta_bid_ask
from freqtrade.data.converter.trade_converter import trades_list_to_df
BIN_SIZE_SCALE = 0.5
def read_csv(filename, converter_columns: list = ["side", "type"]):
return pd.read_csv(
filename,
skipinitialspace=True,
index_col=0,
parse_dates=True,
date_format="ISO8601",
converters={col: str.strip for col in converter_columns},
)
@pytest.fixture
def populate_dataframe_with_trades_dataframe(testdatadir):
return pd.read_feather(testdatadir / "orderflow/populate_dataframe_with_trades_DF.feather")
@pytest.fixture
def populate_dataframe_with_trades_trades(testdatadir):
return pd.read_feather(testdatadir / "orderflow/populate_dataframe_with_trades_TRADES.feather")
@pytest.fixture
def candles(testdatadir):
return pd.read_json(testdatadir / "orderflow/candles.json").copy()
@pytest.fixture
def public_trades_list(testdatadir):
return read_csv(testdatadir / "orderflow/public_trades_list.csv").copy()
@pytest.fixture
def public_trades_list_simple(testdatadir):
return read_csv(testdatadir / "orderflow/public_trades_list_simple_example.csv").copy()
def test_public_trades_columns_before_change(
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
assert populate_dataframe_with_trades_dataframe.columns.tolist() == [
"date",
"open",
"high",
"low",
"close",
"volume",
]
assert populate_dataframe_with_trades_trades.columns.tolist() == [
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]
def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow(
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
"""
Tests the `populate_dataframe_with_trades` function's order flow calculation.
This test checks the generated data frame and order flow for specific properties
based on the provided configuration and sample data.
"""
# Create copies of the input data to avoid modifying the originals
dataframe = populate_dataframe_with_trades_dataframe.copy()
trades = populate_dataframe_with_trades_trades.copy()
# Convert the 'date' column to datetime format with milliseconds
dataframe["date"] = pd.to_datetime(dataframe["date"], unit="ms")
# Select the last rows and reset the index (optional, depends on usage)
dataframe = dataframe.copy().tail().reset_index(drop=True)
# Define the configuration for order flow calculation
config = {
"timeframe": "5m",
"orderflow": {
"cache_size": 1000,
"max_candles": 1500,
"scale": 0.005,
"imbalance_volume": 0,
"imbalance_ratio": 3,
"stacked_imbalance_range": 3,
},
}
# Apply the function to populate the data frame with order flow data
df, _ = populate_dataframe_with_trades(OrderedDict(), config, dataframe, trades)
# Extract results from the first row of the DataFrame
results = df.iloc[0]
t = results["trades"]
of = results["orderflow"]
# Assert basic properties of the results
assert 0 != len(results)
assert 151 == len(t)
# --- Order Flow Analysis ---
# Assert number of order flow data points
assert 23 == len(of) # Assert expected number of data points
assert isinstance(of, dict)
of_values = list(of.values())
# Assert specific order flow values at the first price level of the first candle's orderflow
assert of_values[0] == {
"bid": 0.0,
"ask": 1.0,
"delta": 4.999,
"bid_amount": 0.0,
"ask_amount": 4.999,
"total_volume": 4.999,
"total_trades": 1,
}
# Assert specific order flow values at the last price level of the first candle's orderflow
assert of_values[-1] == {
"bid": 0.0,
"ask": 1.0,
"delta": 0.103,
"bid_amount": 0.0,
"ask_amount": 0.103,
"total_volume": 0.103,
"total_trades": 1,
}
# Extract order flow from the last row of the DataFrame
of = df.iloc[-1]["orderflow"]
# Assert number of order flow data points in the last row
assert 19 == len(of) # Assert expected number of data points
of_values1 = list(of.values())
# Assert specific order flow values at the beginning of the last row
assert of_values1[0] == {
"bid": 1.0,
"ask": 0.0,
"delta": -12.536,
"bid_amount": 12.536,
"ask_amount": 0.0,
"total_volume": 12.536,
"total_trades": 1,
}
# Assert specific order flow values at the end of the last row
assert pytest.approx(of_values1[-1]) == {
"bid": 4.0,
"ask": 3.0,
"delta": -40.948,
"bid_amount": 59.182,
"ask_amount": 18.23399,
"total_volume": 77.416,
"total_trades": 7,
}
# --- Delta and Other Results ---
# Assert delta value from the first row
assert pytest.approx(results["delta"]) == -50.519
# Assert min and max delta values from the first row
assert results["min_delta"] == -79.469
assert results["max_delta"] == 17.298
# Assert that stacked imbalances are NaN (not applicable in this test)
assert np.isnan(results["stacked_imbalances_bid"])
assert np.isnan(results["stacked_imbalances_ask"])
# Repeat assertions for the second-to-last row
results = df.iloc[-2]
assert pytest.approx(results["delta"]) == -20.862
assert pytest.approx(results["min_delta"]) == -54.559999
assert 82.842 == results["max_delta"]
assert 234.99 == results["stacked_imbalances_bid"]
assert 234.96 == results["stacked_imbalances_ask"]
# Repeat assertions for the last row
results = df.iloc[-1]
assert pytest.approx(results["delta"]) == -49.302
assert results["min_delta"] == -70.222
assert pytest.approx(results["max_delta"]) == 11.213
assert np.isnan(results["stacked_imbalances_bid"])
assert np.isnan(results["stacked_imbalances_ask"])
def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
"""
Tests the `populate_dataframe_with_trades` function's handling of trades,
ensuring correct integration of trades data into the generated DataFrame.
"""
# Create copies of the input data to avoid modifying the originals
dataframe = populate_dataframe_with_trades_dataframe.copy()
trades = populate_dataframe_with_trades_trades.copy()
# --- Data Preparation ---
# Convert the 'date' column to datetime format with milliseconds
dataframe["date"] = pd.to_datetime(dataframe["date"], unit="ms")
# Select the final rows of the DataFrame
dataframe = dataframe.tail().reset_index(drop=True)
# Filter trades to those occurring after or at the same time as the first DataFrame date
trades = trades.loc[trades.date >= dataframe.date[0]]
trades.reset_index(inplace=True, drop=True) # Reset index for clarity
# Assert the first trade ID to ensure filtering worked correctly
assert trades["id"][0] == "313881442"
# --- Configuration and Function Call ---
# Define configuration for order flow calculation (used for context)
config = {
"timeframe": "5m",
"orderflow": {
"cache_size": 1000,
"max_candles": 1500,
"scale": 0.5,
"imbalance_volume": 0,
"imbalance_ratio": 3,
"stacked_imbalance_range": 3,
},
}
# Populate the DataFrame with trades and order flow data
df, _ = populate_dataframe_with_trades(OrderedDict(), config, dataframe, trades)
# --- DataFrame and Trade Data Validation ---
row = df.iloc[0] # Extract the first row for assertions
# Assert DataFrame structure
assert list(df.columns) == [
# ... (list of expected column names)
"date",
"open",
"high",
"low",
"close",
"volume",
"trades",
"orderflow",
"imbalances",
"stacked_imbalances_bid",
"stacked_imbalances_ask",
"max_delta",
"min_delta",
"bid",
"ask",
"delta",
"total_trades",
]
# Assert delta, bid, and ask values
assert pytest.approx(row["delta"]) == -50.519
assert row["bid"] == 219.961
assert row["ask"] == 169.442
# Assert the number of trades
assert len(row["trades"]) == 151
# Assert specific details of the first trade
t = row["trades"][0]
assert list(t.keys()) == ["timestamp", "id", "type", "side", "price", "amount", "cost", "date"]
assert trades["id"][0] == t["id"]
assert int(trades["timestamp"][0]) == int(t["timestamp"])
assert t["side"] == "sell"
assert t["id"] == "313881442"
assert t["price"] == 234.72
def test_public_trades_put_volume_profile_into_ohlcv_candles(public_trades_list_simple, candles):
"""
Tests the integration of volume profile data into OHLCV candles.
This test verifies that
the `trades_to_volumeprofile_with_total_delta_bid_ask`
function correctly calculates the volume profile and that
it correctly assigns the delta value from the volume profile to the
corresponding candle in the `candles` DataFrame.
"""
# Convert the trade list to a DataFrame
trades_df = trades_list_to_df(public_trades_list_simple[DEFAULT_TRADES_COLUMNS].values.tolist())
# Generate the volume profile with the specified bin size
df = trades_to_volumeprofile_with_total_delta_bid_ask(trades_df, scale=BIN_SIZE_SCALE)
# Assert the delta value of the second price bin
assert 0.14 == df.values.tolist()[1][2]
# Alternative assertion using `.iat` accessor (assuming correct assignment logic)
assert 0.14 == df["delta"].iat[1]
def test_public_trades_binned_big_sample_list(public_trades_list):
"""
Tests the `trades_to_volumeprofile_with_total_delta_bid_ask` function
with different bin sizes and verifies the generated DataFrame's structure and values.
"""
# Define the bin size for the first test
BIN_SIZE_SCALE = 0.05
# Convert the trade list to a DataFrame
trades = trades_list_to_df(public_trades_list[DEFAULT_TRADES_COLUMNS].values.tolist())
# Generate the volume profile with the specified bin size
df = trades_to_volumeprofile_with_total_delta_bid_ask(trades, scale=BIN_SIZE_SCALE)
# Assert that the DataFrame has the expected columns
assert df.columns.tolist() == [
"bid",
"ask",
"delta",
"bid_amount",
"ask_amount",
"total_volume",
"total_trades",
]
# Assert the number of rows in the DataFrame (expected 23 for this bin size)
assert len(df) == 23
# Assert that the index values are in ascending order and spaced correctly
assert all(df.index[i] < df.index[i + 1] for i in range(len(df) - 1))
assert df.index[0] + BIN_SIZE_SCALE == df.index[1]
assert (trades["price"].min() - BIN_SIZE_SCALE) < df.index[0] < trades["price"].max()
assert (df.index[0] + BIN_SIZE_SCALE) >= df.index[1]
assert (trades["price"].max() - BIN_SIZE_SCALE) < df.index[-1] < trades["price"].max()
# Assert specific values in the first and last rows of the DataFrame
assert 32 == df["bid"].iloc[0] # bid price
assert 197.512 == df["bid_amount"].iloc[0] # total bid amount
assert 88.98 == df["ask_amount"].iloc[0] # total ask amount
assert 26 == df["ask"].iloc[0] # ask price
assert -108.532 == pytest.approx(df["delta"].iloc[0]) # delta (bid amount - ask amount)
assert 3 == df["bid"].iloc[-1] # bid price
assert 50.659 == df["bid_amount"].iloc[-1] # total bid amount
assert 108.21 == df["ask_amount"].iloc[-1] # total ask amount
assert 44 == df["ask"].iloc[-1] # ask price
assert 57.551 == df["delta"].iloc[-1] # delta (bid amount - ask amount)
# Repeat the process with a larger bin size
BIN_SIZE_SCALE = 1
# Generate the volume profile with the larger bin size
df = trades_to_volumeprofile_with_total_delta_bid_ask(trades, scale=BIN_SIZE_SCALE)
# Assert the number of rows in the DataFrame (expected 2 for this bin size)
assert len(df) == 2
# Repeat similar assertions for index ordering and spacing
assert all(df.index[i] < df.index[i + 1] for i in range(len(df) - 1))
assert (trades["price"].min() - BIN_SIZE_SCALE) < df.index[0] < trades["price"].max()
assert (df.index[0] + BIN_SIZE_SCALE) >= df.index[1]
assert (trades["price"].max() - BIN_SIZE_SCALE) < df.index[-1] < trades["price"].max()
# Assert the value in the last row of the DataFrame with the larger bin size
assert 1667.0 == df.index[-1]
assert 710.98 == df["bid_amount"].iat[0]
assert 111 == df["bid"].iat[0]
assert 52.7199999 == pytest.approx(df["delta"].iat[0]) # delta
def test_public_trades_config_max_trades(
default_conf, populate_dataframe_with_trades_dataframe, populate_dataframe_with_trades_trades
):
dataframe = populate_dataframe_with_trades_dataframe.copy()
trades = populate_dataframe_with_trades_trades.copy()
default_conf["exchange"]["use_public_trades"] = True
orderflow_config = {
"timeframe": "5m",
"orderflow": {
"cache_size": 1000,
"max_candles": 1,
"scale": 0.005,
"imbalance_volume": 0,
"imbalance_ratio": 3,
"stacked_imbalance_range": 3,
},
}
df, _ = populate_dataframe_with_trades(
OrderedDict(), default_conf | orderflow_config, dataframe, trades
)
assert df.delta.count() == 1
def test_public_trades_testdata_sanity(
candles,
public_trades_list,
public_trades_list_simple,
populate_dataframe_with_trades_dataframe,
populate_dataframe_with_trades_trades,
):
assert 10999 == len(candles)
assert 1000 == len(public_trades_list)
assert 999 == len(populate_dataframe_with_trades_dataframe)
assert 293532 == len(populate_dataframe_with_trades_trades)
assert 7 == len(public_trades_list_simple)
assert (
5
== public_trades_list_simple.loc[
public_trades_list_simple["side"].str.contains("sell"), "id"
].count()
)
assert (
2
== public_trades_list_simple.loc[
public_trades_list_simple["side"].str.contains("buy"), "id"
].count()
)
assert public_trades_list.columns.tolist() == [
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]
assert public_trades_list_simple.columns.tolist() == [
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]
assert populate_dataframe_with_trades_dataframe.columns.tolist() == [
"date",
"open",
"high",
"low",
"close",
"volume",
]
assert populate_dataframe_with_trades_trades.columns.tolist() == [
"timestamp",
"id",
"type",
"side",
"price",
"amount",
"cost",
"date",
]

View File

@@ -62,6 +62,42 @@ def test_historic_ohlcv(mocker, default_conf, ohlcv_history):
assert historymock.call_args_list[0][1]["timeframe"] == "5m"
def test_historic_trades(mocker, default_conf, trades_history_df):
historymock = MagicMock(return_value=trades_history_df)
mocker.patch(
"freqtrade.data.history.datahandlers.featherdatahandler.FeatherDataHandler._trades_load",
historymock,
)
dp = DataProvider(default_conf, None)
# Live mode..
with pytest.raises(OperationalException, match=r"Exchange is not available to DataProvider\."):
dp.trades("UNITTEST/BTC", "5m")
exchange = get_patched_exchange(mocker, default_conf)
dp = DataProvider(default_conf, exchange)
data = dp.trades("UNITTEST/BTC", "5m")
assert isinstance(data, DataFrame)
assert len(data) == 0
# Switch to backtest mode
default_conf["runmode"] = RunMode.BACKTEST
default_conf["dataformat_trades"] = "feather"
exchange = get_patched_exchange(mocker, default_conf)
dp = DataProvider(default_conf, exchange)
data = dp.trades("UNITTEST/BTC", "5m")
assert isinstance(data, DataFrame)
assert len(data) == len(trades_history_df)
# Random other runmode
default_conf["runmode"] = RunMode.UTIL_EXCHANGE
dp = DataProvider(default_conf, None)
data = dp.trades("UNITTEST/BTC", "5m")
assert isinstance(data, DataFrame)
assert len(data) == 0
def test_historic_ohlcv_dataformat(mocker, default_conf, ohlcv_history):
hdf5loadmock = MagicMock(return_value=ohlcv_history)
featherloadmock = MagicMock(return_value=ohlcv_history)
@@ -247,8 +283,8 @@ def test_emit_df(mocker, default_conf, ohlcv_history):
def test_refresh(mocker, default_conf):
refresh_mock = MagicMock()
mocker.patch(f"{EXMS}.refresh_latest_ohlcv", refresh_mock)
refresh_mock = mocker.patch(f"{EXMS}.refresh_latest_ohlcv")
mock_refresh_trades = mocker.patch(f"{EXMS}.refresh_latest_trades")
exchange = get_patched_exchange(mocker, default_conf, exchange="binance")
timeframe = default_conf["timeframe"]
@@ -258,7 +294,7 @@ def test_refresh(mocker, default_conf):
dp = DataProvider(default_conf, exchange)
dp.refresh(pairs)
assert mock_refresh_trades.call_count == 0
assert refresh_mock.call_count == 1
assert len(refresh_mock.call_args[0]) == 1
assert len(refresh_mock.call_args[0][0]) == len(pairs)
@@ -266,11 +302,20 @@ def test_refresh(mocker, default_conf):
refresh_mock.reset_mock()
dp.refresh(pairs, pairs_non_trad)
assert mock_refresh_trades.call_count == 0
assert refresh_mock.call_count == 1
assert len(refresh_mock.call_args[0]) == 1
assert len(refresh_mock.call_args[0][0]) == len(pairs) + len(pairs_non_trad)
assert refresh_mock.call_args[0][0] == pairs + pairs_non_trad
# Test with public trades
refresh_mock.reset_mock()
default_conf["exchange"]["use_public_trades"] = True
dp.refresh(pairs, pairs_non_trad)
assert mock_refresh_trades.call_count == 1
assert refresh_mock.call_count == 1
def test_orderbook(mocker, default_conf, order_book_l2):
api_mock = MagicMock()

View File

@@ -675,7 +675,7 @@ def test_download_trades_history(
assert not _download_trades_history(
data_handler=data_handler, exchange=exchange, pair="ETH/BTC", trading_mode=TradingMode.SPOT
)
assert log_has_re('Failed to download historic trades for pair: "ETH/BTC".*', caplog)
assert log_has_re('Failed to download and store historic trades for pair: "ETH/BTC".*', caplog)
file2 = tmp_path / "XRP_ETH-trades.json.gz"
copyfile(testdatadir / file2.name, file2)

View File

@@ -8,8 +8,9 @@ from unittest.mock import MagicMock, Mock, PropertyMock, patch
import ccxt
import pytest
from numpy import nan
from pandas import DataFrame
from pandas import DataFrame, to_datetime
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS
from freqtrade.enums import CandleType, MarginMode, RunMode, TradingMode
from freqtrade.exceptions import (
ConfigurationError,
@@ -2371,6 +2372,160 @@ def test_refresh_latest_ohlcv(mocker, default_conf, caplog, candle_type) -> None
assert len(res) == 1
@pytest.mark.parametrize("candle_type", [CandleType.FUTURES, CandleType.MARK, CandleType.SPOT])
def test_refresh_latest_trades(mocker, default_conf, caplog, candle_type, tmp_path) -> None:
trades = [
{
# unix timestamp ms
"timestamp": dt_ts(dt_now() - timedelta(minutes=5)),
"amount": 16.512,
"cost": 10134.07488,
"fee": None,
"fees": [],
"id": "354669639",
"order": None,
"price": 613.74,
"side": "sell",
"takerOrMaker": None,
"type": None,
},
{
"timestamp": dt_ts(), # unix timestamp ms
"amount": 12.512,
"cost": 1000,
"fee": None,
"fees": [],
"id": "354669640",
"order": None,
"price": 613.84,
"side": "buy",
"takerOrMaker": None,
"type": None,
},
]
caplog.set_level(logging.DEBUG)
use_trades_conf = default_conf
use_trades_conf["exchange"]["use_public_trades"] = True
use_trades_conf["datadir"] = tmp_path
use_trades_conf["orderflow"] = {"max_candles": 1500}
exchange = get_patched_exchange(mocker, use_trades_conf)
exchange._api_async.fetch_trades = get_mock_coro(trades)
exchange._ft_has["exchange_has_overrides"]["fetchTrades"] = True
pairs = [("IOTA/USDT:USDT", "5m", candle_type), ("XRP/USDT:USDT", "5m", candle_type)]
# empty dicts
assert not exchange._trades
res = exchange.refresh_latest_trades(pairs, cache=False)
# No caching
assert not exchange._trades
assert len(res) == len(pairs)
assert exchange._api_async.fetch_trades.call_count == 4
exchange._api_async.fetch_trades.reset_mock()
exchange.required_candle_call_count = 2
res = exchange.refresh_latest_trades(pairs)
assert len(res) == len(pairs)
assert log_has(f"Refreshing TRADES data for {len(pairs)} pairs", caplog)
assert exchange._trades
assert exchange._api_async.fetch_trades.call_count == 4
exchange._api_async.fetch_trades.reset_mock()
for pair in pairs:
assert isinstance(exchange.trades(pair), DataFrame)
assert len(exchange.trades(pair)) > 0
# trades function should return a different object on each call
# if copy is "True"
assert exchange.trades(pair) is not exchange.trades(pair)
assert exchange.trades(pair) is not exchange.trades(pair, copy=True)
assert exchange.trades(pair, copy=True) is not exchange.trades(pair, copy=True)
assert exchange.trades(pair, copy=False) is exchange.trades(pair, copy=False)
# test caching
ohlcv = [
[
dt_ts(dt_now() - timedelta(minutes=5)), # unix timestamp ms
1, # open
2, # high
3, # low
4, # close
5, # volume (in quote currency)
],
[
dt_ts(), # unix timestamp ms
3, # open
1, # high
4, # low
6, # close
5, # volume (in quote currency)
],
]
cols = DEFAULT_DATAFRAME_COLUMNS
trades_df = DataFrame(ohlcv, columns=cols)
trades_df["date"] = to_datetime(trades_df["date"], unit="ms", utc=True)
trades_df["date"] = trades_df["date"].apply(lambda date: timeframe_to_prev_date("5m", date))
exchange._klines[pair] = trades_df
res = exchange.refresh_latest_trades(
[("IOTA/USDT:USDT", "5m", candle_type), ("XRP/USDT:USDT", "5m", candle_type)]
)
assert len(res) == 0
assert exchange._api_async.fetch_trades.call_count == 0
caplog.clear()
# Reset refresh times
for pair in pairs:
# test caching with "expired" candle
trades = [
{
# unix timestamp ms
"timestamp": dt_ts(exchange._klines[pair].iloc[-1].date - timedelta(minutes=5)),
"amount": 16.512,
"cost": 10134.07488,
"fee": None,
"fees": [],
"id": "354669639",
"order": None,
"price": 613.74,
"side": "sell",
"takerOrMaker": None,
"type": None,
}
]
trades_df = DataFrame(trades)
trades_df["date"] = to_datetime(trades_df["timestamp"], unit="ms", utc=True)
exchange._trades[pair] = trades_df
res = exchange.refresh_latest_trades(
[("IOTA/USDT:USDT", "5m", candle_type), ("XRP/USDT:USDT", "5m", candle_type)]
)
assert len(res) == len(pairs)
assert exchange._api_async.fetch_trades.call_count == 4
# cache - but disabled caching
exchange._api_async.fetch_trades.reset_mock()
exchange.required_candle_call_count = 1
pairlist = [
("IOTA/ETH", "5m", candle_type),
("XRP/ETH", "5m", candle_type),
("XRP/ETH", "1d", candle_type),
]
res = exchange.refresh_latest_trades(pairlist, cache=False)
assert len(res) == 3
assert exchange._api_async.fetch_trades.call_count == 6
# Test the same again, should NOT return from cache!
exchange._api_async.fetch_trades.reset_mock()
res = exchange.refresh_latest_trades(pairlist, cache=False)
assert len(res) == 3
assert exchange._api_async.fetch_trades.call_count == 6
exchange._api_async.fetch_trades.reset_mock()
caplog.clear()
@pytest.mark.parametrize("candle_type", [CandleType.FUTURES, CandleType.MARK, CandleType.SPOT])
def test_refresh_latest_ohlcv_cache(mocker, default_conf, candle_type, time_machine) -> None:
start = datetime(2021, 8, 1, 0, 0, 0, 0, tzinfo=timezone.utc)

View File

@@ -27,7 +27,7 @@ from freqtrade.configuration.load_config import (
)
from freqtrade.constants import DEFAULT_DB_DRYRUN_URL, DEFAULT_DB_PROD_URL, ENV_VAR_PREFIX
from freqtrade.enums import RunMode
from freqtrade.exceptions import OperationalException
from freqtrade.exceptions import ConfigurationError, OperationalException
from tests.conftest import (
CURRENT_TEST_STRATEGY,
log_has,
@@ -1084,6 +1084,29 @@ def test__validate_consumers(default_conf, caplog) -> None:
assert log_has_re("To receive best performance with external data.*", caplog)
def test__validate_orderflow(default_conf) -> None:
conf = deepcopy(default_conf)
conf["exchange"]["use_public_trades"] = True
with pytest.raises(
ConfigurationError,
match="Orderflow is a required configuration key when using public trades.",
):
validate_config_consistency(conf)
conf.update(
{
"orderflow": {
"scale": 0.5,
"stacked_imbalance_range": 3,
"imbalance_volume": 100,
"imbalance_ratio": 3,
}
}
)
# Should pass.
validate_config_consistency(conf)
def test_load_config_test_comments() -> None:
"""
Load config with comments

tests/testdata/orderflow/candles.json vendored Normal file

File diff suppressed because one or more lines are too long

Binary file not shown.

File diff suppressed because it is too large

View File

@@ -0,0 +1,8 @@
,timestamp,id,type,side,price,amount,cost,date
0,1675311000092, 1588563957, ,buy, 23438.0, 0.013, 0, 2023-02-02 04:10:00.092000+00:00
1,1675311000211, 1588563958, ,sell, 23437.5, 0.001, 0, 2023-02-02 04:10:00.211000+00:00
2,1675311000335, 1588563959, ,sell , 23437.5, 0.196, 0, 2023-02-02 04:10:00.335000+00:00
3,1675311000769, 1588563960, , sell, 23437.5, 0.046, 0, 2023-02-02 04:10:00.769000+00:00
4,1675311000773, 1588563961, ,buy , 23438.0, 0.127, 0, 2023-02-02 04:10:00.773000+00:00
5,1675311000774, 1588563959, ,sell, 23437.5, 0.001, 0, 2023-02-02 04:10:00.774000+00:00
6,1675311000775, 1588563960, ,sell, 23437.5, 0.001, 0, 2023-02-02 04:10:00.775000+00:00
1 timestamp id type side price amount cost date
2 0 1675311000092 1588563957 buy 23438.0 0.013 0 2023-02-02 04:10:00.092000+00:00
3 1 1675311000211 1588563958 sell 23437.5 0.001 0 2023-02-02 04:10:00.211000+00:00
4 2 1675311000335 1588563959 sell 23437.5 0.196 0 2023-02-02 04:10:00.335000+00:00
5 3 1675311000769 1588563960 sell 23437.5 0.046 0 2023-02-02 04:10:00.769000+00:00
6 4 1675311000773 1588563961 buy 23438.0 0.127 0 2023-02-02 04:10:00.773000+00:00
7 5 1675311000774 1588563959 sell 23437.5 0.001 0 2023-02-02 04:10:00.774000+00:00
8 6 1675311000775 1588563960 sell 23437.5 0.001 0 2023-02-02 04:10:00.775000+00:00