use fetch_trades' public trades to populate dataframe

This commit is contained in:
Joe Schr
2023-04-26 15:14:45 +02:00
parent b88a561eb5
commit 0f4e147035
6 changed files with 953 additions and 94 deletions

View File

@@ -3,7 +3,7 @@
""" """
bot constants bot constants
""" """
from typing import Any, Dict, List, Literal, Tuple from typing import Any, Dict, List, Literal, Tuple, Optional
from freqtrade.enums import CandleType, PriceType, RPCMessageType from freqtrade.enums import CandleType, PriceType, RPCMessageType
@@ -588,7 +588,7 @@ CONF_SCHEMA = {
"properties": { "properties": {
"shuffle": {"type": "boolean", "default": False}, "shuffle": {"type": "boolean", "default": False},
"nu": {"type": "number", "default": 0.1} "nu": {"type": "number", "default": 0.1}
}, },
}, },
"shuffle_after_split": {"type": "boolean", "default": False}, "shuffle_after_split": {"type": "boolean", "default": False},
"buffer_train_data_candles": {"type": "integer", "default": 0} "buffer_train_data_candles": {"type": "integer", "default": 0}
@@ -704,6 +704,9 @@ ListPairsWithTimeframes = List[PairWithTimeframe]
# Type for trades list
TradeList = List[List]
# ticks, pair, timeframe, CandleType
TickWithTimeframe = Tuple[str, str, CandleType, Optional[int], Optional[int]]
ListTicksWithTimeframes = List[TickWithTimeframe]
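For illustration, a value of this type could look as follows (a sketch; the field layout (pair, timeframe, candle_type, since_ms, until_ms) is an assumption inferred from the annotation, as the comment above lists the fields in a different order):

from typing import List, Optional, Tuple
from freqtrade.enums import CandleType

TickWithTimeframe = Tuple[str, str, CandleType, Optional[int], Optional[int]]

# Hypothetical entry: (pair, timeframe, candle_type, since_ms, until_ms)
tick: TickWithTimeframe = ('BTC/USDT', '5m', CandleType.SPOT, 1681284338000, None)
jobs: List[TickWithTimeframe] = [tick]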
LongShort = Literal['long', 'short']
EntryExit = Literal['entry', 'exit']

View File

@@ -28,7 +28,8 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
    :param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
    :return: DataFrame
    """
    logger.debug(
        f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
    cols = DEFAULT_DATAFRAME_COLUMNS
    df = DataFrame(ohlcv, columns=cols)
@@ -44,6 +45,331 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
                                 drop_incomplete=drop_incomplete)
def _init_dataframe_with_trades_columns(dataframe: DataFrame):
"""
Populates a dataframe with trades columns
:param dataframe: Dataframe to populate
"""
dataframe['trades'] = dataframe.apply(lambda _: [], axis=1)
dataframe['orderflow'] = dataframe.apply(lambda _: {}, axis=1)
dataframe['bid'] = np.nan
dataframe['ask'] = np.nan
dataframe['delta'] = np.nan
dataframe['min_delta'] = np.nan
dataframe['max_delta'] = np.nan
dataframe['total_trades'] = np.nan
dataframe['stacked_imbalances_bid'] = np.nan
dataframe['stacked_imbalances_ask'] = np.nan
def _convert_timeframe_to_pandas_frequency(timeframe: str) -> Tuple[str, int]:
    # convert timeframe to format usable by pandas
    from freqtrade.exchange import timeframe_to_minutes
    timeframe_minutes = timeframe_to_minutes(timeframe)
    timeframe_frequency = f'{timeframe_minutes}min'
    return (timeframe_frequency, timeframe_minutes)
def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str):
    timeframe_frequency, timeframe_minutes = _convert_timeframe_to_pandas_frequency(
        timeframe)
    # calculate ohlcv candle start and end
    df['datetime'] = pd.to_datetime(df['date'], unit='ms')
    df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency)
    df['candle_end'] = df['candle_start'] + pd.Timedelta(minutes=timeframe_minutes)
    df.drop(columns=['datetime'], inplace=True)
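A quick sketch of the flooring logic above (assuming a 5m timeframe, for which timeframe_to_minutes returns 5):

import pandas as pd

# Two trades inside the 5m candle starting at 2023-01-01 00:05:00 UTC
df = pd.DataFrame({'date': [1672531500000, 1672531799000]})  # ms timestamps
df['datetime'] = pd.to_datetime(df['date'], unit='ms')
df['candle_start'] = df['datetime'].dt.floor('5min')             # both -> 00:05:00
df['candle_end'] = df['candle_start'] + pd.Timedelta(minutes=5)  # both -> 00:10:00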
cached_grouped_trades_pair = {}
def populate_dataframe_with_trades(config: Config, dataframe: DataFrame,
                                   trades: DataFrame, *, pair: str) -> DataFrame:
"""
Populates a dataframe with trades
:param dataframe: Dataframe to populate
:param trades: Trades to populate with
:return: Dataframe with trades populated
"""
config_orderflow = config['orderflow']
timeframe = config['timeframe']
# create columns for trades
_init_dataframe_with_trades_columns(dataframe)
df = dataframe.copy()
try:
start_time = time.time()
# calculate ohlcv candle start and end
_calculate_ohlcv_candle_start_and_end(df, timeframe)
_calculate_ohlcv_candle_start_and_end(trades, timeframe)
# slice of trades that are before current ohlcv candles to make groupby faster
trades = trades.loc[trades.candle_start >= df.candle_start[0]]
trades.reset_index(inplace=True, drop=True)
# group trades by candle start
trades_grouped_by_candle_start = trades.groupby(
'candle_start', group_keys=False)
# groups = trades_grouped_by_candle_start.groups
        new_grouped_trades_dict = {
            key: group for key, group in trades_grouped_by_candle_start}
        logger.debug(
            f"{len(new_grouped_trades_dict)} candles to process")
cached_grouped_trades_dict = cached_grouped_trades_pair.get(
(pair, timeframe), {})
new_keys = set(list(new_grouped_trades_dict.keys()))
# don't process twice
for candle_start in cached_grouped_trades_dict:
# TODO: don't delete last candle
# to allow refresh in case of wrong data
if candle_start in new_grouped_trades_dict:
del new_grouped_trades_dict[candle_start]
cached_keys = set(list(cached_grouped_trades_dict.keys()))
old_keys = cached_keys - new_keys
# return values not in cached
for key in old_keys:
            if key in cached_grouped_trades_dict and not (df['date'] == key).any():
                del cached_grouped_trades_dict[key]
for candle_start in cached_grouped_trades_dict:
is_between = (candle_start == df['candle_start'])
            for column in list(dataframe.columns):
                # special case 'trades' and 'orderflow':
                # they don't hold a single value per row
                if column in ['trades', 'orderflow']:
                    dataframe.loc[is_between, column] = dataframe.loc[
                        is_between, column].apply(
                            lambda _: cached_grouped_trades_dict[candle_start][column].values[0])
                else:
                    dataframe.loc[is_between, column] = \
                        cached_grouped_trades_dict[candle_start][column].values[0]
# repair 'date' datetime type (otherwise crashes on each compare)
if "date" in dataframe.columns:
dataframe['date'] = pd.to_datetime(dataframe['date'])
for candle_start in new_grouped_trades_dict.keys():
trades_grouped_df = new_grouped_trades_dict[candle_start]
            is_between = (candle_start == df['candle_start'])
            if is_between.any():
                (timeframe_frequency, timeframe_minutes) = _convert_timeframe_to_pandas_frequency(
                    timeframe)
                candle_next = candle_start + \
                    pd.Timedelta(minutes=timeframe_minutes)
                # skip if there are no trades at the next candle,
                # because this candle isn't finished yet
                # if not np.any((candle_next == df.candle_start)):
                if candle_next not in trades_grouped_by_candle_start.groups:
                    logger.debug(
                        f"Skipping candle at {candle_start} with {len(trades_grouped_df)} "
                        f"trades, because no finished trades at {candle_next}")
                    continue
                # add trades to each candle
                df.loc[is_between, 'trades'] = df.loc[
                    is_between, 'trades'].apply(lambda _: trades_grouped_df)
                # calculate orderflow for each candle
                df.loc[is_between, 'orderflow'] = df.loc[is_between, 'orderflow'].apply(
                    lambda _: trades_to_volumeprofile_with_total_delta_bid_ask(
                        pd.DataFrame(trades_grouped_df), scale=config_orderflow['scale']))
                # calculate imbalances for each candle's orderflow
                df.loc[is_between, 'imbalances'] = df.loc[is_between, 'orderflow'].apply(
                    lambda x: trades_orderflow_to_imbalances(
                        x, imbalance_ratio=config_orderflow['imbalance_ratio'],
                        imbalance_volume=config_orderflow['imbalance_volume']))
                df.loc[is_between, 'stacked_imbalances_bid'] = df.loc[
                    is_between, 'imbalances'].apply(lambda x: stacked_imbalance_bid(
                        x, stacked_imbalance_range=config_orderflow['stacked_imbalance_range']))
                df.loc[is_between, 'stacked_imbalances_ask'] = df.loc[
                    is_between, 'imbalances'].apply(lambda x: stacked_imbalance_ask(
                        x, stacked_imbalance_range=config_orderflow['stacked_imbalance_range']))
buy = df.loc[is_between, 'bid'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('buy'), 0, trades_grouped_df['amount']))
sell = df.loc[is_between, 'ask'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('sell'), 0, trades_grouped_df['amount']))
deltas_per_trade = sell - buy
min_delta = 0
max_delta = 0
delta = 0
for deltas in deltas_per_trade:
for d in deltas:
delta += d
if delta > max_delta:
max_delta = delta
if delta < min_delta:
min_delta = delta
                df.loc[is_between, 'max_delta'] = max_delta
                df.loc[is_between, 'min_delta'] = min_delta
                df.loc[is_between, 'bid'] = np.where(
                    trades_grouped_df['side'].str.contains('buy'), 0,
                    trades_grouped_df['amount']).sum()
                df.loc[is_between, 'ask'] = np.where(
                    trades_grouped_df['side'].str.contains('sell'), 0,
                    trades_grouped_df['amount']).sum()
                df.loc[is_between, 'delta'] = df.loc[
                    is_between, 'ask'] - df.loc[is_between, 'bid']
                min_delta = np.min(deltas_per_trade)
                max_delta = np.max(deltas_per_trade)
                df.loc[is_between, 'total_trades'] = len(trades_grouped_df)
                # cache; copy() to avoid memory leaks through shared references
                cached_grouped_trades_dict[candle_start] = df.loc[is_between].copy()
                dataframe.loc[is_between] = df.loc[is_between]
else:
logger.debug(
f"Found NO candles for trades starting with {candle_start}")
logger.debug(
f"trades.groups_keys in {time.time() - start_time} seconds")
logger.debug(
f"trades.singleton_iterate in {time.time() - start_time} seconds")
        cached_grouped_trades_pair.pop((pair, timeframe), None)
        cached_grouped_trades_pair[(pair, timeframe)] = cached_grouped_trades_dict
except Exception as e:
logger.error(f"Error populating dataframe with trades: {e}")
return dataframe
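A minimal usage sketch for the function above (the orderflow keys mirror the config lookups in the body; the values shown are illustrative, not defaults):

config = {
    'timeframe': '5m',
    'orderflow': {
        'scale': 0.5,
        'imbalance_ratio': 300,
        'imbalance_volume': 3,
        'stacked_imbalance_range': 3,
    },
}
# ohlcv_df is a candle dataframe, trades_df the output of public_trades_to_dataframe()
ohlcv_df = populate_dataframe_with_trades(config, ohlcv_df, trades_df, pair='BTC/USDT')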
# TODO: remove timeframe and pair
def public_trades_to_dataframe(trades: list, timeframe: str, pair: str, *,
fill_missing: bool = True, drop_incomplete: bool = True) -> DataFrame:
"""
Converts a list with candle (TRADES) data (in format returned by ccxt.fetch_trades)
to a Dataframe
:param trades: list with candle (TRADES) data, as returned by exchange.async_get_candle_history
:param timeframe: timeframe (e.g. 5m). Used to fill up eventual missing data
:param pair: Pair this data is for (used to warn if fillup was necessary)
:param fill_missing: fill up missing candles with 0 candles
(see trades_fill_up_missing_data for details)
:param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
:return: DataFrame
"""
logger.debug(
f"Converting candle (TRADES) data to dataframe for pair {pair}.")
cols = DEFAULT_TRADES_COLUMNS
df = DataFrame(trades, columns=cols)
df['date'] = pd.to_datetime(
df['timestamp'], unit='ms', utc=True, infer_datetime_format=True)
    # Some exchanges return int values for "amount" and even for "price".
    # Convert them since TA-LIB indicators used in the strategy assume floats
    # and fail with exception...
    df = df.astype(dtype={'amount': 'float', 'cost': 'float',
                          'price': 'float'})
# df = clean_duplicate_trades(df, timeframe, pair,
# fill_missing=fill_missing,
# drop_incomplete=drop_incomplete)
# df = drop_incomplete_and_fill_missing_trades(df, timeframe, pair,
# fill_missing=fill_missing,
# drop_incomplete=drop_incomplete)
return df
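For example (a sketch, assuming DEFAULT_TRADES_COLUMNS follows the usual timestamp/id/type/side/price/amount/cost layout):

raw_trades = [
    # timestamp (ms), id, type, side, price, amount, cost
    [1681284338000, '1001', None, 'buy', 30000.5, 0.02, 600.01],
    [1681284339000, '1002', None, 'sell', 30000.0, 0.01, 300.00],
]
trades_df = public_trades_to_dataframe(raw_trades, '5m', pair='BTC/USDT',
                                       fill_missing=False, drop_incomplete=False)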
def trades_to_volumeprofile_with_total_delta_bid_ask(trades: DataFrame, scale: float):
    """
    :param trades: dataframe
    :param scale: scale aka bin size e.g. 0.5
    :return: trades binned to levels according to scale aka orderflow
    """
df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS)
# create bid, ask where side is sell or buy
df['bid_amount'] = np.where(
trades['side'].str.contains('buy'), 0, trades['amount'])
df['ask_amount'] = np.where(
trades['side'].str.contains('sell'), 0, trades['amount'])
df['bid'] = np.where(
trades['side'].str.contains('buy'), 0, 1)
df['ask'] = np.where(
trades['side'].str.contains('sell'), 0, 1)
# round the prices to the nearest multiple of the scale
df['price'] = ((trades['price'] / scale).round()
* scale).astype('float64').values
if df.empty:
df['total'] = np.nan
df['delta'] = np.nan
return df
df['delta'] = df['ask_amount'] - df['bid_amount']
df['total_volume'] = df['ask_amount'] + df['bid_amount']
df['total_trades'] = df['ask'] + df['bid']
# group to bins aka apply scale
df = df.groupby('price').sum(numeric_only=True)
return df
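A worked example of the binning (sketch): with scale=0.5, a price of 100.1 rounds to the 100.0 level while 100.3 and 100.6 round to 100.5; buys land in ask_amount, sells in bid_amount:

import pandas as pd

trades = pd.DataFrame({
    'side': ['buy', 'sell', 'buy'],
    'price': [100.1, 100.3, 100.6],
    'amount': [1.0, 2.0, 0.5],
})
profile = trades_to_volumeprofile_with_total_delta_bid_ask(trades, scale=0.5)
# level 100.0 -> ask_amount 1.0; level 100.5 -> bid_amount 2.0, ask_amount 0.5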
def trades_orderflow_to_imbalances(df: DataFrame, imbalance_ratio: int, imbalance_volume: int):
    """
    :param df: dataframe with bid and ask
    :param imbalance_ratio: imbalance_ratio e.g. 300
    :param imbalance_volume: imbalance volume e.g. 3
    :return: dataframe with bid and ask imbalance
    """
bid = df.bid
ask = df.ask.shift(-1)
bid_imbalance = (bid / ask) > (imbalance_ratio / 100)
# overwrite bid_imbalance with False if volume is not big enough
bid_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, bid_imbalance)
ask_imbalance = (ask / bid) > (imbalance_ratio / 100)
# overwrite ask_imbalance with False if volume is not big enough
ask_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, ask_imbalance)
dataframe = DataFrame(
{'bid_imbalance': bid_imbalance_filtered, 'ask_imbalance': ask_imbalance_filtered}, index=df.index)
return dataframe
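Continuing the sketch above: with imbalance_ratio=300 a bid imbalance requires the bid volume at a level to exceed three times the ask volume one level higher (note the shift(-1)), and levels with less than imbalance_volume total volume are filtered out:

imbalances = trades_orderflow_to_imbalances(profile, imbalance_ratio=300,
                                            imbalance_volume=3)
# -> DataFrame with boolean 'bid_imbalance' / 'ask_imbalance' per price level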
def stacked_imbalance(df: DataFrame, label: str = "bid", stacked_imbalance_range: int = 3, should_reverse: bool = False):
"""
y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array
"""
imbalance = df[f'{label}_imbalance']
int_series = pd.Series(np.where(imbalance, 1, 0))
stacked = int_series * \
(int_series.groupby((int_series != int_series.shift()).cumsum()).cumcount() + 1)
max_stacked_imbalance_idx = stacked.index[stacked >=
stacked_imbalance_range]
stacked_imbalance_price = np.nan
if not max_stacked_imbalance_idx.empty:
# TODO: do better than just take first
idx = max_stacked_imbalance_idx[0] if not should_reverse else np.flipud(
max_stacked_imbalance_idx)[0]
stacked_imbalance_price = imbalance.index[idx]
return stacked_imbalance_price
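The cumsum expression quoted in the docstring counts consecutive True values; a standalone sketch:

import numpy as np
import pandas as pd

flags = pd.Series([True, True, True, False, True])
ints = pd.Series(np.where(flags, 1, 0))
runs = ints * (ints.groupby((ints != ints.shift()).cumsum()).cumcount() + 1)
print(runs.tolist())  # [1, 2, 3, 0, 1] -> the run of 3 satisfies stacked_imbalance_range=3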
def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int = 3):
return stacked_imbalance(df, 'bid', stacked_imbalance_range)
def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int = 3):
return stacked_imbalance(df, 'ask', stacked_imbalance_range, should_reverse=True)
def orderflow_to_volume_profile(orderflow: DataFrame):
    """
    :param orderflow: dataframe
    :return: volume profile dataframe
    """
    df = orderflow
    bid = df.groupby('level').bid.sum()
    ask = df.groupby('level').ask.sum()
    delta = ask - bid
    df = pd.DataFrame({'bid': bid, 'ask': ask, 'delta': delta})
    return df
def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
                          fill_missing: bool, drop_incomplete: bool) -> DataFrame:
    """
@@ -78,6 +404,70 @@ def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
    return data
def warn_of_tick_duplicates(data: DataFrame, pair: str) -> None:
    no_dupes_columns = ['id', 'timestamp', 'datetime']
    for col in no_dupes_columns:
        if col in data.columns and data[col].duplicated().any():
            dupes = data[col].duplicated().sum()
            message = f'{dupes} duplicated ticks for {pair} in {col} detected.'
            if col == 'id':
                logger.warning(message)
            else:
                logger.debug(message)
def clean_duplicate_trades(trades: DataFrame, timeframe: str, pair: str, *,
fill_missing: bool, drop_incomplete: bool) -> DataFrame:
"""
Cleanse a TRADES dataframe by
    * Grouping it by date (removes duplicate ticks)
* dropping last candles if requested
* Filling up missing data (if requested)
:param data: DataFrame containing candle (TRADES) data.
:param timeframe: timeframe (e.g. 5m). Used to fill up eventual missing data
:param pair: Pair this data is for (used to warn if fillup was necessary)
:param fill_missing: fill up missing candles with 0 candles
(see trades_fill_up_missing_data for details)
:param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
:return: DataFrame
"""
# group by index and aggregate results to eliminate duplicate ticks
# check if data has duplicate ticks
logger.debug(f"Clean duplicated ticks from Trades data {pair}")
df = pd.DataFrame(trades_remove_duplicates(
trades.values.tolist()), columns=trades.columns)
#
# from freqtrade.exchange import timeframe_to_minutes
# timeframe_minutes = timeframe_to_minutes(timeframe)
# sum_dict = {}
# for col in ['amount']: # TODO: remove side,etc
# sum_dict[col] = 'sum'
# group by index and aggregate results to eliminate duplicate ticks
# df = data.groupby(
# by='date', as_index=False, sort=True).agg(sum_dict) # NOTE: sum doesn't make much sense for eliminating duplicates?
return df
def drop_incomplete_and_fill_missing_trades(data: DataFrame, timeframe: str, pair: str, *,
                                            fill_missing: bool, drop_incomplete: bool) -> DataFrame:
    # eliminate partial candle
    if drop_incomplete:
        # TODO: this is not correct, as it drops only the last trade,
        # but we need to drop all trades of the last candle until it is closed
        data.drop(data.tail(1).index, inplace=True)
        logger.debug('Dropping last trade')
    if fill_missing:
        return trades_fill_up_missing_data(data, timeframe, pair)
    else:
        return data
def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) -> DataFrame:
    """
    Fills up missing data with 0 volume rows,
@@ -114,7 +504,8 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str)
    df.reset_index(inplace=True)
    len_before = len(dataframe)
    len_after = len(df)
    pct_missing = (len_after - len_before) / \
        len_before if len_before > 0 else 0
    if len_before != len_after:
        message = (f"Missing data fillup for {pair}: before: {len_before} - after: {len_after}"
                   f" - {pct_missing:.2%}")
@@ -159,7 +550,8 @@ def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange,
    processed: Dict[str, DataFrame] = {}
    for pair, df in preprocessed.items():
        trimed_df = trim_dataframe(
            df, timerange, startup_candles=startup_candles)
        if not trimed_df.empty:
            processed[pair] = trimed_df
        else:

View File

@@ -12,8 +12,9 @@ from typing import Any, Dict, List, Optional, Tuple
from pandas import DataFrame, Timedelta, Timestamp, to_timedelta

from freqtrade.configuration import TimeRange
from freqtrade.data.history.idatahandler import get_datahandler
from freqtrade.constants import (FULL_DATAFRAME_THRESHOLD, Config, ListPairsWithTimeframes,
                                 PairWithTimeframe, ListTicksWithTimeframes)
from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType, RPCMessageType, RunMode
from freqtrade.exceptions import ExchangeError, OperationalException
@@ -24,6 +25,7 @@ from freqtrade.rpc import RPCManager
from freqtrade.rpc.rpc_types import RPCAnalyzedDFMsg
from freqtrade.util import PeriodicCache
from freqtrade.data.converter import public_trades_to_dataframe

logger = logging.getLogger(__name__)
@@ -442,7 +444,12 @@ class DataProvider:
        if self._exchange is None:
            raise OperationalException(NO_EXCHANGE_EXCEPTION)
        final_pairs = (pairlist + helping_pairs) if helping_pairs else pairlist
        # refresh latest ohlcv data
        self._exchange.refresh_latest_ohlcv(final_pairs)
        # refresh latest trades data (if enabled via exchange.use_public_trades)
        self._exchange.refresh_latest_trades(final_pairs,
                                             get_datahandler(self._config['datadir'],
                                                             data_format=self._config['dataformat_trades']))

    @property
    def available_pairs(self) -> ListPairsWithTimeframes:
@@ -482,6 +489,44 @@ class DataProvider:
        else:
            return DataFrame()
def trades(
self,
pair: str,
timeframe: Optional[str] = None,
copy: bool = True,
candle_type: str = ''
) -> DataFrame:
"""
Get candle (TRADES) data for the given pair as DataFrame
Please use the `available_pairs` method to verify which pairs are currently cached.
:param pair: pair to get the data for
:param timeframe: Timeframe to get data for
:param candle_type: '', mark, index, premiumIndex, or funding_rate
:param copy: copy dataframe before returning if True.
Use False only for read-only operations (where the dataframe is not modified)
"""
if self._exchange is None:
raise OperationalException(NO_EXCHANGE_EXCEPTION)
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
_candle_type = CandleType.from_string(
candle_type) if candle_type != '' else self._config['candle_type_def']
return self._exchange.trades(
(pair, timeframe or self._config['timeframe'], _candle_type),
copy=copy
)
        elif self.runmode in (RunMode.BACKTEST, RunMode.HYPEROPT):
            _candle_type = CandleType.from_string(
                candle_type) if candle_type != '' else self._config['candle_type_def']
            data_handler = get_datahandler(
                self._config['datadir'], data_format=self._config['dataformat_trades'])
            ticks = data_handler.trades_load(pair)
            trades_df = public_trades_to_dataframe(
                ticks, timeframe or self._config['timeframe'], pair=pair,
                fill_missing=False, drop_incomplete=False)
            return trades_df
else:
return DataFrame()
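A hedged usage sketch from strategy code (assuming the usual self.dp dataprovider attribute):

# inside a strategy callback:
trades = self.dp.trades(pair=metadata['pair'], copy=False)
if not trades.empty:
    last_trade_price = trades['price'].iat[-1]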
    def market(self, pair: str) -> Optional[Dict[str, Any]]:
        """
        Return market data for the pair

View File

@@ -329,7 +329,9 @@ def _download_trades_history(exchange: Exchange,
                             pair: str, *,
                             new_pairs_days: int = 30,
                             timerange: Optional[TimeRange] = None,
                             data_handler: IDataHandler,
                             since: Optional[int] = None,
                             until: Optional[int] = None
                             ) -> bool:
    """
    Download trade history from the exchange.
@@ -395,7 +397,7 @@ def _download_trades_history(exchange: Exchange,
    except Exception:
        logger.exception(
            f'Failed to download and store historic trades for pair: "{pair}". '
        )
        return False
@@ -506,8 +508,6 @@ def download_data_main(config: Config) -> None:
    # Start downloading
    try:
        if config.get('download_trades'):
-           if config.get('trading_mode') == 'futures':
-               raise OperationalException("Trade download not supported for futures.")
            pairs_not_available = refresh_backtest_trades_data(
                exchange, pairs=expanded_pairs, datadir=config['datadir'],
                timerange=timerange, new_pairs_days=config['new_pairs_days'],

View File

@@ -10,7 +10,7 @@ from copy import deepcopy
from datetime import datetime, timedelta, timezone
from math import floor
from threading import Lock
from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple, Union, Callable
import ccxt import ccxt
import ccxt.async_support as ccxt_async import ccxt.async_support as ccxt_async
@@ -19,7 +19,7 @@ from ccxt import TICK_SIZE
from dateutil import parser
from pandas import DataFrame, concat

from freqtrade.constants import (DEFAULT_TRADES_COLUMNS, DEFAULT_AMOUNT_RESERVE_PERCENT,
                                 NON_OPEN_EXCHANGE_STATES, BidAsk, BuySell, Config, EntryExit,
                                 ExchangeConfig, ListPairsWithTimeframes, MakerTaker, OBLiteral,
                                 PairWithTimeframe)
from freqtrade.data.converter import clean_ohlcv_dataframe, ohlcv_to_dataframe, trades_dict_to_list
@@ -115,6 +115,7 @@ class Exchange:
        # Holds last candle refreshed time of each pair
        self._pairs_last_refresh_time: Dict[PairWithTimeframe, int] = {}
        self._trades_last_refresh_time: Dict[PairWithTimeframe, int] = {}
        # Timestamp of last markets refresh
        self._last_markets_refresh: int = 0
@@ -130,6 +131,9 @@ class Exchange:
        # Holds candles
        self._klines: Dict[PairWithTimeframe, DataFrame] = {}
        # Holds public_trades
        self._trades: Dict[PairWithTimeframe, DataFrame] = {}
        # Holds all open sell orders for dry_run
        self._dry_run_open_orders: Dict[str, Any] = {}
@@ -160,10 +164,13 @@ class Exchange:
        # Assign this directly for easy access
        self._ohlcv_partial_candle = self._ft_has['ohlcv_partial_candle']
        self._max_trades_candle_limit = self._config.get('exchange', {}).get(
            'trades_candle_limit', 1000)
        self._trades_pagination = self._ft_has['trades_pagination']
        self._trades_pagination_arg = self._ft_has['trades_pagination_arg']
        self._trades_bin_size_scale = self._config.get('exchange', {}).get('bin_size_scale', 0.5)

        # Initialize ccxt objects
        ccxt_config = self._ccxt_config
        ccxt_config = deep_merge_dicts(exchange_conf.get('ccxt_config', {}), ccxt_config)
@@ -338,6 +345,22 @@ class Exchange:
        return int(self._ft_has.get('ohlcv_candle_limit_per_timeframe', {}).get(
            timeframe, self._ft_has.get('ohlcv_candle_limit')))
    def trades_candle_limit(
            self, timeframe: str, candle_type: CandleType, since_ms: Optional[int] = None) -> int:
        """
        Exchange trades candle limit
        Uses trades_candle_limit_per_timeframe if the exchange has different limits
        per timeframe (e.g. bittrex), otherwise falls back to trades_candle_limit
        :param timeframe: Timeframe to check
        :param candle_type: Candle-type
        :param since_ms: Starting timestamp
        :return: Candle limit as integer
        """
        # TODO: check if there are trades candle limits
        return int(self._ft_has.get('trade_candle_limit_per_timeframe', {}).get(
            timeframe, self._ft_has.get('trade_candle_limit', self._max_trades_candle_limit)))
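Both new exchange options read in __init__ above could then be set like this (a sketch; the key names match the .get() lookups, the values are illustrative):

config = {
    'exchange': {
        'name': 'binance',
        'trades_candle_limit': 1000,  # feeds _max_trades_candle_limit
        'bin_size_scale': 0.5,        # feeds _trades_bin_size_scale
    },
}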
    def get_markets(self, base_currencies: List[str] = [], quote_currencies: List[str] = [],
                    spot_only: bool = False, margin_only: bool = False, futures_only: bool = False,
                    tradable_only: bool = True,
@@ -415,6 +438,16 @@ class Exchange:
        else:
            return DataFrame()
    def trades(self, pair_interval: PairWithTimeframe, copy: bool = True) -> DataFrame:
        if pair_interval in self._trades:
            if copy:
                # DataFrame.copy() avoids shadowing the `copy` argument with an import
                return self._trades[pair_interval].copy()
            else:
                return self._trades[pair_interval]
        else:
            return DataFrame()
    def get_contract_size(self, pair: str) -> Optional[float]:
        if self.trading_mode == TradingMode.FUTURES:
            market = self.markets.get(pair, {})
@@ -1927,24 +1960,68 @@ class Exchange:
        return data
    async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
since_ms: int, candle_type: CandleType,
is_new_pair: bool = False, raise_: bool = False,
until_ms: Optional[int] = None
) -> OHLCVResponse:
"""
Download historic ohlcv
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
timeframe, candle_type, since_ms)
logger.debug(
"one_call: %s msecs (%s)",
one_call,
arrow.utcnow().shift(seconds=one_call // 1000).humanize(only_distance=True)
)
input_coroutines = [self._async_get_candle_history(
pair, timeframe, candle_type, since) for since in
range(since_ms, until_ms or (arrow.utcnow().int_timestamp * 1000), one_call)]
data: List = []
        # Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100):
results = await asyncio.gather(*input_coro, return_exceptions=True)
for res in results:
if isinstance(res, Exception):
logger.warning(f"Async code raised an exception: {repr(res)}")
if raise_:
raise
continue
else:
# Deconstruct tuple if it's not an exception
p, _, c, new_data, _ = res
if p == pair and c == candle_type:
data.extend(new_data)
# Sort data again after extending the result - above calls return in "async order"
data = sorted(data, key=lambda x: x[0])
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
    async def _async_get_historic_trades(self, pair: str, timeframe: str,
                                         since_ms: int, candle_type: CandleType,
                                         is_new_pair: bool = False, raise_: bool = False,
                                         until_ms: Optional[int] = None
                                         ) -> Ticker:
        """
        Download historic trades
        :param is_new_pair: used by binance subclass to allow "fast" new pair downloading
        :param candle_type: Any of the enum CandleType (must match trading mode!)
        """
        one_call = timeframe_to_msecs(timeframe) * self.trades_candle_limit(
            timeframe, candle_type, since_ms)
        logger.debug(
            "one_call: %s msecs (%s)",
            one_call,
            dt_humanize(dt_now() - timedelta(milliseconds=one_call), only_distance=True)
        )
        until_ms = until_ms if until_ms else (arrow.utcnow().int_timestamp * 1000)
        input_coroutines = [self._async_get_trades_history(
            pair, timeframe, candle_type, since) for since in
            range(since_ms, until_ms, one_call)]
@@ -1963,18 +2040,49 @@ class Exchange:
                # Deconstruct tuple if it's not an exception
                p, _, c, new_data, _ = res
                if p == pair and c == candle_type:
                    data.extend(new_data)
        # Sort data again after extending the result - above calls return in "async order"
        data = sorted(data, key=lambda x: x['timestamp'])  # TODO: sort via 'timestamp' or 'id'?
        return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
    def _build_coroutine_get_ohlcv(
self, pair: str, timeframe: str, candle_type: CandleType,
since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]:
not_all_data = cache and self.required_candle_call_count > 1
if cache and (pair, timeframe, candle_type) in self._klines:
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
min_date = date_minus_candles(timeframe, candle_limit - self._required_candle_call_count_max).timestamp()
# Check if 1 call can get us updated candles without hole in the data.
if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0):
# Cache can be used - do one-off call.
not_all_data = False
else:
# Time jump detected, evict cache
logger.info(
f"Time jump detected. Evicting ohlcv cache for {pair}, {timeframe}, {candle_type}")
del self._klines[(pair, timeframe, candle_type)]
if (not since_ms and (self._ft_has["ohlcv_require_since"] or not_all_data)):
# Multiple calls for one pair - to get more history
since_ms = self.needed_candle_ms(timeframe,candle_type)
# TODO: fetch_trades and return as results
if since_ms:
return self._async_get_historic_ohlcv(
pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type)
else:
# One call ... "regular" refresh
return self._async_get_candle_history(
pair, timeframe, since_ms=since_ms, candle_type=candle_type)
    def _build_coroutine_get_trades(
            self, pair: str, timeframe: str, candle_type: CandleType,
            since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]:
        not_all_data = cache and self.required_candle_call_count > 1
        if cache and (pair, timeframe, candle_type) in self._trades:
            candle_limit = self.trades_candle_limit(timeframe, candle_type)
            min_date = date_minus_candles(
                timeframe, candle_limit - self._required_candle_call_count_max).timestamp()
            # Check if 1 call can get us updated candles without hole in the data.
            if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0):
                # Cache can be used - do one-off call.
@@ -1982,10 +2090,11 @@ class Exchange:
            else:
                # Time jump detected, evict cache
                logger.info(
                    f"Time jump detected. Evicting trades cache for {pair}, {timeframe}, {candle_type}")
                del self._trades[(pair, timeframe, candle_type)]
        # TODO: change to trades candle limit
        if (not since_ms or not_all_data):
            # Multiple calls for one pair - to get more history
            one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
                timeframe, candle_type, since_ms)
@@ -1994,22 +2103,70 @@ class Exchange:
            since_ms = int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
        if since_ms:
            return self._async_get_historic_trades(
                pair, timeframe, since_ms=since_ms, raise_=True, candle_type=candle_type)
        else:
            # One call ... "regular" refresh
            return self._async_get_trades_history(
                pair, timeframe, since_ms=since_ms, candle_type=candle_type)
    def _build_ohlcv_dl_jobs(
self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
"""
Build Coroutines to execute as part of refresh_latest_ohlcv
"""
        input_coroutines: List[Coroutine[Any, Any, OHLCVResponse]] = []
cached_pairs = []
for pair, timeframe, candle_type in set(pair_list):
if (timeframe not in self.timeframes
and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
logger.warning(
f"Cannot download ({pair}, {timeframe}) combination as this timeframe is "
f"not available on {self.name}. Available timeframes are "
f"{', '.join(self.timeframes)}.")
continue
if ((pair, timeframe, candle_type) not in self._klines or not cache
or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
input_coroutines.append(
self._build_coroutine_get_ohlcv(pair, timeframe, candle_type, since_ms, cache))
else:
logger.debug(
f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..."
)
cached_pairs.append((pair, timeframe, candle_type))
return input_coroutines, cached_pairs
    def _build_trades_dl_jobs(
            self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
            cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
        """
        Build Coroutines to execute as part of refresh_latest_trades
        """
        input_coroutines: List[Coroutine[Any, Any, TRADESResponse]] = []
        cached_pairs = []
        for pair, timeframe, candle_type in set(pair_list):
            if not since_ms:
                plr = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0)
                # If we don't have a last refresh time, we need to download all trades
                # This is the case when the bot is started
                if not plr:
                    # using ohlcv_candle_limit here, because we calculate the distance
                    # to the first required candle
                    one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
                        timeframe, candle_type, since_ms)
                    target_candle = one_call * self.required_candle_call_count
                    now = timeframe_to_next_date(timeframe)
                    since_ms = int((now - timedelta(seconds=target_candle // 1000)).timestamp() * 1000)
                else:
                    since_ms = plr
            if (timeframe not in self.timeframes
                    and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
                logger.warning(
@@ -2018,94 +2175,222 @@ class Exchange:
f"{', '.join(self.timeframes)}.") f"{', '.join(self.timeframes)}.")
continue continue
if ((pair, timeframe, candle_type) not in self._klines or not cache if ((pair, timeframe, candle_type) not in self._trades or not cache
or self._now_is_time_to_refresh(pair, timeframe, candle_type)): or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
input_coroutines.append( input_coroutines.append(
self._build_coroutine(pair, timeframe, candle_type, since_ms, cache)) self._build_coroutine_get_trades(pair, timeframe, candle_type, since_ms, cache))
else: else:
logger.debug( logger.debug(
f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..." f"Using cached candle (TRADES) data for {pair}, {timeframe}, {candle_type} ..."
) )
cached_pairs.append((pair, timeframe, candle_type)) cached_pairs.append((pair, timeframe, candle_type))
return input_coroutines, cached_pairs return input_coroutines, cached_pairs
    def _process_ohlcv_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
                          cache: bool, drop_incomplete: bool) -> DataFrame:
# keeping last candle time as last refreshed time of the pair
if ticks and cache:
self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000
# keeping parsed dataframe in cache
ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
drop_incomplete=drop_incomplete)
if cache:
if (pair, timeframe, c_type) in self._klines:
old = self._klines[(pair, timeframe, c_type)]
# Reassign so we return the updated, combined df
ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair,
fill_missing=True, drop_incomplete=False)
candle_limit = self.ohlcv_candle_limit(timeframe, self._config['candle_type_def'])
# Age out old candles
ohlcv_df = ohlcv_df.tail(candle_limit + self._startup_candle_count)
ohlcv_df = ohlcv_df.reset_index(drop=True)
self._klines[(pair, timeframe, c_type)] = ohlcv_df
else:
self._klines[(pair, timeframe, c_type)] = ohlcv_df
return ohlcv_df
    def _process_trades_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
                           cache: bool, drop_incomplete: bool,
                           first_required_candle_date: Optional[int]) -> DataFrame:
        # keeping parsed dataframe in cache
        # TODO: pass last_full_candle_date to drop as incomplete
        trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False,
                                               drop_incomplete=drop_incomplete)
        # keeping last candle time as last refreshed time of the pair
        if ticks and cache:
            idx = -2 if drop_incomplete and len(ticks) > 1 else -1
            # NOTE: // is floor division: divides and rounds down to the nearest int
            self._trades_last_refresh_time[(pair, timeframe, c_type)] = \
                trades_df['timestamp'].iat[idx] // 1000
        if cache:
            if (pair, timeframe, c_type) in self._trades:
                old = self._trades[(pair, timeframe, c_type)]
                # Reassign so we return the updated, combined df
                trades_df = clean_duplicate_trades(concat([old, trades_df], axis=0),
                                                   timeframe, pair,
                                                   fill_missing=False, drop_incomplete=False)
                # warn_of_tick_duplicates(trades_df, pair)
            # Age out old trades
            if first_required_candle_date:
                # slice off older dates
                trades_df = trades_df[first_required_candle_date < trades_df['timestamp']]
            trades_df = trades_df.reset_index(drop=True)
            self._trades[(pair, timeframe, c_type)] = trades_df
        return trades_df
    def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
                             since_ms: Optional[int] = None, cache: bool = True,
                             drop_incomplete: Optional[bool] = None
                             ) -> Dict[PairWithTimeframe, DataFrame]:
"""
Refresh in-memory OHLCV asynchronously and set `_trades` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
Only used in the dataprovider.refresh() method.
:param pair_list: List of 2 element tuples containing pair, interval to refresh
:param since_ms: time since when to download, in milliseconds
:param cache: Assign result to _trades. Usefull for one-off downloads like for pairlists
:param drop_incomplete: Control candle dropping.
Specifying None defaults to _ohlcv_partial_candle
:return: Dict of [{(pair, timeframe): Dataframe}]
"""
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
# Gather coroutines to run
input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
results_df = {}
        # Chunk requests into batches of 100 to avoid overwhelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100):
async def gather_stuff():
return await asyncio.gather(*input_coro, return_exceptions=True)
with self._loop_lock:
results = self.loop.run_until_complete(gather_stuff())
for res in results:
if isinstance(res, Exception):
logger.warning(f"Async code raised an exception: {repr(res)}")
continue
                # Deconstruct tuple (has 5 elements)
                pair, timeframe, c_type, ticks, drop_hint = res
                drop_incomplete_ = drop_hint if drop_incomplete is None else drop_incomplete
                ohlcv_df = self._process_ohlcv_df(
                    pair, timeframe, c_type, ticks, cache, drop_incomplete_)
                results_df[(pair, timeframe, c_type)] = ohlcv_df
        # Return cached klines
        for pair, timeframe, c_type in cached_pairs:
            results_df[(pair, timeframe, c_type)] = self.klines(
                (pair, timeframe, c_type),
                copy=False
            )
return results_df
    def needed_candle_ms(self, timeframe: str, candle_type: CandleType):
        one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
            timeframe, candle_type)
        move_to = one_call * self.required_candle_call_count
        now = timeframe_to_next_date(timeframe)
        return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
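Worked numbers for needed_candle_ms (a sketch): with a 5m timeframe, an ohlcv_candle_limit of 1000 and required_candle_call_count of 1:

one_call = 300_000 * 1000   # 5m = 300_000 ms per candle, 1000 candles per call
move_to = one_call * 1      # required_candle_call_count == 1
# since_ms then points roughly 3.5 days before the next candle boundary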
    def refresh_latest_trades(self,
                              pair_list: ListPairsWithTimeframes,
                              data_handler: Callable,  # IDataHandler
                              *,
                              cache: bool = True,
                              ) -> Dict[PairWithTimeframe, DataFrame]:
        use_public_trades = self._config.get(
            'exchange', {}).get('use_public_trades', False)
        if use_public_trades:
            return self._refresh_latest_trades(pair_list, data_handler, cache=cache)
        return {}

    def _refresh_latest_trades(self,
                               pair_list: ListPairsWithTimeframes,
                               data_handler: Callable,  # IDataHandler
                               *,
                               cache: bool = True,
                               ) -> Dict[PairWithTimeframe, DataFrame]:
""" """
Refresh in-memory OHLCV asynchronously and set `_klines` with the result Refresh in-memory TRADES asynchronously and set `_trades` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel). Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
Only used in the dataprovider.refresh() method. Only used in the dataprovider.refresh() method.
:param pair_list: List of 2 element tuples containing pair, interval to refresh :param pair_list: List of 3 element tuples containing (pair, timeframe, candle_type)
:param since_ms: time since when to download, in milliseconds :param since_ms: time since when to download, in milliseconds
:param cache: Assign result to _klines. Usefull for one-off downloads like for pairlists :param cache: Assign result to _trades. Usefull for one-off downloads like for pairlists
:param drop_incomplete: Control candle dropping. :param drop_incomplete: Control candle dropping.
Specifying None defaults to _ohlcv_partial_candle Specifying None defaults to _ohlcv_partial_candle
:return: Dict of [{(pair, timeframe): Dataframe}] :return: Dict of [{(pair, timeframe): Dataframe}]
""" """
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list)) logger.debug("Refreshing TRADES data for %d pairs", len(pair_list))
since_ms = None
# Gather coroutines to run
input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
results_df = {} results_df = {}
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling for pair, timeframe, candle_type in set(pair_list):
for input_coro in chunks(input_coroutines, 100): new_ticks = []
async def gather_stuff(): all_stored_ticks = []
return await asyncio.gather(*input_coro, return_exceptions=True) first_candle_ms = self.needed_candle_ms(timeframe, candle_type)
# refresh, if
# a. not in _trades
# b. no cache used
# c. need new data
is_in_cache = (pair, timeframe, candle_type) in self._trades
if ( not is_in_cache or not cache or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)):
logger.debug(f"Refreshing TRADES data for {pair}")
# fetch trades since latest _trades and
# store together with existing trades
try:
until = None
from_id = None
if is_in_cache:
trades = self._trades[(pair, timeframe, candle_type)]
from_id = trades.iloc[-1]['id']
with self._loop_lock: last_candle_refresh = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0)
results = self.loop.run_until_complete(gather_stuff()) until = last_candle_refresh * 1000 if last_candle_refresh else arrow.now('UTC').int_timestamp * 1000
for res in results: else:
if isinstance(res, Exception): next_closed_candle_time = timeframe_to_next_date(timeframe)
logger.warning(f"Async code raised an exception: {repr(res)}") until = int(next_closed_candle_time.timestamp()) * 1000
continue all_stored_ticks = data_handler.trades_load(f"{pair}-cached")
# Deconstruct tuple (has 5 elements) if all_stored_ticks:
pair, timeframe, c_type, ticks, drop_hint = res if all_stored_ticks[0][0] <= first_candle_ms:
drop_incomplete_ = drop_hint if drop_incomplete is None else drop_incomplete from_id = all_stored_ticks[-1][1]
ohlcv_df = self._process_ohlcv_df( # from_id overrides simce_ms
pair, timeframe, c_type, ticks, cache, drop_incomplete_) since_ms = all_stored_ticks[-1][0]
# doesn't go far enough
else:
all_stored_ticks = []
results_df[(pair, timeframe, c_type)] = ohlcv_df # from_id overrules with exchange set to id paginate
# TODO: DEBUG:
# since_ms = 1681284338000
# from_id = None
# TODO: /DEBUG
[ticks_pair, new_ticks]=self._download_trades_history(pair,
since=since_ms if since_ms else first_candle_ms,
until=until,
from_id=from_id)
# Return cached klines except Exception as e:
for pair, timeframe, c_type in cached_pairs: logger.error(f"Refreshing TRADES data for {pair} failed")
results_df[(pair, timeframe, c_type)] = self.klines( logger.error(e)
(pair, timeframe, c_type),
copy=False
) if new_ticks:
drop_incomplete = False # TODO: remove, no incomplete trades
all_stored_ticks.extend(new_ticks)
# NOTE: only process new trades
# self._trades = until_first_candle(stored_trades) + fetch_trades
trades_df = self._process_trades_df(pair, timeframe, candle_type, all_stored_ticks, cache, drop_incomplete, first_candle_ms)
results_df[(pair, timeframe, candle_type)] = trades_df
data_handler.trades_store(f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS].values.tolist())
return results_df return results_df
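End to end, the refresh path wired into DataProvider.refresh() then looks roughly like this (a sketch; the datadir path is hypothetical, and "{pair}-cached" mirrors the trades_store naming above):

from pathlib import Path
from freqtrade.data.history.idatahandler import get_datahandler
from freqtrade.enums import CandleType

data_handler = get_datahandler(Path('user_data/data/binance'), data_format='jsongz')
# requires 'use_public_trades': True in the 'exchange' config section:
exchange.refresh_latest_trades([('BTC/USDT', '5m', CandleType.SPOT)], data_handler)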
    def _now_is_time_to_refresh(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
        # Timeframe in seconds
        interval_in_sec = timeframe_to_seconds(timeframe)
@@ -2114,6 +2399,13 @@ class Exchange:
        now = int(timeframe_to_prev_date(timeframe).timestamp())
        return plr < now
def _now_is_time_to_refresh_trades(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
# Timeframe in seconds
interval_in_sec = timeframe_to_seconds(timeframe)
plr = self._trades_last_refresh_time.get((pair, timeframe, candle_type), 0) + interval_in_sec
REFRESH_EARLIER_SECONDS = 5
return plr < arrow.utcnow().int_timestamp - REFRESH_EARLIER_SECONDS
@retrier_async @retrier_async
    async def _async_get_candle_history(
            self,
@@ -2179,6 +2471,75 @@ class Exchange:
            raise OperationalException(f'Could not fetch historical candle (OHLCV) data '
                                       f'for pair {pair}. Message: {e}') from e
@retrier_async
async def _async_get_trades_history(
self,
pair: str,
timeframe: str,
candle_type: CandleType,
since_ms: Optional[int] = None,
) -> Ticker:
"""
Asynchronously get candle history data using fetch_trades
:param candle_type: '', mark, index, premiumIndex, or funding_rate
returns tuple: (pair, timeframe, trades_list)
"""
try:
# Fetch TRADES asynchronously
s = '(' + arrow.get(since_ms // 1000).isoformat() + ') ' if since_ms is not None else ''
logger.debug(
"Fetching pair %s, %s, interval %s, since %s %s...",
pair, candle_type, timeframe, since_ms, s
)
params = deepcopy(self._ft_has.get('trades_params', {}))
candle_limit = self.trades_candle_limit(
timeframe, candle_type=candle_type, since_ms=since_ms)
if candle_type and candle_type != CandleType.SPOT:
params.update({'price': candle_type.value})
if candle_type != CandleType.FUNDING_RATE:
                assert since_ms is not None  # NOTE: with None there seems to be no response
data = await self._api_async.fetch_trades(
pair, since=since_ms,
limit=candle_limit, params=params)
else:
# TODO: debug?
# Funding rate
data = await self._fetch_funding_rate_history(
pair=pair,
timeframe=timeframe,
limit=candle_limit,
since_ms=since_ms,
)
# Some exchanges sort TRADES in ASC order and others in DESC.
# Ex: Bittrex returns the list of TRADES in ASC order (oldest first, newest last)
# while GDAX returns the list of TRADES in DESC order (newest first, oldest last)
# Only sort if necessary to save computing time
            try:
                # TODO: check if even needed?
                if data and data[0]['timestamp'] > data[-1]['timestamp']:
                    data = sorted(data, key=lambda x: x['timestamp'])
            except KeyError:
                logger.exception("Error loading %s. Result was %s.", pair, data)
return pair, timeframe, candle_type, [], True
logger.debug("Done fetching pair %s, interval %s ...", pair, timeframe)
return pair, timeframe, candle_type, data, True
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical '
f'candle (TRADES) data. Message: {e}') from e
except ccxt.DDoSProtection as e:
raise DDosProtection(e) from e
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(f'Could not fetch historical candle (TRADES) data '
f'for pair {pair} due to {e.__class__.__name__}. '
f'Message: {e}') from e
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch historical candle (TRADES) data '
f'for pair {pair}. Message: {e}') from e
    async def _fetch_funding_rate_history(
            self,
            pair: str,
@@ -2211,18 +2572,21 @@ class Exchange:
        returns: List of dicts containing trades
        """
        try:
candle_limit = self.trades_candle_limit("1m", candle_type=CandleType.FUTURES, since_ms=since)
            # fetch trades asynchronously
            if params:
                logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
                trades = await self._api_async.fetch_trades(pair, params=params,
                                                            limit=candle_limit)
            else:
                logger.debug(
                    "Fetching trades for pair %s, since %s %s...",
                    pair, since,
                    '(' + dt_from_ts(since).isoformat() + ') ' if since is not None else ''
                )
                trades = await self._api_async.fetch_trades(pair, since=since, limit=candle_limit)
            trades = self._trades_contracts_to_amount(trades)
            if trades:
                logger.debug("Fetched trades for pair %s, datetime: %s (%d).",
                             pair, trades[0]['datetime'], trades[0]['timestamp'])
            return trades_dict_to_list(trades)
        except ccxt.NotSupported as e:
            raise OperationalException(
@@ -2239,7 +2603,8 @@ class Exchange:
    async def _async_get_trade_history_id(self, pair: str,
                                          until: Optional[int],
                                          since: Optional[int] = None,
                                          from_id: Optional[str] = None,
                                          stop_on_from_id: Optional[bool] = True) -> Tuple[str, List[List]]:
        """
        Asynchronously gets trade history using fetch_trades.
        Use this when the exchange uses id-based iteration (check `self._trades_pagination`)
@@ -2252,17 +2617,20 @@ class Exchange:
        trades: List[List] = []
        if not until and not stop_on_from_id:
            raise OperationalException("stop_on_from_id must be set if until is not set")
        if not from_id:
            # Fetch the first elements using the time-based method to get an ID to paginate on.
            # Depending on the exchange, this can introduce a drift at the start of the
            # interval of up to an hour.
            # e.g. Binance returns the "last 1000" trades within a 1h time interval
            # - so we will miss the first trades.
            trade = await self._async_fetch_trades(pair, since=since)
            # DEFAULT_TRADES_COLUMNS: 0 -> timestamp
            # DEFAULT_TRADES_COLUMNS: 1 -> id
            from_id = trade[-1][1]
            trades.extend(trade[:-1])
        while True:
            try:
                t = await self._async_fetch_trades(pair,
@@ -2322,7 +2690,9 @@ class Exchange:
    async def _async_get_trade_history(self, pair: str,
                                       since: Optional[int] = None,
                                       until: Optional[int] = None,
                                       from_id: Optional[str] = None,
                                       stop_on_from_id: Optional[bool] = True,
                                       ) -> Tuple[str, List[List]]:
        """
        Async wrapper handling downloading trades using either time- or id-based methods.
        """
@@ -2330,16 +2700,17 @@ class Exchange:
logger.debug(f"_async_get_trade_history(), pair: {pair}, " logger.debug(f"_async_get_trade_history(), pair: {pair}, "
f"since: {since}, until: {until}, from_id: {from_id}") f"since: {since}, until: {until}, from_id: {from_id}")
if until is None: if self._trades_pagination == 'time':
until = ccxt.Exchange.milliseconds() if until is None:
until = ccxt.Exchange.milliseconds()
logger.debug(f"Exchange milliseconds: {until}") logger.debug(f"Exchange milliseconds: {until}")
if self._trades_pagination == 'time':
return await self._async_get_trade_history_time( return await self._async_get_trade_history_time(
pair=pair, since=since, until=until) pair=pair, since=since, until=until)
elif self._trades_pagination == 'id': elif self._trades_pagination == 'id':
return await self._async_get_trade_history_id( return await self._async_get_trade_history_id(
pair=pair, since=since, until=until, from_id=from_id pair=pair, since=since, until=until, from_id=from_id,
stop_on_from_id=stop_on_from_id
) )
else: else:
raise OperationalException(f"Exchange {self.name} does use neither time, " raise OperationalException(f"Exchange {self.name} does use neither time, "
@@ -2348,7 +2719,9 @@ class Exchange:
    def get_historic_trades(self, pair: str,
                            since: Optional[int] = None,
                            until: Optional[int] = None,
                            from_id: Optional[str] = None,
                            stop_on_from_id: Optional[bool] = True
                            ) -> Tuple[str, List]:
        """
        Get trade history data using asyncio.
        Handles all async work and returns the list of trades.
@@ -2374,6 +2747,33 @@ class Exchange:
                    pass
            return self.loop.run_until_complete(task)
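
    # Condensed sketch (illustration only, not part of this commit) of the
    # id-based pagination behind _async_get_trade_history_id: fetch a batch,
    # remember the id of the newest trade, then request the next batch starting
    # from that id until `until` is passed or the id stops advancing. The
    # 'fromId' parameter name is an assumption (Binance-style); the real code
    # uses the exchange-specific pagination argument.
    async def _paginate_trades_by_id_sketch(self, pair: str, from_id: str,
                                            until: int) -> List[List]:
        trades: List[List] = []
        while True:
            batch = await self._async_fetch_trades(pair, params={'fromId': from_id})
            if not batch:
                break
            trades.extend(batch)
            # DEFAULT_TRADES_COLUMNS: 0 -> timestamp, 1 -> id
            if batch[-1][1] == from_id or batch[-1][0] >= until:
                break
            from_id = batch[-1][1]
        return trades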

    def _download_trades_history(self,
                                 pair: str,
                                 *,
                                 new_pairs_days: int = 30,
                                 since: Optional[int] = None,
                                 until: Optional[int] = None,
                                 from_id: Optional[str] = None,
                                 stop_on_from_id: Optional[bool] = False
                                 ) -> Tuple[str, List]:
        """
        Download trade history from the exchange using `get_historic_trades`.
        :param since: timestamp in msecs to download trades from
        :param until: timestamp in msecs to download trades up to
        :return: Tuple of (pair, trades list)
        """
        new_trades = self.get_historic_trades(pair=pair,
                                              since=since,
                                              until=until,
                                              from_id=from_id,
                                              stop_on_from_id=stop_on_from_id)
        return new_trades
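
    # Hypothetical usage sketch (not part of this commit): download roughly the
    # last `days` of public trades for a pair via the method above. Assumes
    # `datetime`, `timedelta` and `timezone` are imported at module level, as
    # they are elsewhere in this file.
    def _download_recent_trades_sketch(self, pair: str, days: int = 1) -> Tuple[str, List]:
        until = int(datetime.now(timezone.utc).timestamp() * 1000)
        since = until - int(timedelta(days=days).total_seconds() * 1000)
        return self._download_trades_history(pair=pair, since=since, until=until)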

    @retrier
    def _get_funding_fees_from_exchange(self, pair: str, since: Union[datetime, int]) -> float:
        """
View File
@@ -14,7 +14,8 @@ from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, MarketDirection, RunMode,
                             SignalDirection, SignalTagType, SignalType, TradingMode)
from freqtrade.exceptions import OperationalException, StrategyError
from freqtrade.exchange import (timeframe_to_minutes, timeframe_to_msecs,
                                timeframe_to_next_date, timeframe_to_seconds)
from freqtrade.data import converter
from freqtrade.misc import remove_entry_exit_signals
from freqtrade.persistence import Order, PairLocks, Trade
from freqtrade.strategy.hyper import HyperStrategyMixin
@@ -840,6 +841,7 @@ class IStrategy(ABC, HyperStrategyMixin):
        dataframe = self.advise_indicators(dataframe, metadata)
        dataframe = self.advise_entry(dataframe, metadata)
        dataframe = self.advise_exit(dataframe, metadata)
        logger.debug("TA Analysis Ended")
        return dataframe

    def _analyze_ticker_internal(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
@@ -1364,6 +1366,23 @@ class IStrategy(ABC, HyperStrategyMixin):
            dataframe = _create_and_merge_informative_pair(
                self, dataframe, metadata, inf_data, populate_fn)
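
        # Config sketch (illustration only, abridged): the public-trades path
        # below is gated on a flag in the exchange section of the config, e.g.
        #   "exchange": {"name": "binance", "use_public_trades": true}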
# TODO: extract this into a separate method e.g. if_enabled_populate_trades()
use_public_trades = self.config.get(
'exchange', {}).get('use_public_trades', False)
if use_public_trades:
trades = self.dp.trades(pair=metadata['pair'], copy=False)
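            # NOTE: `config` below is a reference to self.config, not a copy, so
            # setting 'timeframe' mutates the shared config for the converter call.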
config = self.config
config['timeframe'] = self.timeframe
# TODO: slice trades to size of dataframe for faster backtesting
dataframe = converter.populate_dataframe_with_trades(
config,
dataframe,
trades,
pair=metadata['pair'])
logger.debug("Populated dataframe with trades.")
        return self.populate_indicators(dataframe, metadata)

    def advise_entry(self, dataframe: DataFrame, metadata: dict) -> DataFrame: