ruff format: pairlist plugins

This commit is contained in:
Matthias
2024-05-12 16:37:11 +02:00
parent c9d301e4f9
commit 700b7acb6f
18 changed files with 652 additions and 446 deletions

View File

@@ -1,6 +1,7 @@
"""
Minimum age (days listed) pair list filter
"""
import logging
from copy import deepcopy
from datetime import timedelta
@@ -20,32 +21,40 @@ logger = logging.getLogger(__name__)
class AgeFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
# Checked symbols cache (dictionary of ticker symbol => timestamp)
self._symbolsChecked: Dict[str, int] = {}
self._symbolsCheckFailed = PeriodicCache(maxsize=1000, ttl=86_400)
self._min_days_listed = pairlistconfig.get('min_days_listed', 10)
self._max_days_listed = pairlistconfig.get('max_days_listed')
self._min_days_listed = pairlistconfig.get("min_days_listed", 10)
self._max_days_listed = pairlistconfig.get("max_days_listed")
candle_limit = exchange.ohlcv_candle_limit('1d', self._config['candle_type_def'])
candle_limit = exchange.ohlcv_candle_limit("1d", self._config["candle_type_def"])
if self._min_days_listed < 1:
raise OperationalException("AgeFilter requires min_days_listed to be >= 1")
if self._min_days_listed > candle_limit:
raise OperationalException("AgeFilter requires min_days_listed to not exceed "
"exchange max request size "
f"({candle_limit})")
raise OperationalException(
"AgeFilter requires min_days_listed to not exceed "
"exchange max request size "
f"({candle_limit})"
)
if self._max_days_listed and self._max_days_listed <= self._min_days_listed:
raise OperationalException("AgeFilter max_days_listed <= min_days_listed not permitted")
if self._max_days_listed and self._max_days_listed > candle_limit:
raise OperationalException("AgeFilter requires max_days_listed to not exceed "
"exchange max request size "
f"({candle_limit})")
raise OperationalException(
"AgeFilter requires max_days_listed to not exceed "
"exchange max request size "
f"({candle_limit})"
)
@property
def needstickers(self) -> bool:
@@ -63,10 +72,11 @@ class AgeFilter(IPairList):
return (
f"{self.name} - Filtering pairs with age less than "
f"{self._min_days_listed} {plural(self._min_days_listed, 'day')}"
) + ((
" or more than "
f"{self._max_days_listed} {plural(self._max_days_listed, 'day')}"
) if self._max_days_listed else '')
) + (
(" or more than " f"{self._max_days_listed} {plural(self._max_days_listed, 'day')}")
if self._max_days_listed
else ""
)
@staticmethod
def description() -> str:
@@ -96,21 +106,26 @@ class AgeFilter(IPairList):
:return: new allowlist
"""
needed_pairs: ListPairsWithTimeframes = [
(p, '1d', self._config['candle_type_def']) for p in pairlist
if p not in self._symbolsChecked and p not in self._symbolsCheckFailed]
(p, "1d", self._config["candle_type_def"])
for p in pairlist
if p not in self._symbolsChecked and p not in self._symbolsCheckFailed
]
if not needed_pairs:
# Remove pairs that have been removed before
return [p for p in pairlist if p not in self._symbolsCheckFailed]
since_days = -(
self._max_days_listed if self._max_days_listed else self._min_days_listed
) - 1
since_days = (
-(self._max_days_listed if self._max_days_listed else self._min_days_listed) - 1
)
since_ms = dt_ts(dt_floor_day(dt_now()) + timedelta(days=since_days))
candles = self._exchange.refresh_latest_ohlcv(needed_pairs, since_ms=since_ms, cache=False)
if self._enabled:
for p in deepcopy(pairlist):
daily_candles = candles[(p, '1d', self._config['candle_type_def'])] if (
p, '1d', self._config['candle_type_def']) in candles else None
daily_candles = (
candles[(p, "1d", self._config["candle_type_def"])]
if (p, "1d", self._config["candle_type_def"]) in candles
else None
)
if not self._validate_pair_loc(p, daily_candles):
pairlist.remove(p)
self.log_once(f"Validated {len(pairlist)} pairs.", logger.info)
@@ -128,23 +143,30 @@ class AgeFilter(IPairList):
return True
if daily_candles is not None:
if (
len(daily_candles) >= self._min_days_listed
and (not self._max_days_listed or len(daily_candles) <= self._max_days_listed)
if len(daily_candles) >= self._min_days_listed and (
not self._max_days_listed or len(daily_candles) <= self._max_days_listed
):
# We have fetched at least the minimum required number of daily candles
# Add to cache, store the time we last checked this symbol
self._symbolsChecked[pair] = dt_ts()
return True
else:
self.log_once((
f"Removed {pair} from whitelist, because age "
f"{len(daily_candles)} is less than {self._min_days_listed} "
f"{plural(self._min_days_listed, 'day')}"
) + ((
" or more than "
f"{self._max_days_listed} {plural(self._max_days_listed, 'day')}"
) if self._max_days_listed else ''), logger.info)
self.log_once(
(
f"Removed {pair} from whitelist, because age "
f"{len(daily_candles)} is less than {self._min_days_listed} "
f"{plural(self._min_days_listed, 'day')}"
)
+ (
(
" or more than "
f"{self._max_days_listed} {plural(self._max_days_listed, 'day')}"
)
if self._max_days_listed
else ""
),
logger.info,
)
self._symbolsCheckFailed[pair] = dt_ts()
return False
return False

View File

@@ -1,6 +1,7 @@
"""
Full trade slots pair list filter
"""
import logging
from typing import Any, Dict, List
@@ -14,10 +15,14 @@ logger = logging.getLogger(__name__)
class FullTradesFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
@property
@@ -49,7 +54,7 @@ class FullTradesFilter(IPairList):
"""
# Get the number of open trades and max open trades config
num_open = Trade.get_open_trade_count()
max_trades = self._config['max_open_trades']
max_trades = self._config["max_open_trades"]
if (num_open >= max_trades) and (max_trades > 0):
return []

View File

