Merge remote-tracking branch 'upstream/develop' into feature/fetch-public-trades

This commit is contained in:
Joe Schr
2024-05-15 14:33:41 +02:00
427 changed files with 48908 additions and 36710 deletions

View File

@@ -4,6 +4,7 @@ Responsible to provide data to the bot
including ticker and orderbook data, live and historical candle (OHLCV) data
Common Interface for bot and strategy to access data.
"""
import logging
from collections import deque
from datetime import datetime, timezone
@@ -12,8 +13,12 @@ from typing import Any, Dict, List, Optional, Tuple
from pandas import DataFrame, Timedelta, Timestamp, to_timedelta
from freqtrade.configuration import TimeRange
from freqtrade.constants import (FULL_DATAFRAME_THRESHOLD, Config, ListPairsWithTimeframes,
PairWithTimeframe)
from freqtrade.constants import (
FULL_DATAFRAME_THRESHOLD,
Config,
ListPairsWithTimeframes,
PairWithTimeframe,
)
from freqtrade.data.history import get_datahandler, load_pair_history
from freqtrade.enums import CandleType, RPCMessageType, RunMode, TradingMode
from freqtrade.exceptions import ExchangeError, OperationalException
@@ -27,18 +32,17 @@ from freqtrade.util import PeriodicCache
logger = logging.getLogger(__name__)
NO_EXCHANGE_EXCEPTION = 'Exchange is not available to DataProvider.'
NO_EXCHANGE_EXCEPTION = "Exchange is not available to DataProvider."
MAX_DATAFRAME_CANDLES = 1000
class DataProvider:
def __init__(
self,
config: Config,
exchange: Optional[Exchange],
pairlists=None,
rpc: Optional[RPCManager] = None
rpc: Optional[RPCManager] = None,
) -> None:
self._config = config
self._exchange = exchange
@@ -49,18 +53,20 @@ class DataProvider:
self.__slice_date: Optional[datetime] = None
self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
self.__producer_pairs_df: Dict[str,
Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]] = {}
self.__producer_pairs_df: Dict[
str, Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]
] = {}
self.__producer_pairs: Dict[str, List[str]] = {}
self._msg_queue: deque = deque()
self._default_candle_type = self._config.get('candle_type_def', CandleType.SPOT)
self._default_timeframe = self._config.get('timeframe', '1h')
self._default_candle_type = self._config.get("candle_type_def", CandleType.SPOT)
self._default_timeframe = self._config.get("timeframe", "1h")
self.__msg_cache = PeriodicCache(
maxsize=1000, ttl=timeframe_to_seconds(self._default_timeframe))
maxsize=1000, ttl=timeframe_to_seconds(self._default_timeframe)
)
self.producers = self._config.get('external_message_consumer', {}).get('producers', [])
self.producers = self._config.get("external_message_consumer", {}).get("producers", [])
self.external_data_enabled = len(self.producers) > 0
def _set_dataframe_max_index(self, limit_index: int):
@@ -80,11 +86,7 @@ class DataProvider:
self.__slice_date = limit_date
def _set_cached_df(
self,
pair: str,
timeframe: str,
dataframe: DataFrame,
candle_type: CandleType
self, pair: str, timeframe: str, dataframe: DataFrame, candle_type: CandleType
) -> None:
"""
Store cached Dataframe.
@@ -96,8 +98,7 @@ class DataProvider:
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
pair_key = (pair, timeframe, candle_type)
self.__cached_pairs[pair_key] = (
dataframe, datetime.now(timezone.utc))
self.__cached_pairs[pair_key] = (dataframe, datetime.now(timezone.utc))
# For multiple producers we will want to merge the pairlists instead of overwriting
def _set_producer_pairs(self, pairlist: List[str], producer_name: str = "default"):
@@ -116,12 +117,7 @@ class DataProvider:
"""
return self.__producer_pairs.get(producer_name, []).copy()
def _emit_df(
self,
pair_key: PairWithTimeframe,
dataframe: DataFrame,
new_candle: bool
) -> None:
def _emit_df(self, pair_key: PairWithTimeframe, dataframe: DataFrame, new_candle: bool) -> None:
"""
Send this dataframe as an ANALYZED_DF message to RPC
@@ -131,19 +127,21 @@ class DataProvider:
"""
if self.__rpc:
msg: RPCAnalyzedDFMsg = {
'type': RPCMessageType.ANALYZED_DF,
'data': {
'key': pair_key,
'df': dataframe.tail(1),
'la': datetime.now(timezone.utc)
}
}
"type": RPCMessageType.ANALYZED_DF,
"data": {
"key": pair_key,
"df": dataframe.tail(1),
"la": datetime.now(timezone.utc),
},
}
self.__rpc.send_msg(msg)
if new_candle:
self.__rpc.send_msg({
'type': RPCMessageType.NEW_CANDLE,
'data': pair_key,
})
self.__rpc.send_msg(
{
"type": RPCMessageType.NEW_CANDLE,
"data": pair_key,
}
)
def _replace_external_df(
self,
@@ -152,7 +150,7 @@ class DataProvider:
last_analyzed: datetime,
timeframe: str,
candle_type: CandleType,
producer_name: str = "default"
producer_name: str = "default",
) -> None:
"""
Add the pair data to this class from an external source.
@@ -178,7 +176,7 @@ class DataProvider:
last_analyzed: datetime,
timeframe: str,
candle_type: CandleType,
producer_name: str = "default"
producer_name: str = "default",
) -> Tuple[bool, int]:
"""
Append a candle to the existing external dataframe. The incoming dataframe
@@ -204,12 +202,14 @@ class DataProvider:
last_analyzed=last_analyzed,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
producer_name=producer_name,
)
return (True, 0)
if (producer_name not in self.__producer_pairs_df
or pair_key not in self.__producer_pairs_df[producer_name]):
if (
producer_name not in self.__producer_pairs_df
or pair_key not in self.__producer_pairs_df[producer_name]
):
# We don't have data from this producer yet,
# or we don't have data for this pair_key
# return False and 1000 for the full df
@@ -220,12 +220,12 @@ class DataProvider:
# CHECK FOR MISSING CANDLES
# Convert the timeframe to a timedelta for pandas
timeframe_delta: Timedelta = to_timedelta(timeframe)
local_last: Timestamp = existing_df.iloc[-1]['date'] # We want the last date from our copy
local_last: Timestamp = existing_df.iloc[-1]["date"] # We want the last date from our copy
# We want the first date from the incoming
incoming_first: Timestamp = dataframe.iloc[0]['date']
incoming_first: Timestamp = dataframe.iloc[0]["date"]
# Remove existing candles that are newer than the incoming first candle
existing_df1 = existing_df[existing_df['date'] < incoming_first]
existing_df1 = existing_df[existing_df["date"] < incoming_first]
candle_difference = (incoming_first - local_last) / timeframe_delta
@@ -243,13 +243,13 @@ class DataProvider:
# Everything is good, we appended
self._replace_external_df(
pair,
appended_df,
last_analyzed=last_analyzed,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
pair,
appended_df,
last_analyzed=last_analyzed,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name,
)
return (True, 0)
def get_producer_df(
@@ -257,7 +257,7 @@ class DataProvider:
pair: str,
timeframe: Optional[str] = None,
candle_type: Optional[CandleType] = None,
producer_name: str = "default"
producer_name: str = "default",
) -> Tuple[DataFrame, datetime]:
"""
Get the pair data from producers.
@@ -292,64 +292,64 @@ class DataProvider:
"""
self._pairlists = pairlists
def historic_ohlcv(
self,
pair: str,
timeframe: str,
candle_type: str = ''
) -> DataFrame:
def historic_ohlcv(self, pair: str, timeframe: str, candle_type: str = "") -> DataFrame:
"""
Get stored historical candle (OHLCV) data
:param pair: pair to get the data for
:param timeframe: timeframe to get data for
:param candle_type: '', mark, index, premiumIndex, or funding_rate
"""
_candle_type = CandleType.from_string(
candle_type) if candle_type != '' else self._config['candle_type_def']
_candle_type = (
CandleType.from_string(candle_type)
if candle_type != ""
else self._config["candle_type_def"]
)
saved_pair: PairWithTimeframe = (pair, str(timeframe), _candle_type)
if saved_pair not in self.__cached_pairs_backtesting:
timerange = TimeRange.parse_timerange(None if self._config.get(
'timerange') is None else str(self._config.get('timerange')))
timerange = TimeRange.parse_timerange(
None
if self._config.get("timerange") is None
else str(self._config.get("timerange"))
)
startup_candles = self.get_required_startup(str(timeframe))
tf_seconds = timeframe_to_seconds(str(timeframe))
timerange.subtract_start(tf_seconds * startup_candles)
logger.info(f"Loading data for {pair} {timeframe} "
f"from {timerange.start_fmt} to {timerange.stop_fmt}")
logger.info(
f"Loading data for {pair} {timeframe} "
f"from {timerange.start_fmt} to {timerange.stop_fmt}"
)
self.__cached_pairs_backtesting[saved_pair] = load_pair_history(
pair=pair,
timeframe=timeframe,
datadir=self._config['datadir'],
datadir=self._config["datadir"],
timerange=timerange,
data_format=self._config['dataformat_ohlcv'],
data_format=self._config["dataformat_ohlcv"],
candle_type=_candle_type,
)
return self.__cached_pairs_backtesting[saved_pair].copy()
def get_required_startup(self, timeframe: str) -> int:
freqai_config = self._config.get('freqai', {})
if not freqai_config.get('enabled', False):
return self._config.get('startup_candle_count', 0)
freqai_config = self._config.get("freqai", {})
if not freqai_config.get("enabled", False):
return self._config.get("startup_candle_count", 0)
else:
startup_candles = self._config.get('startup_candle_count', 0)
indicator_periods = freqai_config['feature_parameters']['indicator_periods_candles']
startup_candles = self._config.get("startup_candle_count", 0)
indicator_periods = freqai_config["feature_parameters"]["indicator_periods_candles"]
# make sure the startupcandles is at least the set maximum indicator periods
self._config['startup_candle_count'] = max(startup_candles, max(indicator_periods))
self._config["startup_candle_count"] = max(startup_candles, max(indicator_periods))
tf_seconds = timeframe_to_seconds(timeframe)
train_candles = freqai_config['train_period_days'] * 86400 / tf_seconds
total_candles = int(self._config['startup_candle_count'] + train_candles)
train_candles = freqai_config["train_period_days"] * 86400 / tf_seconds
total_candles = int(self._config["startup_candle_count"] + train_candles)
logger.info(
f'Increasing startup_candle_count for freqai on {timeframe} to {total_candles}')
f"Increasing startup_candle_count for freqai on {timeframe} to {total_candles}"
)
return total_candles
def get_pair_dataframe(
self,
pair: str,
timeframe: Optional[str] = None,
candle_type: str = ''
self, pair: str, timeframe: Optional[str] = None, candle_type: str = ""
) -> DataFrame:
"""
Return pair candle (OHLCV) data, either live or cached historical -- depending
@@ -366,13 +366,13 @@ class DataProvider:
data = self.ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
else:
# Get historical OHLCV data (cached on disk).
timeframe = timeframe or self._config['timeframe']
timeframe = timeframe or self._config["timeframe"]
data = self.historic_ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
# Cut date to timeframe-specific date.
# This is necessary to prevent lookahead bias in callbacks through informative pairs.
if self.__slice_date:
cutoff_date = timeframe_to_prev_date(timeframe, self.__slice_date)
data = data.loc[data['date'] < cutoff_date]
data = data.loc[data["date"] < cutoff_date]
if len(data) == 0:
logger.warning(f"No data found for ({pair}, {timeframe}, {candle_type}).")
return data
@@ -387,7 +387,7 @@ class DataProvider:
combination.
Returns empty dataframe and Epoch 0 (1970-01-01) if no dataframe was cached.
"""
pair_key = (pair, timeframe, self._config.get('candle_type_def', CandleType.SPOT))
pair_key = (pair, timeframe, self._config.get("candle_type_def", CandleType.SPOT))
if pair_key in self.__cached_pairs:
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
df, date = self.__cached_pairs[pair_key]
@@ -395,7 +395,7 @@ class DataProvider:
df, date = self.__cached_pairs[pair_key]
if self.__slice_index is not None:
max_index = self.__slice_index
df = df.iloc[max(0, max_index - MAX_DATAFRAME_CANDLES):max_index]
df = df.iloc[max(0, max_index - MAX_DATAFRAME_CANDLES) : max_index]
return df, date
else:
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
@@ -406,7 +406,7 @@ class DataProvider:
Get runmode of the bot
can be "live", "dry-run", "backtest", "edgecli", "hyperopt" or "other".
"""
return RunMode(self._config.get('runmode', RunMode.OTHER))
return RunMode(self._config.get("runmode", RunMode.OTHER))
def current_whitelist(self) -> List[str]:
"""
@@ -434,9 +434,11 @@ class DataProvider:
# Exchange functions
def refresh(self,
pairlist: ListPairsWithTimeframes,
helping_pairs: Optional[ListPairsWithTimeframes] = None) -> None:
def refresh(
self,
pairlist: ListPairsWithTimeframes,
helping_pairs: Optional[ListPairsWithTimeframes] = None,
) -> None:
"""
Refresh data, called with each cycle
"""
@@ -469,11 +471,7 @@ class DataProvider:
return list(self._exchange._klines.keys())
def ohlcv(
self,
pair: str,
timeframe: Optional[str] = None,
copy: bool = True,
candle_type: str = ''
self, pair: str, timeframe: Optional[str] = None, copy: bool = True, candle_type: str = ""
) -> DataFrame:
"""
Get candle (OHLCV) data for the given pair as DataFrame
@@ -487,11 +485,13 @@ class DataProvider:
if self._exchange is None:
raise OperationalException(NO_EXCHANGE_EXCEPTION)
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
_candle_type = CandleType.from_string(
candle_type) if candle_type != '' else self._config['candle_type_def']
_candle_type = (
CandleType.from_string(candle_type)
if candle_type != ""
else self._config["candle_type_def"]
)
return self._exchange.klines(
(pair, timeframe or self._config['timeframe'], _candle_type),
copy=copy
(pair, timeframe or self._config["timeframe"], _candle_type), copy=copy
)
else:
return DataFrame()
@@ -572,7 +572,7 @@ class DataProvider:
Send custom RPC Notifications from your bot.
Will not send any bot in modes other than Dry-run or Live.
:param message: Message to be sent. Must be below 4096.
:param always_send: If False, will send the message only once per candle, and surpress
:param always_send: If False, will send the message only once per candle, and suppress
identical messages.
Careful as this can end up spaming your chat.
Defaults to False