@@ -1,6 +1,7 @@
"""
PairList Handler base class
"""
import logging
from abc import ABC, abstractmethod, abstractproperty
from copy import deepcopy
@@ -46,17 +47,21 @@ PairlistParameter = Union[
__NumberPairlistParameter,
__StringPairlistParameter,
__OptionPairlistParameter,
__BoolPairlistParameter
]
__BoolPairlistParameter,
]
class IPairList(LoggingMixin, ABC):
is_pairlist_generator = False
def __init__(self, exchange: Exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange: Exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
"""
:param exchange: Exchange instance
:param pairlistmanager: Instantiated Pairlist manager
@@ -71,7 +76,7 @@ class IPairList(LoggingMixin, ABC):
self._config = config
self._pairlistconfig = pairlistconfig
self._pairlist_pos = pairlist_pos
self.refresh_period = self._pairlistconfig.get('refresh_period', 1800)
self.refresh_period = self._pairlistconfig.get("refresh_period", 1800)
LoggingMixin.__init__(self, logger, self.refresh_period)
@property
@@ -155,8 +160,10 @@ class IPairList(LoggingMixin, ABC):
:param tickers: Tickers (from exchange.get_tickers). May be cached.
:return: List of pairs
"""
raise OperationalException("This Pairlist Handler should not be used "
"at the first position in the list of Pairlist Handlers.")
raise OperationalException(
"This Pairlist Handler should not be used "
"at the first position in the list of Pairlist Handlers."
)
def filter_pairlist(self, pairlist: List[str], tickers: Tickers) -> List[str]:
"""
@@ -191,8 +198,9 @@ class IPairList(LoggingMixin, ABC):
"""
return self._pairlistmanager.verify_blacklist(pairlist, logmethod)
def verify_whitelist(self, pairlist: List[str], logmethod,
keep_invalid: bool = False) -> List[str]:
def verify_whitelist(
self, pairlist: List[str], logmethod, keep_invalid: bool = False
) -> List[str]:
"""
Proxy method to verify_whitelist for easy access for child classes.
:param pairlist: Pairlist to validate
@@ -212,26 +220,33 @@ class IPairList(LoggingMixin, ABC):
markets = self._exchange.markets
if not markets:
raise OperationalException(
'Markets not loaded. Make sure that exchange is initialized correctly.')
"Markets not loaded. Make sure that exchange is initialized correctly."
)
sanitized_whitelist: List[str] = []
for pair in pairlist:
# pair is not in the generated dynamic market or has the wrong stake currency
if pair not in markets:
self.log_once(f"Pair {pair} is not compatible with exchange "
f"{self._exchange.name}. Removing it from whitelist..",
logger.warning)
self.log_once(
f"Pair {pair} is not compatible with exchange "
f"{self._exchange.name}. Removing it from whitelist..",
logger.warning,
)
continue
if not self._exchange.market_is_tradable(markets[pair]):
self.log_once(f"Pair {pair} is not tradable with Freqtrade."
"Removing it from whitelist..", logger.warning)
self.log_once(
f"Pair {pair} is not tradable with Freqtrade." "Removing it from whitelist..",
logger.warning,
)
continue
if self._exchange.get_pair_quote_currency(pair) != self._config['stake_currency']:
self.log_once(f"Pair {pair} is not compatible with your stake currency "
f"{self._config['stake_currency']}. Removing it from whitelist..",
logger.warning)
if self._exchange.get_pair_quote_currency(pair) != self._config["stake_currency"]:
self.log_once(
f"Pair {pair} is not compatible with your stake currency "
f"{self._config['stake_currency']}. Removing it from whitelist..",
logger.warning,
)
continue
# Check if market is active

View File

@@ -3,6 +3,7 @@ Market Cap PairList provider
Provides dynamic pair list based on Market Cap
"""
import logging
from typing import Any, Dict, List
@@ -19,31 +20,34 @@ logger = logging.getLogger(__name__)
class MarketCapPairList(IPairList):
is_pairlist_generator = True
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
if 'number_assets' not in self._pairlistconfig:
if "number_assets" not in self._pairlistconfig:
raise OperationalException(
'`number_assets` not specified. Please check your configuration '
'for "pairlist.config.number_assets"')
"`number_assets` not specified. Please check your configuration "
'for "pairlist.config.number_assets"'
)
self._stake_currency = config['stake_currency']
self._number_assets = self._pairlistconfig['number_assets']
self._max_rank = self._pairlistconfig.get('max_rank', 30)
self._refresh_period = self._pairlistconfig.get('refresh_period', 86400)
self._stake_currency = config["stake_currency"]
self._number_assets = self._pairlistconfig["number_assets"]
self._max_rank = self._pairlistconfig.get("max_rank", 30)
self._refresh_period = self._pairlistconfig.get("refresh_period", 86400)
self._marketcap_cache: TTLCache = TTLCache(maxsize=1, ttl=self._refresh_period)
self._def_candletype = self._config['candle_type_def']
self._def_candletype = self._config["candle_type_def"]
self._coingecko: CoinGeckoAPI = CoinGeckoAPI()
if self._max_rank > 250:
raise OperationalException(
"This filter only support marketcap rank up to 250."
)
raise OperationalException("This filter only support marketcap rank up to 250.")
@property
def needstickers(self) -> bool:
@@ -87,7 +91,7 @@ class MarketCapPairList(IPairList):
"default": 86400,
"description": "Refresh period",
"help": "Refresh period in seconds",
}
},
}
def gen_pairlist(self, tickers: Tickers) -> List[str]:
@@ -98,21 +102,24 @@ class MarketCapPairList(IPairList):
"""
# Generate dynamic whitelist
# Must always run if this pairlist is the first in the list.
pairlist = self._marketcap_cache.get('pairlist_mc')
pairlist = self._marketcap_cache.get("pairlist_mc")
if pairlist:
# Item found - no refresh necessary
return pairlist.copy()
else:
# Use fresh pairlist
# Check if pair quote currency equals to the stake currency.
_pairlist = [k for k in self._exchange.get_markets(
quote_currencies=[self._stake_currency],
tradable_only=True, active_only=True).keys()]
_pairlist = [
k
for k in self._exchange.get_markets(
quote_currencies=[self._stake_currency], tradable_only=True, active_only=True
).keys()
]
# No point in testing for blacklisted pairs...
_pairlist = self.verify_blacklist(_pairlist, logger.info)
pairlist = self.filter_pairlist(_pairlist, tickers)
self._marketcap_cache['pairlist_mc'] = pairlist.copy()
self._marketcap_cache["pairlist_mc"] = pairlist.copy()
return pairlist
@@ -124,25 +131,30 @@ class MarketCapPairList(IPairList):
:param tickers: Tickers (from exchange.get_tickers). May be cached.
:return: new whitelist
"""
marketcap_list = self._marketcap_cache.get('marketcap')
marketcap_list = self._marketcap_cache.get("marketcap")
if marketcap_list is None:
data = self._coingecko.get_coins_markets(vs_currency='usd', order='market_cap_desc',
per_page='250', page='1', sparkline='false',
locale='en')
data = self._coingecko.get_coins_markets(
vs_currency="usd",
order="market_cap_desc",
per_page="250",
page="1",
sparkline="false",
locale="en",
)
if data:
marketcap_list = [row['symbol'] for row in data]
self._marketcap_cache['marketcap'] = marketcap_list
marketcap_list = [row["symbol"] for row in data]
self._marketcap_cache["marketcap"] = marketcap_list
if marketcap_list:
filtered_pairlist = []
market = self._config['trading_mode']
market = self._config["trading_mode"]
pair_format = f"{self._stake_currency.upper()}"
if (market == 'futures'):
if market == "futures":
pair_format += f":{self._stake_currency.upper()}"
top_marketcap = marketcap_list[:self._max_rank:]
top_marketcap = marketcap_list[: self._max_rank :]
for mc_pair in top_marketcap:
test_pair = f"{mc_pair.upper()}/{pair_format}"

View File

@@ -1,6 +1,7 @@
"""
Offset pair list filter
"""
import logging
from typing import Any, Dict, List
@@ -14,14 +15,18 @@ logger = logging.getLogger(__name__)
class OffsetFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._offset = pairlistconfig.get('offset', 0)
self._number_pairs = pairlistconfig.get('number_assets', 0)
self._offset = pairlistconfig.get("offset", 0)
self._number_pairs = pairlistconfig.get("number_assets", 0)
if self._offset < 0:
raise OperationalException("OffsetFilter requires offset to be >= 0")
@@ -73,11 +78,13 @@ class OffsetFilter(IPairList):
:return: new whitelist
"""
if self._offset > len(pairlist):
self.log_once(f"Offset of {self._offset} is larger than " +
f"pair count of {len(pairlist)}", logger.warning)
pairs = pairlist[self._offset:]
self.log_once(
f"Offset of {self._offset} is larger than " + f"pair count of {len(pairlist)}",
logger.warning,
)
pairs = pairlist[self._offset :]
if self._number_pairs:
pairs = pairs[:self._number_pairs]
pairs = pairs[: self._number_pairs]
self.log_once(f"Searching {len(pairs)} pairs: {pairs}", logger.info)

View File

@@ -1,6 +1,7 @@
"""
Performance pair list filter
"""
import logging
from typing import Any, Dict, List
@@ -16,14 +17,18 @@ logger = logging.getLogger(__name__)
class PerformanceFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._minutes = pairlistconfig.get('minutes', 0)
self._min_profit = pairlistconfig.get('min_profit')
self._minutes = pairlistconfig.get("minutes", 0)
self._min_profit = pairlistconfig.get("min_profit")
@property
def needstickers(self) -> bool:
@@ -82,25 +87,29 @@ class PerformanceFilter(IPairList):
return pairlist
# Get pairlist from performance dataframe values
list_df = pd.DataFrame({'pair': pairlist})
list_df['prior_idx'] = list_df.index
list_df = pd.DataFrame({"pair": pairlist})
list_df["prior_idx"] = list_df.index
# Set initial value for pairs with no trades to 0
# Sort the list using:
# - primarily performance (high to low)
# - then count (low to high, so as to favor same performance with fewer trades)
# - then by prior index, keeping original sorting order
sorted_df = list_df.merge(performance, on='pair', how='left')\
.fillna(0).sort_values(by=['profit_ratio', 'count', 'prior_idx'],
ascending=[False, True, True])
sorted_df = (
list_df.merge(performance, on="pair", how="left")
.fillna(0)
.sort_values(by=["profit_ratio", "count", "prior_idx"], ascending=[False, True, True])
)
if self._min_profit is not None:
removed = sorted_df[sorted_df['profit_ratio'] < self._min_profit]
removed = sorted_df[sorted_df["profit_ratio"] < self._min_profit]
for _, row in removed.iterrows():
self.log_once(
f"Removing pair {row['pair']} since {row['profit_ratio']} is "
f"below {self._min_profit}", logger.info)
sorted_df = sorted_df[sorted_df['profit_ratio'] >= self._min_profit]
f"below {self._min_profit}",
logger.info,
)
sorted_df = sorted_df[sorted_df["profit_ratio"] >= self._min_profit]
pairlist = sorted_df['pair'].tolist()
pairlist = sorted_df["pair"].tolist()
return pairlist

View File

@@ -1,6 +1,7 @@
"""
Precision pair list filter
"""
import logging
from typing import Any, Dict, Optional
@@ -15,17 +16,22 @@ logger = logging.getLogger(__name__)
class PrecisionFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
if 'stoploss' not in self._config:
if "stoploss" not in self._config:
raise OperationalException(
'PrecisionFilter can only work with stoploss defined. Please add the '
'stoploss key to your configuration (overwrites eventual strategy settings).')
self._stoploss = self._config['stoploss']
"PrecisionFilter can only work with stoploss defined. Please add the "
"stoploss key to your configuration (overwrites eventual strategy settings)."
)
self._stoploss = self._config["stoploss"]
self._enabled = self._stoploss != 0
# Precalculate sanitized stoploss value to avoid recalculation for every pair
@@ -58,23 +64,29 @@ class PrecisionFilter(IPairList):
:param ticker: ticker dict as returned from ccxt.fetch_ticker
:return: True if the pair can stay, false if it should be removed
"""
if not ticker or ticker.get('last', None) is None:
self.log_once(f"Removed {pair} from whitelist, because "
"ticker['last'] is empty (Usually no trade in the last 24h).",
logger.info)
if not ticker or ticker.get("last", None) is None:
self.log_once(
f"Removed {pair} from whitelist, because "
"ticker['last'] is empty (Usually no trade in the last 24h).",
logger.info,
)
return False
stop_price = ticker['last'] * self._stoploss
stop_price = ticker["last"] * self._stoploss
# Adjust stop-prices to precision
sp = self._exchange.price_to_precision(pair, stop_price, rounding_mode=ROUND_UP)
stop_gap_price = self._exchange.price_to_precision(pair, stop_price * 0.99,
rounding_mode=ROUND_UP)
stop_gap_price = self._exchange.price_to_precision(
pair, stop_price * 0.99, rounding_mode=ROUND_UP
)
logger.debug(f"{pair} - {sp} : {stop_gap_price}")
if sp <= stop_gap_price:
self.log_once(f"Removed {pair} from whitelist, because "
f"stop price {sp} would be <= stop limit {stop_gap_price}", logger.info)
self.log_once(
f"Removed {pair} from whitelist, because "
f"stop price {sp} would be <= stop limit {stop_gap_price}",
logger.info,
)
return False
return True

View File

@@ -1,6 +1,7 @@
"""
Price pair list filter
"""
import logging
from typing import Any, Dict, Optional
@@ -14,28 +15,34 @@ logger = logging.getLogger(__name__)
class PriceFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._low_price_ratio = pairlistconfig.get('low_price_ratio', 0)
self._low_price_ratio = pairlistconfig.get("low_price_ratio", 0)
if self._low_price_ratio < 0:
raise OperationalException("PriceFilter requires low_price_ratio to be >= 0")
self._min_price = pairlistconfig.get('min_price', 0)
self._min_price = pairlistconfig.get("min_price", 0)
if self._min_price < 0:
raise OperationalException("PriceFilter requires min_price to be >= 0")
self._max_price = pairlistconfig.get('max_price', 0)
self._max_price = pairlistconfig.get("max_price", 0)
if self._max_price < 0:
raise OperationalException("PriceFilter requires max_price to be >= 0")
self._max_value = pairlistconfig.get('max_value', 0)
self._max_value = pairlistconfig.get("max_value", 0)
if self._max_value < 0:
raise OperationalException("PriceFilter requires max_value to be >= 0")
self._enabled = ((self._low_price_ratio > 0) or
(self._min_price > 0) or
(self._max_price > 0) or
(self._max_value > 0))
self._enabled = (
(self._low_price_ratio > 0)
or (self._min_price > 0)
or (self._max_price > 0)
or (self._max_value > 0)
)
@property
def needstickers(self) -> bool:
@@ -76,8 +83,9 @@ class PriceFilter(IPairList):
"type": "number",
"default": 0,
"description": "Low price ratio",
"help": ("Remove pairs where a price move of 1 price unit (pip) "
"is above this ratio."),
"help": (
"Remove pairs where a price move of 1 price unit (pip) " "is above this ratio."
),
},
"min_price": {
"type": "number",
@@ -106,12 +114,14 @@ class PriceFilter(IPairList):
:param ticker: ticker dict as returned from ccxt.fetch_ticker
:return: True if the pair can stay, false if it should be removed
"""
if ticker and 'last' in ticker and ticker['last'] is not None and ticker.get('last') != 0:
price: float = ticker['last']
if ticker and "last" in ticker and ticker["last"] is not None and ticker.get("last") != 0:
price: float = ticker["last"]
else:
self.log_once(f"Removed {pair} from whitelist, because "
"ticker['last'] is empty (Usually no trade in the last 24h).",
logger.info)
self.log_once(
f"Removed {pair} from whitelist, because "
"ticker['last'] is empty (Usually no trade in the last 24h).",
logger.info,
)
return False
# Perform low_price_ratio check.
@@ -119,17 +129,19 @@ class PriceFilter(IPairList):
compare = self._exchange.price_get_one_pip(pair, price)
changeperc = compare / price
if changeperc > self._low_price_ratio:
self.log_once(f"Removed {pair} from whitelist, "
f"because 1 unit is {changeperc:.3%}", logger.info)
self.log_once(
f"Removed {pair} from whitelist, " f"because 1 unit is {changeperc:.3%}",
logger.info,
)
return False
# Perform low_amount check
if self._max_value != 0:
market = self._exchange.markets[pair]
limits = market['limits']
if (limits['amount']['min'] is not None):
min_amount = limits['amount']['min']
min_precision = market['precision']['amount']
limits = market["limits"]
if limits["amount"]["min"] is not None:
min_amount = limits["amount"]["min"]
min_precision = market["precision"]["amount"]
min_value = min_amount * price
if self._exchange.precisionMode == 4:
@@ -142,23 +154,31 @@ class PriceFilter(IPairList):
diff = next_value - min_value
if diff > self._max_value:
self.log_once(f"Removed {pair} from whitelist, "
f"because min value change of {diff} > {self._max_value}.",
logger.info)
self.log_once(
f"Removed {pair} from whitelist, "
f"because min value change of {diff} > {self._max_value}.",
logger.info,
)
return False
# Perform min_price check.
if self._min_price != 0:
if price < self._min_price:
self.log_once(f"Removed {pair} from whitelist, "
f"because last price < {self._min_price:.8f}", logger.info)
self.log_once(
f"Removed {pair} from whitelist, "
f"because last price < {self._min_price:.8f}",
logger.info,
)
return False
# Perform max_price check.
if self._max_price != 0:
if price > self._max_price:
self.log_once(f"Removed {pair} from whitelist, "
f"because last price > {self._max_price:.8f}", logger.info)
self.log_once(
f"Removed {pair} from whitelist, "
f"because last price > {self._max_price:.8f}",
logger.info,
)
return False
return True

View File

@@ -3,6 +3,7 @@ External Pair List provider
Provides pair list from Leader data
"""
import logging
from typing import Any, Dict, List, Optional
@@ -28,18 +29,25 @@ class ProducerPairList(IPairList):
}
],
"""
is_pairlist_generator = True
def __init__(self, exchange, pairlistmanager,
config: Dict[str, Any], pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Dict[str, Any],
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._num_assets: int = self._pairlistconfig.get('number_assets', 0)
self._producer_name = self._pairlistconfig.get('producer_name', 'default')
if not config.get('external_message_consumer', {}).get('enabled'):
self._num_assets: int = self._pairlistconfig.get("number_assets", 0)
self._producer_name = self._pairlistconfig.get("producer_name", "default")
if not config.get("external_message_consumer", {}).get("enabled"):
raise OperationalException(
"ProducerPairList requires external_message_consumer to be enabled.")
"ProducerPairList requires external_message_consumer to be enabled."
)
@property
def needstickers(self) -> bool:
@@ -74,21 +82,24 @@ class ProducerPairList(IPairList):
"type": "string",
"default": "default",
"description": "Producer name",
"help": ("Name of the producer to use. Requires additional "
"external_message_consumer configuration.")
"help": (
"Name of the producer to use. Requires additional "
"external_message_consumer configuration."
),
},
}
def _filter_pairlist(self, pairlist: Optional[List[str]]):
upstream_pairlist = self._pairlistmanager._dataprovider.get_producer_pairs(
self._producer_name)
self._producer_name
)
if pairlist is None:
pairlist = self._pairlistmanager._dataprovider.get_producer_pairs(self._producer_name)
pairs = list(dict.fromkeys(pairlist + upstream_pairlist))
if self._num_assets:
pairs = pairs[:self._num_assets]
pairs = pairs[: self._num_assets]
return pairs

View File

@@ -3,6 +3,7 @@ Remote PairList provider
Provides pair list fetched from a remote source
"""
import logging
from pathlib import Path
from typing import Any, Dict, List, Tuple
@@ -24,51 +25,59 @@ logger = logging.getLogger(__name__)
class RemotePairList(IPairList):
is_pairlist_generator = True
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
if 'number_assets' not in self._pairlistconfig:
if "number_assets" not in self._pairlistconfig:
raise OperationalException(
'`number_assets` not specified. Please check your configuration '
'for "pairlist.config.number_assets"')
"`number_assets` not specified. Please check your configuration "
'for "pairlist.config.number_assets"'
)
if 'pairlist_url' not in self._pairlistconfig:
if "pairlist_url" not in self._pairlistconfig:
raise OperationalException(
'`pairlist_url` not specified. Please check your configuration '
'for "pairlist.config.pairlist_url"')
"`pairlist_url` not specified. Please check your configuration "
'for "pairlist.config.pairlist_url"'
)
self._mode = self._pairlistconfig.get('mode', 'whitelist')
self._processing_mode = self._pairlistconfig.get('processing_mode', 'filter')
self._number_pairs = self._pairlistconfig['number_assets']
self._refresh_period: int = self._pairlistconfig.get('refresh_period', 1800)
self._keep_pairlist_on_failure = self._pairlistconfig.get('keep_pairlist_on_failure', True)
self._mode = self._pairlistconfig.get("mode", "whitelist")
self._processing_mode = self._pairlistconfig.get("processing_mode", "filter")
self._number_pairs = self._pairlistconfig["number_assets"]
self._refresh_period: int = self._pairlistconfig.get("refresh_period", 1800)
self._keep_pairlist_on_failure = self._pairlistconfig.get("keep_pairlist_on_failure", True)
self._pair_cache: TTLCache = TTLCache(maxsize=1, ttl=self._refresh_period)
self._pairlist_url = self._pairlistconfig.get('pairlist_url', '')
self._read_timeout = self._pairlistconfig.get('read_timeout', 60)
self._bearer_token = self._pairlistconfig.get('bearer_token', '')
self._pairlist_url = self._pairlistconfig.get("pairlist_url", "")
self._read_timeout = self._pairlistconfig.get("read_timeout", 60)
self._bearer_token = self._pairlistconfig.get("bearer_token", "")
self._init_done = False
self._save_to_file = self._pairlistconfig.get('save_to_file', None)
self._save_to_file = self._pairlistconfig.get("save_to_file", None)
self._last_pairlist: List[Any] = list()
if self._mode not in ['whitelist', 'blacklist']:
if self._mode not in ["whitelist", "blacklist"]:
raise OperationalException(
'`mode` not configured correctly. Supported Modes '
'are "whitelist","blacklist"')
"`mode` not configured correctly. Supported Modes " 'are "whitelist","blacklist"'
)
if self._processing_mode not in ['filter', 'append']:
if self._processing_mode not in ["filter", "append"]:
raise OperationalException(
'`processing_mode` not configured correctly. Supported Modes '
'are "filter","append"')
"`processing_mode` not configured correctly. Supported Modes "
'are "filter","append"'
)
if self._pairlist_pos == 0 and self._mode == 'blacklist':
if self._pairlist_pos == 0 and self._mode == "blacklist":
raise OperationalException(
'A `blacklist` mode RemotePairList can not be on the first '
'position of your pairlist.')
"A `blacklist` mode RemotePairList can not be on the first "
"position of your pairlist."
)
@property
def needstickers(self) -> bool:
@@ -146,13 +155,15 @@ class RemotePairList(IPairList):
}
def process_json(self, jsonparse) -> List[str]:
pairlist = jsonparse.get('pairs', [])
remote_refresh_period = int(jsonparse.get('refresh_period', self._refresh_period))
pairlist = jsonparse.get("pairs", [])
remote_refresh_period = int(jsonparse.get("refresh_period", self._refresh_period))
if self._refresh_period < remote_refresh_period:
self.log_once(f'Refresh Period has been increased from {self._refresh_period}'
f' to minimum allowed: {remote_refresh_period} from Remote.', logger.info)
self.log_once(
f"Refresh Period has been increased from {self._refresh_period}"
f" to minimum allowed: {remote_refresh_period} from Remote.",
logger.info,
)
self._refresh_period = remote_refresh_period
self._pair_cache = TTLCache(maxsize=1, ttl=remote_refresh_period)
@@ -164,25 +175,21 @@ class RemotePairList(IPairList):
def return_last_pairlist(self) -> List[str]:
if self._keep_pairlist_on_failure:
pairlist = self._last_pairlist
self.log_once('Keeping last fetched pairlist', logger.info)
self.log_once("Keeping last fetched pairlist", logger.info)
else:
pairlist = []
return pairlist
def fetch_pairlist(self) -> Tuple[List[str], float]:
headers = {
'User-Agent': 'Freqtrade/' + __version__ + ' Remotepairlist'
}
headers = {"User-Agent": "Freqtrade/" + __version__ + " Remotepairlist"}
if self._bearer_token:
headers['Authorization'] = f'Bearer {self._bearer_token}'
headers["Authorization"] = f"Bearer {self._bearer_token}"
try:
response = requests.get(self._pairlist_url, headers=headers,
timeout=self._read_timeout)
content_type = response.headers.get('content-type')
response = requests.get(self._pairlist_url, headers=headers, timeout=self._read_timeout)
content_type = response.headers.get("content-type")
time_elapsed = response.elapsed.total_seconds()
if "application/json" in str(content_type):
@@ -191,14 +198,16 @@ class RemotePairList(IPairList):
try:
pairlist = self.process_json(jsonparse)
except Exception as e:
pairlist = self._handle_error(f'Failed processing JSON data: {type(e)}')
pairlist = self._handle_error(f"Failed processing JSON data: {type(e)}")
else:
pairlist = self._handle_error(f'RemotePairList is not of type JSON.'
f' {self._pairlist_url}')
pairlist = self._handle_error(
f"RemotePairList is not of type JSON." f" {self._pairlist_url}"
)
except requests.exceptions.RequestException:
pairlist = self._handle_error(f'Was not able to fetch pairlist from:'
f' {self._pairlist_url}')
pairlist = self._handle_error(
f"Was not able to fetch pairlist from:" f" {self._pairlist_url}"
)
time_elapsed = 0
@@ -219,7 +228,7 @@ class RemotePairList(IPairList):
"""
if self._init_done:
pairlist = self._pair_cache.get('pairlist')
pairlist = self._pair_cache.get("pairlist")
if pairlist == [None]:
# Valid but empty pairlist.
return []
@@ -243,7 +252,7 @@ class RemotePairList(IPairList):
jsonparse = rapidjson.load(json_file, parse_mode=CONFIG_PARSE_MODE)
pairlist = self.process_json(jsonparse)
except Exception as e:
pairlist = self._handle_error(f'processing JSON data: {type(e)}')
pairlist = self._handle_error(f"processing JSON data: {type(e)}")
else:
pairlist = self._handle_error(f"{self._pairlist_url} does not exist.")
@@ -255,18 +264,18 @@ class RemotePairList(IPairList):
pairlist = expand_pairlist(pairlist, list(self._exchange.get_markets().keys()))
pairlist = self._whitelist_for_active_markets(pairlist)
pairlist = pairlist[:self._number_pairs]
pairlist = pairlist[: self._number_pairs]
if pairlist:
self._pair_cache['pairlist'] = pairlist.copy()
self._pair_cache["pairlist"] = pairlist.copy()
else:
# If pairlist is empty, set a dummy value to avoid fetching again
self._pair_cache['pairlist'] = [None]
self._pair_cache["pairlist"] = [None]
if time_elapsed != 0.0:
self.log_once(f'Pairlist Fetched in {time_elapsed} seconds.', logger.info)
self.log_once(f"Pairlist Fetched in {time_elapsed} seconds.", logger.info)
else:
self.log_once('Fetched Pairlist.', logger.info)
self.log_once("Fetched Pairlist.", logger.info)
self._last_pairlist = list(pairlist)
@@ -276,12 +285,10 @@ class RemotePairList(IPairList):
return pairlist
def save_pairlist(self, pairlist: List[str], filename: str) -> None:
pairlist_data = {
"pairs": pairlist
}
pairlist_data = {"pairs": pairlist}
try:
file_path = Path(filename)
with file_path.open('w') as json_file:
with file_path.open("w") as json_file:
rapidjson.dump(pairlist_data, json_file)
logger.info(f"Processed pairlist saved to {filename}")
except Exception as e:
@@ -314,5 +321,5 @@ class RemotePairList(IPairList):
if filtered:
self.log_once(f"Blacklist - Filtered out pairs: {filtered}", logger.info)
merged_list = merged_list[:self._number_pairs]
merged_list = merged_list[: self._number_pairs]
return merged_list

View File

@@ -1,6 +1,7 @@
"""
Shuffle pair list filter
"""
import logging
import random
from typing import Any, Dict, List, Literal
@@ -15,29 +16,34 @@ from freqtrade.util.periodic_cache import PeriodicCache
logger = logging.getLogger(__name__)
ShuffleValues = Literal['candle', 'iteration']
ShuffleValues = Literal["candle", "iteration"]
class ShuffleFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
# Apply seed in backtesting mode to get comparable results,
# but not in live modes to get a non-repeating order of pairs during live modes.
if config.get('runmode') in (RunMode.LIVE, RunMode.DRY_RUN):
if config.get("runmode") in (RunMode.LIVE, RunMode.DRY_RUN):
self._seed = None
logger.info("Live mode detected, not applying seed.")
else:
self._seed = pairlistconfig.get('seed')
self._seed = pairlistconfig.get("seed")
logger.info(f"Backtesting mode detected, applying seed value: {self._seed}")
self._random = random.Random(self._seed)
self._shuffle_freq: ShuffleValues = pairlistconfig.get('shuffle_frequency', 'candle')
self._shuffle_freq: ShuffleValues = pairlistconfig.get("shuffle_frequency", "candle")
self.__pairlist_cache = PeriodicCache(
maxsize=1000, ttl=timeframe_to_seconds(self._config['timeframe']))
maxsize=1000, ttl=timeframe_to_seconds(self._config["timeframe"])
)
@property
def needstickers(self) -> bool:
@@ -52,8 +58,9 @@ class ShuffleFilter(IPairList):
"""
Short whitelist method description - used for startup-messages
"""
return (f"{self.name} - Shuffling pairs every {self._shuffle_freq}" +
(f", seed = {self._seed}." if self._seed is not None else "."))
return f"{self.name} - Shuffling pairs every {self._shuffle_freq}" + (
f", seed = {self._seed}." if self._seed is not None else "."
)
@staticmethod
def description() -> str:
@@ -87,7 +94,7 @@ class ShuffleFilter(IPairList):
"""
pairlist_bef = tuple(pairlist)
pairlist_new = self.__pairlist_cache.get(pairlist_bef)
if pairlist_new and self._shuffle_freq == 'candle':
if pairlist_new and self._shuffle_freq == "candle":
# Use cached pairlist.
return pairlist_new
# Shuffle is done inplace

View File

@@ -1,6 +1,7 @@
"""
Spread pair list filter
"""
import logging
from typing import Any, Dict, Optional
@@ -14,16 +15,20 @@ logger = logging.getLogger(__name__)
class SpreadFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._max_spread_ratio = pairlistconfig.get('max_spread_ratio', 0.005)
self._max_spread_ratio = pairlistconfig.get("max_spread_ratio", 0.005)
self._enabled = self._max_spread_ratio != 0
if not self._exchange.get_option('tickers_have_bid_ask'):
if not self._exchange.get_option("tickers_have_bid_ask"):
raise OperationalException(
f"{self.name} requires exchange to have bid/ask data for tickers, "
"which is not available for the selected exchange / trading mode."
@@ -42,8 +47,10 @@ class SpreadFilter(IPairList):
"""
Short whitelist method description - used for startup-messages
"""
return (f"{self.name} - Filtering pairs with ask/bid diff above "
f"{self._max_spread_ratio:.2%}.")
return (
f"{self.name} - Filtering pairs with ask/bid diff above "
f"{self._max_spread_ratio:.2%}."
)
@staticmethod
def description() -> str:
@@ -67,15 +74,18 @@ class SpreadFilter(IPairList):
:param ticker: ticker dict as returned from ccxt.fetch_ticker
:return: True if the pair can stay, false if it should be removed
"""
if ticker and 'bid' in ticker and 'ask' in ticker and ticker['ask'] and ticker['bid']:
spread = 1 - ticker['bid'] / ticker['ask']
if ticker and "bid" in ticker and "ask" in ticker and ticker["ask"] and ticker["bid"]:
spread = 1 - ticker["bid"] / ticker["ask"]
if spread > self._max_spread_ratio:
self.log_once(f"Removed {pair} from whitelist, because spread "
f"{spread:.3%} > {self._max_spread_ratio:.3%}",
logger.info)
self.log_once(
f"Removed {pair} from whitelist, because spread "
f"{spread:.3%} > {self._max_spread_ratio:.3%}",
logger.info,
)
return False
else:
return True
self.log_once(f"Removed {pair} from whitelist due to invalid ticker data: {ticker}",
logger.info)
self.log_once(
f"Removed {pair} from whitelist due to invalid ticker data: {ticker}", logger.info
)
return False

View File

@@ -3,6 +3,7 @@ Static Pair List provider
Provides pair white list as it configured in config
"""
import logging
from copy import deepcopy
from typing import Any, Dict, List
@@ -16,15 +17,19 @@ logger = logging.getLogger(__name__)
class StaticPairList(IPairList):
is_pairlist_generator = True
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._allow_inactive = self._pairlistconfig.get('allow_inactive', False)
self._allow_inactive = self._pairlistconfig.get("allow_inactive", False)
@property
def needstickers(self) -> bool:
@@ -65,11 +70,12 @@ class StaticPairList(IPairList):
"""
if self._allow_inactive:
return self.verify_whitelist(
self._config['exchange']['pair_whitelist'], logger.info, keep_invalid=True
self._config["exchange"]["pair_whitelist"], logger.info, keep_invalid=True
)
else:
return self._whitelist_for_active_markets(
self.verify_whitelist(self._config['exchange']['pair_whitelist'], logger.info))
self.verify_whitelist(self._config["exchange"]["pair_whitelist"], logger.info)
)
def filter_pairlist(self, pairlist: List[str], tickers: Tickers) -> List[str]:
"""
@@ -80,7 +86,7 @@ class StaticPairList(IPairList):
:return: new whitelist
"""
pairlist_ = deepcopy(pairlist)
for pair in self._config['exchange']['pair_whitelist']:
for pair in self._config["exchange"]["pair_whitelist"]:
if pair not in pairlist_:
pairlist_.append(pair)
return pairlist_

View File

@@ -1,6 +1,7 @@
"""
Volatility pairlist filter
"""
import logging
import sys
from datetime import timedelta
@@ -26,29 +27,38 @@ class VolatilityFilter(IPairList):
Filters pairs by volatility
"""
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._days = pairlistconfig.get('lookback_days', 10)
self._min_volatility = pairlistconfig.get('min_volatility', 0)
self._max_volatility = pairlistconfig.get('max_volatility', sys.maxsize)
self._refresh_period = pairlistconfig.get('refresh_period', 1440)
self._def_candletype = self._config['candle_type_def']
self._sort_direction: Optional[str] = pairlistconfig.get('sort_direction', None)
self._days = pairlistconfig.get("lookback_days", 10)
self._min_volatility = pairlistconfig.get("min_volatility", 0)
self._max_volatility = pairlistconfig.get("max_volatility", sys.maxsize)
self._refresh_period = pairlistconfig.get("refresh_period", 1440)
self._def_candletype = self._config["candle_type_def"]
self._sort_direction: Optional[str] = pairlistconfig.get("sort_direction", None)
self._pair_cache: TTLCache = TTLCache(maxsize=1000, ttl=self._refresh_period)
candle_limit = exchange.ohlcv_candle_limit('1d', self._config['candle_type_def'])
candle_limit = exchange.ohlcv_candle_limit("1d", self._config["candle_type_def"])
if self._days < 1:
raise OperationalException("VolatilityFilter requires lookback_days to be >= 1")
if self._days > candle_limit:
raise OperationalException("VolatilityFilter requires lookback_days to not "
f"exceed exchange max request size ({candle_limit})")
if self._sort_direction not in [None, 'asc', 'desc']:
raise OperationalException("VolatilityFilter requires sort_direction to be "
"either None (undefined), 'asc' or 'desc'")
raise OperationalException(
"VolatilityFilter requires lookback_days to not "
f"exceed exchange max request size ({candle_limit})"
)
if self._sort_direction not in [None, "asc", "desc"]:
raise OperationalException(
"VolatilityFilter requires sort_direction to be "
"either None (undefined), 'asc' or 'desc'"
)
@property
def needstickers(self) -> bool:
@@ -63,9 +73,11 @@ class VolatilityFilter(IPairList):
"""
Short whitelist method description - used for startup-messages
"""
return (f"{self.name} - Filtering pairs with volatility range "
f"{self._min_volatility}-{self._max_volatility} "
f" the last {self._days} {plural(self._days, 'day')}.")
return (
f"{self.name} - Filtering pairs with volatility range "
f"{self._min_volatility}-{self._max_volatility} "
f" the last {self._days} {plural(self._days, 'day')}."
)
@staticmethod
def description() -> str:
@@ -99,7 +111,7 @@ class VolatilityFilter(IPairList):
"description": "Sort pairlist",
"help": "Sort Pairlist ascending or descending by volatility.",
},
**IPairList.refresh_period_parameter()
**IPairList.refresh_period_parameter(),
}
def filter_pairlist(self, pairlist: List[str], tickers: Tickers) -> List[str]:
@@ -110,7 +122,8 @@ class VolatilityFilter(IPairList):
:return: new allowlist
"""
needed_pairs: ListPairsWithTimeframes = [
(p, '1d', self._def_candletype) for p in pairlist if p not in self._pair_cache]
(p, "1d", self._def_candletype) for p in pairlist if p not in self._pair_cache
]
since_ms = dt_ts(dt_floor_day(dt_now()) - timedelta(days=self._days))
candles = self._exchange.refresh_ohlcv_with_cache(needed_pairs, since_ms=since_ms)
@@ -118,7 +131,7 @@ class VolatilityFilter(IPairList):
resulting_pairlist: List[str] = []
volatilitys: Dict[str, float] = {}
for p in pairlist:
daily_candles = candles.get((p, '1d', self._def_candletype), None)
daily_candles = candles.get((p, "1d", self._def_candletype), None)
volatility_avg = self._calculate_volatility(p, daily_candles)
@@ -132,18 +145,20 @@ class VolatilityFilter(IPairList):
self.log_once(f"Removed {p} from whitelist, no candles found.", logger.info)
if self._sort_direction:
resulting_pairlist = sorted(resulting_pairlist,
key=lambda p: volatilitys[p],
reverse=self._sort_direction == 'desc')
resulting_pairlist = sorted(
resulting_pairlist,
key=lambda p: volatilitys[p],
reverse=self._sort_direction == "desc",
)
return resulting_pairlist
def _calculate_volatility(self, pair: str, daily_candles: DataFrame) -> Optional[float]:
def _calculate_volatility(self, pair: str, daily_candles: DataFrame) -> Optional[float]:
# Check symbol in cache
if (volatility_avg := self._pair_cache.get(pair, None)) is not None:
return volatility_avg
if daily_candles is not None and not daily_candles.empty:
returns = (np.log(daily_candles["close"].shift(1) / daily_candles["close"]))
returns = np.log(daily_candles["close"].shift(1) / daily_candles["close"])
returns.fillna(0, inplace=True)
volatility_series = returns.rolling(window=self._days).std() * np.sqrt(self._days)
@@ -165,11 +180,13 @@ class VolatilityFilter(IPairList):
if self._min_volatility <= volatility_avg <= self._max_volatility:
result = True
else:
self.log_once(f"Removed {pair} from whitelist, because volatility "
f"over {self._days} {plural(self._days, 'day')} "
f"is: {volatility_avg:.3f} "
f"which is not in the configured range of "
f"{self._min_volatility}-{self._max_volatility}.",
logger.info)
self.log_once(
f"Removed {pair} from whitelist, because volatility "
f"over {self._days} {plural(self._days, 'day')} "
f"is: {volatility_avg:.3f} "
f"which is not in the configured range of "
f"{self._min_volatility}-{self._max_volatility}.",
logger.info,
)
result = False
return result

View File

@@ -3,6 +3,7 @@ Volume PairList provider
Provides dynamic pair list based on trade volumes
"""
import logging
from datetime import timedelta
from typing import Any, Dict, List, Literal
@@ -20,45 +21,50 @@ from freqtrade.util import dt_now, format_ms_time
logger = logging.getLogger(__name__)
SORT_VALUES = ['quoteVolume']
SORT_VALUES = ["quoteVolume"]
class VolumePairList(IPairList):
is_pairlist_generator = True
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
if 'number_assets' not in self._pairlistconfig:
if "number_assets" not in self._pairlistconfig:
raise OperationalException(
'`number_assets` not specified. Please check your configuration '
'for "pairlist.config.number_assets"')
"`number_assets` not specified. Please check your configuration "
'for "pairlist.config.number_assets"'
)
self._stake_currency = config['stake_currency']
self._number_pairs = self._pairlistconfig['number_assets']
self._sort_key: Literal['quoteVolume'] = self._pairlistconfig.get('sort_key', 'quoteVolume')
self._min_value = self._pairlistconfig.get('min_value', 0)
self._stake_currency = config["stake_currency"]
self._number_pairs = self._pairlistconfig["number_assets"]
self._sort_key: Literal["quoteVolume"] = self._pairlistconfig.get("sort_key", "quoteVolume")
self._min_value = self._pairlistconfig.get("min_value", 0)
self._max_value = self._pairlistconfig.get("max_value", None)
self._refresh_period = self._pairlistconfig.get('refresh_period', 1800)
self._refresh_period = self._pairlistconfig.get("refresh_period", 1800)
self._pair_cache: TTLCache = TTLCache(maxsize=1, ttl=self._refresh_period)
self._lookback_days = self._pairlistconfig.get('lookback_days', 0)
self._lookback_timeframe = self._pairlistconfig.get('lookback_timeframe', '1d')
self._lookback_period = self._pairlistconfig.get('lookback_period', 0)
self._def_candletype = self._config['candle_type_def']
self._lookback_days = self._pairlistconfig.get("lookback_days", 0)
self._lookback_timeframe = self._pairlistconfig.get("lookback_timeframe", "1d")
self._lookback_period = self._pairlistconfig.get("lookback_period", 0)
self._def_candletype = self._config["candle_type_def"]
if (self._lookback_days > 0) & (self._lookback_period > 0):
raise OperationalException(
'Ambigous configuration: lookback_days and lookback_period both set in pairlist '
'config. Please set lookback_days only or lookback_period and lookback_timeframe '
'and restart the bot.'
"Ambigous configuration: lookback_days and lookback_period both set in pairlist "
"config. Please set lookback_days only or lookback_period and lookback_timeframe "
"and restart the bot."
)
# overwrite lookback timeframe and days when lookback_days is set
if self._lookback_days > 0:
self._lookback_timeframe = '1d'
self._lookback_timeframe = "1d"
self._lookback_period = self._lookback_days
# get timeframe in minutes and seconds
@@ -70,14 +76,15 @@ class VolumePairList(IPairList):
if self._use_range & (self._refresh_period < _tf_in_sec):
raise OperationalException(
f'Refresh period of {self._refresh_period} seconds is smaller than one '
f'timeframe of {self._lookback_timeframe}. Please adjust refresh_period '
f'to at least {_tf_in_sec} and restart the bot.'
f"Refresh period of {self._refresh_period} seconds is smaller than one "
f"timeframe of {self._lookback_timeframe}. Please adjust refresh_period "
f"to at least {_tf_in_sec} and restart the bot."
)
if (not self._use_range and not (
self._exchange.exchange_has('fetchTickers')
and self._exchange.get_option("tickers_have_quoteVolume"))):
if not self._use_range and not (
self._exchange.exchange_has("fetchTickers")
and self._exchange.get_option("tickers_have_quoteVolume")
):
raise OperationalException(
"Exchange does not support dynamic whitelist in this configuration. "
"Please edit your config and either remove Volumepairlist, "
@@ -85,16 +92,18 @@ class VolumePairList(IPairList):
)
if not self._validate_keys(self._sort_key):
raise OperationalException(
f'key {self._sort_key} not in {SORT_VALUES}')
raise OperationalException(f"key {self._sort_key} not in {SORT_VALUES}")
candle_limit = exchange.ohlcv_candle_limit(
self._lookback_timeframe, self._config['candle_type_def'])
self._lookback_timeframe, self._config["candle_type_def"]
)
if self._lookback_period < 0:
raise OperationalException("VolumeFilter requires lookback_period to be >= 0")
if self._lookback_period > candle_limit:
raise OperationalException("VolumeFilter requires lookback_period to not "
f"exceed exchange max request size ({candle_limit})")
raise OperationalException(
"VolumeFilter requires lookback_period to not "
f"exceed exchange max request size ({candle_limit})"
)
@property
def needstickers(self) -> bool:
@@ -175,30 +184,37 @@ class VolumePairList(IPairList):
"""
# Generate dynamic whitelist
# Must always run if this pairlist is not the first in the list.
pairlist = self._pair_cache.get('pairlist')
pairlist = self._pair_cache.get("pairlist")
if pairlist:
# Item found - no refresh necessary
return pairlist.copy()
else:
# Use fresh pairlist
# Check if pair quote currency equals to the stake currency.
_pairlist = [k for k in self._exchange.get_markets(
quote_currencies=[self._stake_currency],
tradable_only=True, active_only=True).keys()]
_pairlist = [
k
for k in self._exchange.get_markets(
quote_currencies=[self._stake_currency], tradable_only=True, active_only=True
).keys()
]
# No point in testing for blacklisted pairs...
_pairlist = self.verify_blacklist(_pairlist, logger.info)
if not self._use_range:
filtered_tickers = [
v for k, v in tickers.items()
if (self._exchange.get_pair_quote_currency(k) == self._stake_currency
v
for k, v in tickers.items()
if (
self._exchange.get_pair_quote_currency(k) == self._stake_currency
and (self._use_range or v.get(self._sort_key) is not None)
and v['symbol'] in _pairlist)]
pairlist = [s['symbol'] for s in filtered_tickers]
and v["symbol"] in _pairlist
)
]
pairlist = [s["symbol"] for s in filtered_tickers]
else:
pairlist = _pairlist
pairlist = self.filter_pairlist(pairlist, tickers)
self._pair_cache['pairlist'] = pairlist.copy()
self._pair_cache["pairlist"] = pairlist.copy()
return pairlist
@@ -212,81 +228,95 @@ class VolumePairList(IPairList):
"""
if self._use_range:
# Create bare minimum from tickers structure.
filtered_tickers: List[Dict[str, Any]] = [{'symbol': k} for k in pairlist]
filtered_tickers: List[Dict[str, Any]] = [{"symbol": k} for k in pairlist]
# get lookback period in ms, for exchange ohlcv fetch
since_ms = int(timeframe_to_prev_date(
self._lookback_timeframe,
dt_now() + timedelta(
minutes=-(self._lookback_period * self._tf_in_min) - self._tf_in_min)
).timestamp()) * 1000
since_ms = (
int(
timeframe_to_prev_date(
self._lookback_timeframe,
dt_now()
+ timedelta(
minutes=-(self._lookback_period * self._tf_in_min) - self._tf_in_min
),
).timestamp()
)
* 1000
)
to_ms = int(timeframe_to_prev_date(
self._lookback_timeframe,
dt_now() - timedelta(minutes=self._tf_in_min)
).timestamp()) * 1000
to_ms = (
int(
timeframe_to_prev_date(
self._lookback_timeframe, dt_now() - timedelta(minutes=self._tf_in_min)
).timestamp()
)
* 1000
)
# todo: utc date output for starting date
self.log_once(f"Using volume range of {self._lookback_period} candles, timeframe: "
f"{self._lookback_timeframe}, starting from {format_ms_time(since_ms)} "
f"till {format_ms_time(to_ms)}", logger.info)
self.log_once(
f"Using volume range of {self._lookback_period} candles, timeframe: "
f"{self._lookback_timeframe}, starting from {format_ms_time(since_ms)} "
f"till {format_ms_time(to_ms)}",
logger.info,
)
needed_pairs: ListPairsWithTimeframes = [
(p, self._lookback_timeframe, self._def_candletype) for p in
[s['symbol'] for s in filtered_tickers]
(p, self._lookback_timeframe, self._def_candletype)
for p in [s["symbol"] for s in filtered_tickers]
if p not in self._pair_cache
]
candles = self._exchange.refresh_ohlcv_with_cache(needed_pairs, since_ms)
for i, p in enumerate(filtered_tickers):
contract_size = self._exchange.markets[p['symbol']].get('contractSize', 1.0) or 1.0
pair_candles = candles[
(p['symbol'], self._lookback_timeframe, self._def_candletype)
] if (
p['symbol'], self._lookback_timeframe, self._def_candletype
) in candles else None
contract_size = self._exchange.markets[p["symbol"]].get("contractSize", 1.0) or 1.0
pair_candles = (
candles[(p["symbol"], self._lookback_timeframe, self._def_candletype)]
if (p["symbol"], self._lookback_timeframe, self._def_candletype) in candles
else None
)
# in case of candle data calculate typical price and quoteVolume for candle
if pair_candles is not None and not pair_candles.empty:
if self._exchange.get_option("ohlcv_volume_currency") == "base":
pair_candles['typical_price'] = (pair_candles['high'] + pair_candles['low']
+ pair_candles['close']) / 3
pair_candles["typical_price"] = (
pair_candles["high"] + pair_candles["low"] + pair_candles["close"]
) / 3
pair_candles['quoteVolume'] = (
pair_candles['volume'] * pair_candles['typical_price']
* contract_size
pair_candles["quoteVolume"] = (
pair_candles["volume"] * pair_candles["typical_price"] * contract_size
)
else:
# Exchange ohlcv data is in quote volume already.
pair_candles['quoteVolume'] = pair_candles['volume']
pair_candles["quoteVolume"] = pair_candles["volume"]
# ensure that a rolling sum over the lookback_period is built
# if pair_candles contains more candles than lookback_period
quoteVolume = (pair_candles['quoteVolume']
.rolling(self._lookback_period)
.sum()
.fillna(0)
.iloc[-1])
quoteVolume = (
pair_candles["quoteVolume"]
.rolling(self._lookback_period)
.sum()
.fillna(0)
.iloc[-1]
)
# replace quoteVolume with range quoteVolume sum calculated above
filtered_tickers[i]['quoteVolume'] = quoteVolume
filtered_tickers[i]["quoteVolume"] = quoteVolume
else:
filtered_tickers[i]['quoteVolume'] = 0
filtered_tickers[i]["quoteVolume"] = 0
else:
# Tickers mode - filter based on incoming pairlist.
filtered_tickers = [v for k, v in tickers.items() if k in pairlist]
if self._min_value > 0:
filtered_tickers = [
v for v in filtered_tickers if v[self._sort_key] > self._min_value]
filtered_tickers = [v for v in filtered_tickers if v[self._sort_key] > self._min_value]
if self._max_value is not None:
filtered_tickers = [
v for v in filtered_tickers if v[self._sort_key] < self._max_value]
filtered_tickers = [v for v in filtered_tickers if v[self._sort_key] < self._max_value]
sorted_tickers = sorted(filtered_tickers, reverse=True, key=lambda t: t[self._sort_key])
# Validate whitelist to only have active market pairs
pairs = self._whitelist_for_active_markets([s['symbol'] for s in sorted_tickers])
pairs = self._whitelist_for_active_markets([s["symbol"] for s in sorted_tickers])
pairs = self.verify_blacklist(pairs, logmethod=logger.info)
# Limit pairlist to the requested number of pairs
pairs = pairs[:self._number_pairs]
pairs = pairs[: self._number_pairs]
return pairs

View File

@@ -4,8 +4,9 @@ from typing import List
from freqtrade.constants import Config
def expand_pairlist(wildcardpl: List[str], available_pairs: List[str],
keep_invalid: bool = False) -> List[str]:
def expand_pairlist(
wildcardpl: List[str], available_pairs: List[str], keep_invalid: bool = False
) -> List[str]:
"""
Expand pairlist potentially containing wildcards based on available markets.
This will implicitly filter all pairs in the wildcard-list which are not in available_pairs.
@@ -20,34 +21,29 @@ def expand_pairlist(wildcardpl: List[str], available_pairs: List[str],
for pair_wc in wildcardpl:
try:
comp = re.compile(pair_wc, re.IGNORECASE)
result_partial = [
pair for pair in available_pairs if re.fullmatch(comp, pair)
]
result_partial = [pair for pair in available_pairs if re.fullmatch(comp, pair)]
# Add all matching pairs.
# If there are no matching pairs (Pair not on exchange) keep it.
result += result_partial or [pair_wc]
except re.error as err:
raise ValueError(f"Wildcard error in {pair_wc}, {err}")
result = [element for element in result if re.fullmatch(r'^[A-Za-z0-9:/-]+$', element)]
result = [element for element in result if re.fullmatch(r"^[A-Za-z0-9:/-]+$", element)]
else:
for pair_wc in wildcardpl:
try:
comp = re.compile(pair_wc, re.IGNORECASE)
result += [
pair for pair in available_pairs if re.fullmatch(comp, pair)
]
result += [pair for pair in available_pairs if re.fullmatch(comp, pair)]
except re.error as err:
raise ValueError(f"Wildcard error in {pair_wc}, {err}")
return result
def dynamic_expand_pairlist(config: Config, markets: List[str]) -> List[str]:
expanded_pairs = expand_pairlist(config['pairs'], markets)
if config.get('freqai', {}).get('enabled', False):
corr_pairlist = config['freqai']['feature_parameters']['include_corr_pairlist']
expanded_pairs += [pair for pair in corr_pairlist
if pair not in config['pairs']]
expanded_pairs = expand_pairlist(config["pairs"], markets)
if config.get("freqai", {}).get("enabled", False):
corr_pairlist = config["freqai"]["feature_parameters"]["include_corr_pairlist"]
expanded_pairs += [pair for pair in corr_pairlist if pair not in config["pairs"]]
return expanded_pairs

View File

@@ -1,6 +1,7 @@
"""
Rate of change pairlist filter
"""
import logging
from datetime import timedelta
from typing import Any, Dict, List, Optional
@@ -20,30 +21,38 @@ logger = logging.getLogger(__name__)
class RangeStabilityFilter(IPairList):
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
def __init__(
self,
exchange,
pairlistmanager,
config: Config,
pairlistconfig: Dict[str, Any],
pairlist_pos: int,
) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
self._days = pairlistconfig.get('lookback_days', 10)
self._min_rate_of_change = pairlistconfig.get('min_rate_of_change', 0.01)
self._max_rate_of_change = pairlistconfig.get('max_rate_of_change')
self._refresh_period = pairlistconfig.get('refresh_period', 86400)
self._def_candletype = self._config['candle_type_def']
self._sort_direction: Optional[str] = pairlistconfig.get('sort_direction', None)
self._days = pairlistconfig.get("lookback_days", 10)
self._min_rate_of_change = pairlistconfig.get("min_rate_of_change", 0.01)
self._max_rate_of_change = pairlistconfig.get("max_rate_of_change")
self._refresh_period = pairlistconfig.get("refresh_period", 86400)
self._def_candletype = self._config["candle_type_def"]
self._sort_direction: Optional[str] = pairlistconfig.get("sort_direction", None)
self._pair_cache: TTLCache = TTLCache(maxsize=1000, ttl=self._refresh_period)
candle_limit = exchange.ohlcv_candle_limit('1d', self._config['candle_type_def'])
candle_limit = exchange.ohlcv_candle_limit("1d", self._config["candle_type_def"])
if self._days < 1:
raise OperationalException("RangeStabilityFilter requires lookback_days to be >= 1")
if self._days > candle_limit:
raise OperationalException("RangeStabilityFilter requires lookback_days to not "
f"exceed exchange max request size ({candle_limit})")
if self._sort_direction not in [None, 'asc', 'desc']:
raise OperationalException("RangeStabilityFilter requires sort_direction to be "
"either None (undefined), 'asc' or 'desc'")
raise OperationalException(
"RangeStabilityFilter requires lookback_days to not "
f"exceed exchange max request size ({candle_limit})"
)
if self._sort_direction not in [None, "asc", "desc"]:
raise OperationalException(
"RangeStabilityFilter requires sort_direction to be "
"either None (undefined), 'asc' or 'desc'"
)
@property
def needstickers(self) -> bool:
@@ -60,10 +69,12 @@ class RangeStabilityFilter(IPairList):
"""
max_rate_desc = ""
if self._max_rate_of_change:
max_rate_desc = (f" and above {self._max_rate_of_change}")
return (f"{self.name} - Filtering pairs with rate of change below "
f"{self._min_rate_of_change}{max_rate_desc} over the "
f"last {plural(self._days, 'day')}.")
max_rate_desc = f" and above {self._max_rate_of_change}"
return (
f"{self.name} - Filtering pairs with rate of change below "
f"{self._min_rate_of_change}{max_rate_desc} over the "
f"last {plural(self._days, 'day')}."
)
@staticmethod
def description() -> str:
@@ -97,7 +108,7 @@ class RangeStabilityFilter(IPairList):
"description": "Sort pairlist",
"help": "Sort Pairlist ascending or descending by rate of change.",
},
**IPairList.refresh_period_parameter()
**IPairList.refresh_period_parameter(),
}
def filter_pairlist(self, pairlist: List[str], tickers: Tickers) -> List[str]:
@@ -108,7 +119,8 @@ class RangeStabilityFilter(IPairList):
:return: new allowlist
"""
needed_pairs: ListPairsWithTimeframes = [
(p, '1d', self._def_candletype) for p in pairlist if p not in self._pair_cache]
(p, "1d", self._def_candletype) for p in pairlist if p not in self._pair_cache
]
since_ms = dt_ts(dt_floor_day(dt_now()) - timedelta(days=self._days + 1))
candles = self._exchange.refresh_ohlcv_with_cache(needed_pairs, since_ms=since_ms)
@@ -117,7 +129,7 @@ class RangeStabilityFilter(IPairList):
pct_changes: Dict[str, float] = {}
for p in pairlist:
daily_candles = candles.get((p, '1d', self._def_candletype), None)
daily_candles = candles.get((p, "1d", self._def_candletype), None)
pct_change = self._calculate_rate_of_change(p, daily_candles)
@@ -129,9 +141,11 @@ class RangeStabilityFilter(IPairList):
self.log_once(f"Removed {p} from whitelist, no candles found.", logger.info)
if self._sort_direction:
resulting_pairlist = sorted(resulting_pairlist,
key=lambda p: pct_changes[p],
reverse=self._sort_direction == 'desc')
resulting_pairlist = sorted(
resulting_pairlist,
key=lambda p: pct_changes[p],
reverse=self._sort_direction == "desc",
)
return resulting_pairlist
def _calculate_rate_of_change(self, pair: str, daily_candles: DataFrame) -> Optional[float]:
@@ -139,9 +153,8 @@ class RangeStabilityFilter(IPairList):
if (pct_change := self._pair_cache.get(pair, None)) is not None:
return pct_change
if daily_candles is not None and not daily_candles.empty:
highest_high = daily_candles['high'].max()
lowest_low = daily_candles['low'].min()
highest_high = daily_candles["high"].max()
lowest_low = daily_candles["low"].min()
pct_change = ((highest_high - lowest_low) / lowest_low) if lowest_low > 0 else 0
self._pair_cache[pair] = pct_change
return pct_change
@@ -158,10 +171,12 @@ class RangeStabilityFilter(IPairList):
result = True
if pct_change < self._min_rate_of_change:
self.log_once(f"Removed {pair} from whitelist, because rate of change "
f"over {self._days} {plural(self._days, 'day')} is {pct_change:.3f}, "
f"which is below the threshold of {self._min_rate_of_change}.",
logger.info)
self.log_once(
f"Removed {pair} from whitelist, because rate of change "
f"over {self._days} {plural(self._days, 'day')} is {pct_change:.3f}, "
f"which is below the threshold of {self._min_rate_of_change}.",
logger.info,
)
result = False
if self._max_rate_of_change:
if pct_change > self._max_rate_of_change:
@@ -169,6 +184,7 @@ class RangeStabilityFilter(IPairList):
f"Removed {pair} from whitelist, because rate of change "
f"over {self._days} {plural(self._days, 'day')} is {pct_change:.3f}, "
f"which is above the threshold of {self._max_rate_of_change}.",
logger.info)
logger.info,
)
result = False
return result

View File

@@ -1,6 +1,7 @@
"""
PairList manager class
"""
import logging
from functools import partial
from typing import Dict, List, Optional
@@ -22,24 +23,24 @@ logger = logging.getLogger(__name__)
class PairListManager(LoggingMixin):
def __init__(
self, exchange, config: Config, dataprovider: Optional[DataProvider] = None) -> None:
self, exchange, config: Config, dataprovider: Optional[DataProvider] = None
) -> None:
self._exchange = exchange
self._config = config
self._whitelist = self._config['exchange'].get('pair_whitelist')
self._blacklist = self._config['exchange'].get('pair_blacklist', [])
self._whitelist = self._config["exchange"].get("pair_whitelist")
self._blacklist = self._config["exchange"].get("pair_blacklist", [])
self._pairlist_handlers: List[IPairList] = []
self._tickers_needed = False
self._dataprovider: Optional[DataProvider] = dataprovider
for pairlist_handler_config in self._config.get('pairlists', []):
for pairlist_handler_config in self._config.get("pairlists", []):
pairlist_handler = PairListResolver.load_pairlist(
pairlist_handler_config['method'],
pairlist_handler_config["method"],
exchange=exchange,
pairlistmanager=self,
config=config,
pairlistconfig=pairlist_handler_config,
pairlist_pos=len(self._pairlist_handlers)
pairlist_pos=len(self._pairlist_handlers),
)
self._tickers_needed |= pairlist_handler.needstickers
self._pairlist_handlers.append(pairlist_handler)
@@ -47,7 +48,7 @@ class PairListManager(LoggingMixin):
if not self._pairlist_handlers:
raise OperationalException("No Pairlist Handlers defined")
if self._tickers_needed and not self._exchange.exchange_has('fetchTickers'):
if self._tickers_needed and not self._exchange.exchange_has("fetchTickers"):
invalid = ". ".join([p.name for p in self._pairlist_handlers if p.needstickers])
raise OperationalException(
@@ -56,7 +57,7 @@ class PairListManager(LoggingMixin):
f"{invalid}."
)
refresh_period = config.get('pairlist_refresh_period', 3600)
refresh_period = config.get("pairlist_refresh_period", 3600)
LoggingMixin.__init__(self, logger, refresh_period)
@property
@@ -135,8 +136,9 @@ class PairListManager(LoggingMixin):
pairlist.remove(pair)
return pairlist
def verify_whitelist(self, pairlist: List[str], logmethod,
keep_invalid: bool = False) -> List[str]:
def verify_whitelist(
self, pairlist: List[str], logmethod, keep_invalid: bool = False
) -> List[str]:
"""
Verify and remove items from pairlist - returning a filtered pairlist.
Logs a warning or info depending on `aswarning`.
@@ -155,14 +157,16 @@ class PairListManager(LoggingMixin):
return whitelist
def create_pair_list(
self, pairs: List[str], timeframe: Optional[str] = None) -> ListPairsWithTimeframes:
self, pairs: List[str], timeframe: Optional[str] = None
) -> ListPairsWithTimeframes:
"""
Create list of pair tuples with (pair, timeframe)
"""
return [
(
pair,
timeframe or self._config['timeframe'],
self._config.get('candle_type_def', CandleType.SPOT)
) for pair in pairs
timeframe or self._config["timeframe"],
self._config.get("candle_type_def", CandleType.SPOT),
)
for pair in pairs
]