chore: update rpc to modern typing syntax

This commit is contained in:
Matthias
2024-10-04 07:06:27 +02:00
parent b8bbf3b69e
commit 1d4658e978
20 changed files with 163 additions and 161 deletions

View File

@@ -1,7 +1,7 @@
import logging
import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Union
from typing import Any, Union
import jwt
from fastapi import APIRouter, Depends, HTTPException, Query, WebSocket, status
@@ -56,7 +56,7 @@ def get_user_from_token(token, secret_key: str, token_type: str = "access") -> s
async def validate_ws_token(
ws: WebSocket,
ws_token: Union[str, None] = Query(default=None, alias="token"),
api_config: Dict[str, Any] = Depends(get_api_config),
api_config: dict[str, Any] = Depends(get_api_config),
):
secret_ws_token = api_config.get("ws_token", None)
secret_jwt_key = api_config.get("jwt_secret_key", "super-secret")

View File

@@ -1,6 +1,5 @@
import logging
from copy import deepcopy
from typing import List
from fastapi import APIRouter, BackgroundTasks, Depends
from fastapi.exceptions import HTTPException
@@ -27,7 +26,7 @@ logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/background", response_model=List[BackgroundTaskStatus], tags=["webserver"])
@router.get("/background", response_model=list[BackgroundTaskStatus], tags=["webserver"])
def background_job_list():
return [
{

View File

@@ -3,7 +3,7 @@ import logging
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
from typing import Any
from fastapi import APIRouter, BackgroundTasks, Depends
from fastapi.exceptions import HTTPException
@@ -260,7 +260,7 @@ def api_backtest_abort():
@router.get(
"/backtest/history", response_model=List[BacktestHistoryEntry], tags=["webserver", "backtest"]
"/backtest/history", response_model=list[BacktestHistoryEntry], tags=["webserver", "backtest"]
)
def api_backtest_history(config=Depends(get_config)):
# Get backtest result history, read from metadata files
@@ -275,7 +275,7 @@ def api_backtest_history_result(filename: str, strategy: str, config=Depends(get
bt_results_base: Path = config["user_data_dir"] / "backtest_results"
fn = (bt_results_base / filename).with_suffix(".json")
results: Dict[str, Any] = {
results: dict[str, Any] = {
"metadata": {},
"strategy": {},
"strategy_comparison": [],
@@ -295,7 +295,7 @@ def api_backtest_history_result(filename: str, strategy: str, config=Depends(get
@router.delete(
"/backtest/history/{file}",
response_model=List[BacktestHistoryEntry],
response_model=list[BacktestHistoryEntry],
tags=["webserver", "backtest"],
)
def api_delete_backtest_history_entry(file: str, config=Depends(get_config)):
@@ -312,7 +312,7 @@ def api_delete_backtest_history_entry(file: str, config=Depends(get_config)):
@router.patch(
"/backtest/history/{file}",
response_model=List[BacktestHistoryEntry],
response_model=list[BacktestHistoryEntry],
tags=["webserver", "backtest"],
)
def api_update_backtest_history_entry(

View File

@@ -1,5 +1,5 @@
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Union
from typing import Any, Optional, Union
from pydantic import AwareDatetime, BaseModel, RootModel, SerializeAsAny
@@ -73,7 +73,7 @@ class Balance(BaseModel):
class Balances(BaseModel):
currencies: List[Balance]
currencies: list[Balance]
total: float
total_bot: float
symbol: str
@@ -172,8 +172,8 @@ class SellReason(BaseModel):
class Stats(BaseModel):
exit_reasons: Dict[str, SellReason]
durations: Dict[str, Optional[float]]
exit_reasons: dict[str, SellReason]
durations: dict[str, Optional[float]]
class DailyWeeklyMonthlyRecord(BaseModel):
@@ -186,7 +186,7 @@ class DailyWeeklyMonthlyRecord(BaseModel):
class DailyWeeklyMonthly(BaseModel):
data: List[DailyWeeklyMonthlyRecord]
data: list[DailyWeeklyMonthlyRecord]
fiat_display_currency: str
stake_currency: str
@@ -221,7 +221,7 @@ class ShowConfig(BaseModel):
available_capital: Optional[float] = None
stake_currency_decimals: int
max_open_trades: IntOrInf
minimal_roi: Dict[str, Any]
minimal_roi: dict[str, Any]
stoploss: Optional[float] = None
stoploss_on_exchange: bool
trailing_stop: Optional[bool] = None
@@ -237,8 +237,8 @@ class ShowConfig(BaseModel):
exchange: str
strategy: Optional[str] = None
force_entry_enable: bool
exit_pricing: Dict[str, Any]
entry_pricing: Dict[str, Any]
exit_pricing: dict[str, Any]
entry_pricing: dict[str, Any]
bot_name: str
state: str
runmode: str
@@ -326,7 +326,7 @@ class TradeSchema(BaseModel):
min_rate: Optional[float] = None
max_rate: Optional[float] = None
has_open_orders: bool
orders: List[OrderSchema]
orders: list[OrderSchema]
leverage: Optional[float] = None
interest_rate: Optional[float] = None
@@ -352,7 +352,7 @@ class OpenTradeSchema(TradeSchema):
class TradeResponse(BaseModel):
trades: List[TradeSchema]
trades: list[TradeSchema]
trades_count: int
offset: int
total_trades: int
@@ -375,7 +375,7 @@ class LockModel(BaseModel):
class Locks(BaseModel):
lock_count: int
locks: List[LockModel]
locks: list[LockModel]
class LocksPayload(BaseModel):
@@ -392,7 +392,7 @@ class DeleteLockRequest(BaseModel):
class Logs(BaseModel):
log_count: int
logs: List[List]
logs: list[list]
class ForceEnterPayload(BaseModel):
@@ -412,21 +412,21 @@ class ForceExitPayload(BaseModel):
class BlacklistPayload(BaseModel):
blacklist: List[str]
blacklist: list[str]
class BlacklistResponse(BaseModel):
blacklist: List[str]
blacklist_expanded: List[str]
errors: Dict
blacklist: list[str]
blacklist_expanded: list[str]
errors: dict
length: int
method: List[str]
method: list[str]
class WhitelistResponse(BaseModel):
whitelist: List[str]
whitelist: list[str]
length: int
method: List[str]
method: list[str]
class WhitelistEvaluateResponse(BackgroundTaskResult):
@@ -441,40 +441,40 @@ class DeleteTrade(BaseModel):
class PlotConfig_(BaseModel):
main_plot: Dict[str, Any]
subplots: Dict[str, Any]
main_plot: dict[str, Any]
subplots: dict[str, Any]
PlotConfig = RootModel[Union[PlotConfig_, Dict]]
PlotConfig = RootModel[Union[PlotConfig_, dict]]
class StrategyListResponse(BaseModel):
strategies: List[str]
strategies: list[str]
class ExchangeListResponse(BaseModel):
exchanges: List[ValidExchangesType]
exchanges: list[ValidExchangesType]
class PairListResponse(BaseModel):
name: str
description: str
is_pairlist_generator: bool
params: Dict[str, Any]
params: dict[str, Any]
class PairListsResponse(BaseModel):
pairlists: List[PairListResponse]
pairlists: list[PairListResponse]
class PairListsPayload(ExchangeModePayloadMixin, BaseModel):
pairlists: List[Dict[str, Any]]
blacklist: List[str]
pairlists: list[dict[str, Any]]
blacklist: list[str]
stake_currency: str
class FreqAIModelListResponse(BaseModel):
freqaimodels: List[str]
freqaimodels: list[str]
class StrategyResponse(BaseModel):
@@ -485,15 +485,15 @@ class StrategyResponse(BaseModel):
class AvailablePairs(BaseModel):
length: int
pairs: List[str]
pair_interval: List[List[str]]
pairs: list[str]
pair_interval: list[list[str]]
class PairCandlesRequest(BaseModel):
pair: str
timeframe: str
limit: Optional[int] = None
columns: Optional[List[str]] = None
columns: Optional[list[str]] = None
class PairHistoryRequest(PairCandlesRequest):
@@ -507,9 +507,9 @@ class PairHistory(BaseModel):
pair: str
timeframe: str
timeframe_ms: int
columns: List[str]
all_columns: List[str] = []
data: SerializeAsAny[List[Any]]
columns: list[str]
all_columns: list[str] = []
data: SerializeAsAny[list[Any]]
length: int
buy_signals: int
sell_signals: int
@@ -551,7 +551,7 @@ class BacktestResponse(BaseModel):
progress: float
trade_count: Optional[float] = None
# TODO: Properly type backtestresult...
backtest_result: Optional[Dict[str, Any]] = None
backtest_result: Optional[dict[str, Any]] = None
# TODO: This is a copy of BacktestHistoryEntryType
@@ -573,13 +573,13 @@ class BacktestMetadataUpdate(BaseModel):
class BacktestMarketChange(BaseModel):
columns: List[str]
columns: list[str]
length: int
data: List[List[Any]]
data: list[list[Any]]
class SysInfo(BaseModel):
cpu_pct: List[float]
cpu_pct: list[float]
ram_pct: float

View File

@@ -1,6 +1,6 @@
import logging
from copy import deepcopy
from typing import List, Optional
from typing import Optional
from fastapi import APIRouter, Depends, Query
from fastapi.exceptions import HTTPException
@@ -116,22 +116,22 @@ def count(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_count()
@router.get("/entries", response_model=List[Entry], tags=["info"])
@router.get("/entries", response_model=list[Entry], tags=["info"])
def entries(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_enter_tag_performance(pair)
@router.get("/exits", response_model=List[Exit], tags=["info"])
@router.get("/exits", response_model=list[Exit], tags=["info"])
def exits(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_exit_reason_performance(pair)
@router.get("/mix_tags", response_model=List[MixTag], tags=["info"])
@router.get("/mix_tags", response_model=list[MixTag], tags=["info"])
def mix_tags(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_mix_tag_performance(pair)
@router.get("/performance", response_model=List[PerformanceEntry], tags=["info"])
@router.get("/performance", response_model=list[PerformanceEntry], tags=["info"])
def performance(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_performance()
@@ -167,7 +167,7 @@ def monthly(timescale: int = 3, rpc: RPC = Depends(get_rpc), config=Depends(get_
)
@router.get("/status", response_model=List[OpenTradeSchema], tags=["info"])
@router.get("/status", response_model=list[OpenTradeSchema], tags=["info"])
def status(rpc: RPC = Depends(get_rpc)):
try:
return rpc._rpc_trade_status()
@@ -268,7 +268,7 @@ def blacklist_post(payload: BlacklistPayload, rpc: RPC = Depends(get_rpc)):
@router.delete("/blacklist", response_model=BlacklistResponse, tags=["info", "pairlist"])
def blacklist_delete(pairs_to_delete: List[str] = Query([]), rpc: RPC = Depends(get_rpc)):
def blacklist_delete(pairs_to_delete: list[str] = Query([]), rpc: RPC = Depends(get_rpc)):
"""Provide a list of pairs to delete from the blacklist"""
return rpc._rpc_blacklist_delete(pairs_to_delete)
@@ -295,7 +295,7 @@ def delete_lock_pair(payload: DeleteLockRequest, rpc: RPC = Depends(get_rpc)):
@router.post("/locks", response_model=Locks, tags=["info", "locks"])
def add_locks(payload: List[LocksPayload], rpc: RPC = Depends(get_rpc)):
def add_locks(payload: list[LocksPayload], rpc: RPC = Depends(get_rpc)):
for lock in payload:
rpc._rpc_add_lock(lock.pair, lock.until, lock.reason, lock.side)
return rpc._rpc_locks()

View File

@@ -1,6 +1,6 @@
import logging
import time
from typing import Any, Dict
from typing import Any
from fastapi import APIRouter, Depends
from fastapi.websockets import WebSocket
@@ -61,7 +61,7 @@ async def channel_broadcaster(channel: WebSocketChannel, message_stream: Message
await channel.send(message, use_timeout=True)
async def _process_consumer_request(request: Dict[str, Any], channel: WebSocketChannel, rpc: RPC):
async def _process_consumer_request(request: dict[str, Any], channel: WebSocketChannel, rpc: RPC):
"""
Validate and handle a request from a websocket consumer
"""

View File

@@ -1,4 +1,5 @@
from typing import Any, AsyncIterator, Dict, Optional
from collections.abc import AsyncIterator
from typing import Any, Optional
from uuid import uuid4
from fastapi import Depends, HTTPException
@@ -35,11 +36,11 @@ async def get_rpc() -> Optional[AsyncIterator[RPC]]:
raise RPCException("Bot is not in the correct state")
def get_config() -> Dict[str, Any]:
def get_config() -> dict[str, Any]:
return ApiServer._config
def get_api_config() -> Dict[str, Any]:
def get_api_config() -> dict[str, Any]:
return ApiServer._config["api_server"]

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Literal, Optional, TypedDict
from typing import Any, Literal, Optional, TypedDict
from uuid import uuid4
from freqtrade.exchange.exchange import Exchange
@@ -15,7 +15,7 @@ class JobsContainer(TypedDict):
class ApiBG:
# Backtesting type: Backtesting
bt: Dict[str, Any] = {
bt: dict[str, Any] = {
"bt": None,
"data": None,
"timerange": None,
@@ -24,12 +24,12 @@ class ApiBG:
}
bgtask_running: bool = False
# Exchange - only available in webserver mode.
exchanges: Dict[str, Exchange] = {}
exchanges: dict[str, Exchange] = {}
# Generic background jobs
# TODO: Change this to TTLCache
jobs: Dict[str, JobsContainer] = {}
jobs: dict[str, JobsContainer] = {}
# Pairlist evaluate things
pairlist_running: bool = False

View File

@@ -2,8 +2,9 @@ import asyncio
import logging
import time
from collections import deque
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Deque, Dict, List, Optional, Type, Union
from typing import Any, Optional, Union
from uuid import uuid4
from fastapi import WebSocketDisconnect
@@ -30,7 +31,7 @@ class WebSocketChannel:
self,
websocket: WebSocketType,
channel_id: Optional[str] = None,
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer,
serializer_cls: type[WebSocketSerializer] = HybridJSONWebSocketSerializer,
send_throttle: float = 0.01,
):
self.channel_id = channel_id if channel_id else uuid4().hex[:8]
@@ -39,16 +40,16 @@ class WebSocketChannel:
# Internal event to signify a closed websocket
self._closed = asyncio.Event()
# The async tasks created for the channel
self._channel_tasks: List[asyncio.Task] = []
self._channel_tasks: list[asyncio.Task] = []
# Deque for average send times
self._send_times: Deque[float] = deque([], maxlen=10)
self._send_times: deque[float] = deque([], maxlen=10)
# High limit defaults to 3 to start
self._send_high_limit = 3
self._send_throttle = send_throttle
# The subscribed message types
self._subscriptions: List[str] = []
self._subscriptions: list[str] = []
# Wrap the WebSocket in the Serializing class
self._wrapped_ws = serializer_cls(self._websocket)
@@ -80,7 +81,7 @@ class WebSocketChannel:
self._send_high_limit = min(max(self.avg_send_time * 2, 1), 3)
async def send(
self, message: Union[WSMessageSchemaType, Dict[str, Any]], use_timeout: bool = False
self, message: Union[WSMessageSchemaType, dict[str, Any]], use_timeout: bool = False
):
"""
Send a message on the wrapped websocket. If the sending
@@ -153,7 +154,7 @@ class WebSocketChannel:
"""
return self._closed.is_set()
def set_subscriptions(self, subscriptions: List[str]) -> None:
def set_subscriptions(self, subscriptions: list[str]) -> None:
"""
Set which subscriptions this channel is subscribed to

View File

@@ -1,4 +1,4 @@
from typing import Any, Tuple, Union
from typing import Any, Union
from fastapi import WebSocket as FastAPIWebSocket
from websockets.client import WebSocketClientProtocol as WebSocket
@@ -20,7 +20,7 @@ class WebSocketProxy:
return self._websocket
@property
def remote_addr(self) -> Tuple[Any, ...]:
def remote_addr(self) -> tuple[Any, ...]:
if isinstance(self._websocket, WebSocket):
return self._websocket.remote_address
elif isinstance(self._websocket, FastAPIWebSocket):

View File

@@ -1,6 +1,6 @@
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Union
from typing import Any, Union
import orjson
import rapidjson
@@ -26,7 +26,7 @@ class WebSocketSerializer(ABC):
def _deserialize(self, data):
raise NotImplementedError()
async def send(self, data: Union[WSMessageSchemaType, Dict[str, Any]]):
async def send(self, data: Union[WSMessageSchemaType, dict[str, Any]]):
await self._websocket.send(self._serialize(data))
async def recv(self) -> bytes:

View File

@@ -1,8 +1,8 @@
from typing import Any, Dict, TypeVar
from typing import Any, TypeVar
from fastapi import WebSocket as FastAPIWebSocket
from websockets.client import WebSocketClientProtocol as WebSocket
WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket)
MessageType = Dict[str, Any]
MessageType = dict[str, Any]

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, TypedDict
from typing import Any, Optional, TypedDict
from pandas import DataFrame
from pydantic import BaseModel, ConfigDict
@@ -20,7 +20,7 @@ class WSRequestSchema(BaseArbitraryModel):
class WSMessageSchemaType(TypedDict):
# Type for typing to avoid doing pydantic typechecks.
type: RPCMessageType
data: Optional[Dict[str, Any]]
data: Optional[dict[str, Any]]
class WSMessageSchema(BaseArbitraryModel):
@@ -34,7 +34,7 @@ class WSMessageSchema(BaseArbitraryModel):
class WSSubscribeRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.SUBSCRIBE
data: List[RPCMessageType]
data: list[RPCMessageType]
class WSWhitelistRequest(WSRequestSchema):
@@ -44,7 +44,7 @@ class WSWhitelistRequest(WSRequestSchema):
class WSAnalyzedDFRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.ANALYZED_DF
data: Dict[str, Any] = {"limit": 1500, "pair": None}
data: dict[str, Any] = {"limit": 1500, "pair": None}
# ------------------------------ MESSAGE SCHEMAS ----------------------------
@@ -52,7 +52,7 @@ class WSAnalyzedDFRequest(WSRequestSchema):
class WSWhitelistMessage(WSMessageSchema):
type: RPCMessageType = RPCMessageType.WHITELIST
data: List[str]
data: list[str]
class WSAnalyzedDFMessage(WSMessageSchema):

View File

@@ -9,7 +9,7 @@ import asyncio
import logging
import socket
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union
from typing import TYPE_CHECKING, Any, Callable, TypedDict, Union
import websockets
from pydantic import ValidationError
@@ -56,7 +56,7 @@ class ExternalMessageConsumer:
other freqtrade bot's
"""
def __init__(self, config: Dict[str, Any], dataprovider: DataProvider):
def __init__(self, config: dict[str, Any], dataprovider: DataProvider):
self._config = config
self._dp = dataprovider
@@ -69,7 +69,7 @@ class ExternalMessageConsumer:
self._emc_config = self._config.get("external_message_consumer", {})
self.enabled = self._emc_config.get("enabled", False)
self.producers: List[Producer] = self._emc_config.get("producers", [])
self.producers: list[Producer] = self._emc_config.get("producers", [])
self.wait_timeout = self._emc_config.get("wait_timeout", 30) # in seconds
self.ping_timeout = self._emc_config.get("ping_timeout", 10) # in seconds
@@ -88,19 +88,19 @@ class ExternalMessageConsumer:
self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]
# Allow setting data for each initial request
self._initial_requests: List[WSRequestSchema] = [
self._initial_requests: list[WSRequestSchema] = [
WSSubscribeRequest(data=self.topics),
WSWhitelistRequest(),
WSAnalyzedDFRequest(),
]
# Specify which function to use for which RPCMessageType
self._message_handlers: Dict[str, Callable[[str, WSMessageSchema], None]] = {
self._message_handlers: dict[str, Callable[[str, WSMessageSchema], None]] = {
RPCMessageType.WHITELIST: self._consume_whitelist_message,
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
}
self._channel_streams: Dict[str, MessageStream] = {}
self._channel_streams: dict[str, MessageStream] = {}
self.start()
@@ -287,7 +287,7 @@ class ExternalMessageConsumer:
raise
def send_producer_request(
self, producer_name: str, request: Union[WSRequestSchema, Dict[str, Any]]
self, producer_name: str, request: Union[WSRequestSchema, dict[str, Any]]
):
"""
Publish a message to the producer's message stream to be
@@ -302,7 +302,7 @@ class ExternalMessageConsumer:
if channel_stream := self._channel_streams.get(producer_name):
channel_stream.publish(request)
def handle_producer_message(self, producer: Producer, message: Dict[str, Any]):
def handle_producer_message(self, producer: Producer, message: dict[str, Any]):
"""
Handles external messages from a Producer
"""

View File

@@ -5,7 +5,7 @@ e.g BTC to USD
import logging
from datetime import datetime
from typing import Any, Dict, List
from typing import Any
from cachetools import TTLCache
from requests.exceptions import RequestException
@@ -41,7 +41,7 @@ class CryptoToFiatConverter(LoggingMixin):
__instance = None
_coinlistings: List[Dict] = []
_coinlistings: list[dict] = []
_backoff: float = 0.0
def __new__(cls, *args: Any, **kwargs: Any) -> Any:

View File

@@ -4,9 +4,10 @@ This module contains class to define a RPC communications
import logging
from abc import abstractmethod
from collections.abc import Generator, Sequence
from datetime import date, datetime, timedelta, timezone
from math import isnan
from typing import Any, Dict, Generator, List, Optional, Sequence, Tuple, Union
from typing import Any, Optional, Union
import psutil
from dateutil.relativedelta import relativedelta
@@ -112,7 +113,7 @@ class RPC:
@staticmethod
def _rpc_show_config(
config, botstate: Union[State, str], strategy_version: Optional[str] = None
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Return a dict of config options.
Explicitly does NOT return the full config to avoid leakage of sensitive
@@ -167,7 +168,7 @@ class RPC:
}
return val
def _rpc_trade_status(self, trade_ids: Optional[List[int]] = None) -> List[Dict[str, Any]]:
def _rpc_trade_status(self, trade_ids: Optional[list[int]] = None) -> list[dict[str, Any]]:
"""
Below follows the RPC backend it is prefixed with rpc_ to raise awareness that it is
a remotely exposed function
@@ -270,8 +271,8 @@ class RPC:
def _rpc_status_table(
self, stake_currency: str, fiat_display_currency: str
) -> Tuple[List, List, float]:
trades: List[Trade] = Trade.get_open_trades()
) -> tuple[list, list, float]:
trades: list[Trade] = Trade.get_open_trades()
nonspot = self._config.get("trading_mode", TradingMode.SPOT) != TradingMode.SPOT
if not trades:
raise RPCException("no active trade")
@@ -354,7 +355,7 @@ class RPC:
stake_currency: str,
fiat_display_currency: str,
timeunit: str = "days",
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
:param timeunit: Valid entries are 'days', 'weeks', 'months'
"""
@@ -373,7 +374,7 @@ class RPC:
if not (isinstance(timescale, int) and timescale > 0):
raise RPCException("timescale must be an integer greater than 0")
profit_units: Dict[date, Dict] = {}
profit_units: dict[date, dict] = {}
daily_stake = self._freqtrade.wallets.get_total_stake_amount()
for day in range(0, timescale):
@@ -424,7 +425,7 @@ class RPC:
"data": data,
}
def _rpc_trade_history(self, limit: int, offset: int = 0, order_by_id: bool = False) -> Dict:
def _rpc_trade_history(self, limit: int, offset: int = 0, order_by_id: bool = False) -> dict:
"""Returns the X last trades"""
order_by: Any = Trade.id if order_by_id else Trade.close_date.desc()
if limit:
@@ -451,7 +452,7 @@ class RPC:
"total_trades": total_trades,
}
def _rpc_stats(self) -> Dict[str, Any]:
def _rpc_stats(self) -> dict[str, Any]:
"""
Generate generic stats for trades in database
"""
@@ -466,7 +467,7 @@ class RPC:
trades = Trade.get_trades([Trade.is_open.is_(False)], include_orders=False)
# Duration
dur: Dict[str, List[float]] = {"wins": [], "draws": [], "losses": []}
dur: dict[str, list[float]] = {"wins": [], "draws": [], "losses": []}
# Exit reason
exit_reasons = {}
for trade in trades:
@@ -487,7 +488,7 @@ class RPC:
def _rpc_trade_statistics(
self, stake_currency: str, fiat_display_currency: str, start_date: Optional[datetime] = None
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Returns cumulative profit statistics"""
start_date = datetime.fromtimestamp(0) if start_date is None else start_date
@@ -670,7 +671,7 @@ class RPC:
def __balance_get_est_stake(
self, coin: str, stake_currency: str, amount: float, balance: Wallet, tickers
) -> Tuple[float, float]:
) -> tuple[float, float]:
est_stake = 0.0
est_bot_stake = 0.0
if coin == stake_currency:
@@ -690,9 +691,9 @@ class RPC:
return est_stake, est_bot_stake
def _rpc_balance(self, stake_currency: str, fiat_display_currency: str) -> Dict:
def _rpc_balance(self, stake_currency: str, fiat_display_currency: str) -> dict:
"""Returns current account balance per crypto"""
currencies: List[Dict] = []
currencies: list[dict] = []
total = 0.0
total_bot = 0.0
try:
@@ -700,8 +701,8 @@ class RPC:
except ExchangeError:
raise RPCException("Error getting current tickers.")
open_trades: List[Trade] = Trade.get_open_trades()
open_assets: Dict[str, Trade] = {t.safe_base_currency: t for t in open_trades}
open_trades: list[Trade] = Trade.get_open_trades()
open_assets: dict[str, Trade] = {t.safe_base_currency: t for t in open_trades}
self._freqtrade.wallets.update(require_update=False)
starting_capital = self._freqtrade.wallets.get_starting_balance()
starting_cap_fiat = (
@@ -805,7 +806,7 @@ class RPC:
"note": "Simulated balances" if self._freqtrade.config["dry_run"] else "",
}
def _rpc_start(self) -> Dict[str, str]:
def _rpc_start(self) -> dict[str, str]:
"""Handler for start"""
if self._freqtrade.state == State.RUNNING:
return {"status": "already running"}
@@ -813,7 +814,7 @@ class RPC:
self._freqtrade.state = State.RUNNING
return {"status": "starting trader ..."}
def _rpc_stop(self) -> Dict[str, str]:
def _rpc_stop(self) -> dict[str, str]:
"""Handler for stop"""
if self._freqtrade.state == State.RUNNING:
self._freqtrade.state = State.STOPPED
@@ -821,12 +822,12 @@ class RPC:
return {"status": "already stopped"}
def _rpc_reload_config(self) -> Dict[str, str]:
def _rpc_reload_config(self) -> dict[str, str]:
"""Handler for reload_config."""
self._freqtrade.state = State.RELOAD_CONFIG
return {"status": "Reloading config ..."}
def _rpc_stopentry(self) -> Dict[str, str]:
def _rpc_stopentry(self) -> dict[str, str]:
"""
Handler to stop buying, but handle open trades gracefully.
"""
@@ -837,7 +838,7 @@ class RPC:
return {"status": "No more entries will occur from now. Run /reload_config to reset."}
def _rpc_reload_trade_from_exchange(self, trade_id: int) -> Dict[str, str]:
def _rpc_reload_trade_from_exchange(self, trade_id: int) -> dict[str, str]:
"""
Handler for reload_trade_from_exchange.
Reloads a trade from it's orders, should manual interaction have happened.
@@ -901,7 +902,7 @@ class RPC:
def _rpc_force_exit(
self, trade_id: str, ordertype: Optional[str] = None, *, amount: Optional[float] = None
) -> Dict[str, str]:
) -> dict[str, str]:
"""
Handler for forceexit <id>.
Sells the given trade at current price
@@ -1051,7 +1052,7 @@ class RPC:
)
Trade.commit()
def _rpc_delete(self, trade_id: int) -> Dict[str, Union[str, int]]:
def _rpc_delete(self, trade_id: int) -> dict[str, Union[str, int]]:
"""
Handler for delete <id>.
Delete the given trade and close eventually existing open orders.
@@ -1092,7 +1093,7 @@ class RPC:
"cancel_order_count": c_count,
}
def _rpc_list_custom_data(self, trade_id: int, key: Optional[str]) -> List[Dict[str, Any]]:
def _rpc_list_custom_data(self, trade_id: int, key: Optional[str]) -> list[dict[str, Any]]:
# Query for trade
trade = Trade.get_trades(trade_filter=[Trade.id == trade_id]).first()
if trade is None:
@@ -1118,7 +1119,7 @@ class RPC:
for data_entry in custom_data
]
def _rpc_performance(self) -> List[Dict[str, Any]]:
def _rpc_performance(self) -> list[dict[str, Any]]:
"""
Handler for performance.
Shows a performance statistic from finished trades
@@ -1127,21 +1128,21 @@ class RPC:
return pair_rates
def _rpc_enter_tag_performance(self, pair: Optional[str]) -> List[Dict[str, Any]]:
def _rpc_enter_tag_performance(self, pair: Optional[str]) -> list[dict[str, Any]]:
"""
Handler for buy tag performance.
Shows a performance statistic from finished trades
"""
return Trade.get_enter_tag_performance(pair)
def _rpc_exit_reason_performance(self, pair: Optional[str]) -> List[Dict[str, Any]]:
def _rpc_exit_reason_performance(self, pair: Optional[str]) -> list[dict[str, Any]]:
"""
Handler for exit reason performance.
Shows a performance statistic from finished trades
"""
return Trade.get_exit_reason_performance(pair)
def _rpc_mix_tag_performance(self, pair: Optional[str]) -> List[Dict[str, Any]]:
def _rpc_mix_tag_performance(self, pair: Optional[str]) -> list[dict[str, Any]]:
"""
Handler for mix tag (enter_tag + exit_reason) performance.
Shows a performance statistic from finished trades
@@ -1150,7 +1151,7 @@ class RPC:
return mix_tags
def _rpc_count(self) -> Dict[str, float]:
def _rpc_count(self) -> dict[str, float]:
"""Returns the number of trades running"""
if self._freqtrade.state != State.RUNNING:
raise RPCException("trader is not running")
@@ -1166,7 +1167,7 @@ class RPC:
"total_stake": sum((trade.open_rate * trade.amount) for trade in trades),
}
def _rpc_locks(self) -> Dict[str, Any]:
def _rpc_locks(self) -> dict[str, Any]:
"""Returns the current locks"""
locks = PairLocks.get_pair_locks(None)
@@ -1174,7 +1175,7 @@ class RPC:
def _rpc_delete_lock(
self, lockid: Optional[int] = None, pair: Optional[str] = None
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Delete specific lock(s)"""
locks: Sequence[PairLock] = []
@@ -1202,7 +1203,7 @@ class RPC:
)
return lock
def _rpc_whitelist(self) -> Dict:
def _rpc_whitelist(self) -> dict:
"""Returns the currently active whitelist"""
res = {
"method": self._freqtrade.pairlists.name_list,
@@ -1211,7 +1212,7 @@ class RPC:
}
return res
def _rpc_blacklist_delete(self, delete: List[str]) -> Dict:
def _rpc_blacklist_delete(self, delete: list[str]) -> dict:
"""Removes pairs from currently active blacklist"""
errors = {}
for pair in delete:
@@ -1223,7 +1224,7 @@ class RPC:
resp["errors"] = errors
return resp
def _rpc_blacklist(self, add: Optional[List[str]] = None) -> Dict:
def _rpc_blacklist(self, add: Optional[list[str]] = None) -> dict:
"""Returns the currently active blacklist"""
errors = {}
if add:
@@ -1248,7 +1249,7 @@ class RPC:
return res
@staticmethod
def _rpc_get_logs(limit: Optional[int]) -> Dict[str, Any]:
def _rpc_get_logs(limit: Optional[int]) -> dict[str, Any]:
"""Returns the last X logs"""
if limit:
buffer = bufferHandler.buffer[-limit:]
@@ -1272,7 +1273,7 @@ class RPC:
return {"log_count": len(records), "logs": records}
def _rpc_edge(self) -> List[Dict[str, Any]]:
def _rpc_edge(self) -> list[dict[str, Any]]:
"""Returns information related to Edge"""
if not self._freqtrade.edge:
raise RPCException("Edge is not enabled.")
@@ -1285,8 +1286,8 @@ class RPC:
timeframe: str,
dataframe: DataFrame,
last_analyzed: datetime,
selected_cols: Optional[List[str]],
) -> Dict[str, Any]:
selected_cols: Optional[list[str]],
) -> dict[str, Any]:
has_content = len(dataframe) != 0
dataframe_columns = list(dataframe.columns)
signals = {
@@ -1354,8 +1355,8 @@ class RPC:
return res
def _rpc_analysed_dataframe(
self, pair: str, timeframe: str, limit: Optional[int], selected_cols: Optional[List[str]]
) -> Dict[str, Any]:
self, pair: str, timeframe: str, limit: Optional[int], selected_cols: Optional[list[str]]
) -> dict[str, Any]:
"""Analyzed dataframe in Dict form"""
_data, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit)
@@ -1365,7 +1366,7 @@ class RPC:
def __rpc_analysed_dataframe_raw(
self, pair: str, timeframe: str, limit: Optional[int]
) -> Tuple[DataFrame, datetime]:
) -> tuple[DataFrame, datetime]:
"""
Get the dataframe and last analyze from the dataprovider
@@ -1382,8 +1383,8 @@ class RPC:
return _data, last_analyzed
def _ws_all_analysed_dataframes(
self, pairlist: List[str], limit: Optional[int]
) -> Generator[Dict[str, Any], None, None]:
self, pairlist: list[str], limit: Optional[int]
) -> Generator[dict[str, Any], None, None]:
"""
Get the analysed dataframes of each pair in the pairlist.
If specified, only return the most recent `limit` candles for
@@ -1414,8 +1415,8 @@ class RPC:
@staticmethod
def _rpc_analysed_history_full(
config: Config, pair: str, timeframe: str, exchange, selected_cols: Optional[List[str]]
) -> Dict[str, Any]:
config: Config, pair: str, timeframe: str, exchange, selected_cols: Optional[list[str]]
) -> dict[str, Any]:
timerange_parsed = TimeRange.parse_timerange(config.get("timerange"))
from freqtrade.data.converter import trim_dataframe
@@ -1454,7 +1455,7 @@ class RPC:
selected_cols,
)
def _rpc_plot_config(self) -> Dict[str, Any]:
def _rpc_plot_config(self) -> dict[str, Any]:
if (
self._freqtrade.strategy.plot_config
and "subplots" not in self._freqtrade.strategy.plot_config
@@ -1463,7 +1464,7 @@ class RPC:
return self._freqtrade.strategy.plot_config
@staticmethod
def _rpc_plot_config_with_strategy(config: Config) -> Dict[str, Any]:
def _rpc_plot_config_with_strategy(config: Config) -> dict[str, Any]:
from freqtrade.resolvers.strategy_resolver import StrategyResolver
strategy = StrategyResolver.load_strategy(config)
@@ -1475,15 +1476,15 @@ class RPC:
return strategy.plot_config
@staticmethod
def _rpc_sysinfo() -> Dict[str, Any]:
def _rpc_sysinfo() -> dict[str, Any]:
return {
"cpu_pct": psutil.cpu_percent(interval=1, percpu=True),
"ram_pct": psutil.virtual_memory().percent,
}
def health(self) -> Dict[str, Optional[Union[str, int]]]:
def health(self) -> dict[str, Optional[Union[str, int]]]:
last_p = self._freqtrade.last_process
res: Dict[str, Union[None, str, int]] = {
res: dict[str, Union[None, str, int]] = {
"last_process": None,
"last_process_loc": None,
"last_process_ts": None,

View File

@@ -4,7 +4,6 @@ This module contains class to manage RPC communications (Telegram, API, ...)
import logging
from collections import deque
from typing import List
from freqtrade.constants import Config
from freqtrade.enums import NO_ECHO_MESSAGES, RPCMessageType
@@ -22,7 +21,7 @@ class RPCManager:
def __init__(self, freqtrade) -> None:
"""Initializes all enabled rpc modules"""
self.registered_modules: List[RPCHandler] = []
self.registered_modules: list[RPCHandler] = []
self._rpc = RPC(freqtrade)
config = freqtrade.config
# Enable telegram

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, List, Literal, Optional, TypedDict, Union
from typing import Any, Literal, Optional, TypedDict, Union
from freqtrade.constants import PairWithTimeframe
from freqtrade.enums import RPCMessageType
@@ -43,7 +43,7 @@ class RPCProtectionMsg(RPCSendMsgBase):
class RPCWhitelistMsg(RPCSendMsgBase):
type: Literal[RPCMessageType.WHITELIST]
data: List[str]
data: list[str]
class __RPCEntryExitMsgBase(RPCSendMsgBase):

View File

@@ -8,6 +8,7 @@ import asyncio
import json
import logging
import re
from collections.abc import Coroutine
from copy import deepcopy
from dataclasses import dataclass
from datetime import date, datetime, timedelta
@@ -16,7 +17,7 @@ from html import escape
from itertools import chain
from math import isnan
from threading import Thread
from typing import Any, Callable, Coroutine, Dict, List, Literal, Optional, Union
from typing import Any, Callable, Literal, Optional, Union
from tabulate import tabulate
from telegram import (
@@ -145,7 +146,7 @@ class Telegram(RPCHandler):
Validates the keyboard configuration from telegram config
section.
"""
self._keyboard: List[List[Union[str, KeyboardButton]]] = [
self._keyboard: list[list[Union[str, KeyboardButton]]] = [
["/daily", "/profit", "/balance"],
["/status", "/status table", "/performance"],
["/count", "/start", "/stop", "/help"],
@@ -154,7 +155,7 @@ class Telegram(RPCHandler):
# TODO: DRY! - its not good to list all valid cmds here. But otherwise
# this needs refactoring of the whole telegram module (same
# problem in _help()).
valid_keys: List[str] = [
valid_keys: list[str] = [
r"/start$",
r"/stop$",
r"/status$",
@@ -594,16 +595,16 @@ class Telegram(RPCHandler):
else:
return "\N{CROSS MARK}"
def _prepare_order_details(self, filled_orders: List, quote_currency: str, is_open: bool):
def _prepare_order_details(self, filled_orders: list, quote_currency: str, is_open: bool):
"""
Prepare details of trade with entry adjustment enabled
"""
lines_detail: List[str] = []
lines_detail: list[str] = []
if len(filled_orders) > 0:
first_avg = filled_orders[0]["safe_price"]
order_nr = 0
for order in filled_orders:
lines: List[str] = []
lines: list[str] = []
if order["is_open"] is True:
continue
order_nr += 1
@@ -662,7 +663,7 @@ class Telegram(RPCHandler):
lines.extend(lines_detail if lines_detail else "")
await self.__send_order_msg(lines, r)
async def __send_order_msg(self, lines: List[str], r: Dict[str, Any]) -> None:
async def __send_order_msg(self, lines: list[str], r: dict[str, Any]) -> None:
"""
Send status message.
"""
@@ -805,7 +806,7 @@ class Telegram(RPCHandler):
await self.__send_status_msg(lines, r)
async def __send_status_msg(self, lines: List[str], r: Dict[str, Any]) -> None:
async def __send_status_msg(self, lines: list[str], r: dict[str, Any]) -> None:
"""
Send status message.
"""
@@ -1344,8 +1345,8 @@ class Telegram(RPCHandler):
@staticmethod
def _layout_inline_keyboard(
buttons: List[InlineKeyboardButton], cols=3
) -> List[List[InlineKeyboardButton]]:
buttons: list[InlineKeyboardButton], cols=3
) -> list[list[InlineKeyboardButton]]:
return [buttons[i : i + cols] for i in range(0, len(buttons), cols)]
@authorized_only
@@ -1689,7 +1690,7 @@ class Telegram(RPCHandler):
"""
await self.send_blacklist_msg(self._rpc._rpc_blacklist(context.args))
async def send_blacklist_msg(self, blacklist: Dict):
async def send_blacklist_msg(self, blacklist: dict):
errmsgs = []
for _, error in blacklist["errors"].items():
errmsgs.append(f"Error: {error['error_msg']}")
@@ -1998,7 +1999,7 @@ class Telegram(RPCHandler):
msg: str,
parse_mode: str = ParseMode.MARKDOWN,
disable_notification: bool = False,
keyboard: Optional[List[List[InlineKeyboardButton]]] = None,
keyboard: Optional[list[list[InlineKeyboardButton]]] = None,
callback_path: str = "",
reload_able: bool = False,
query: Optional[CallbackQuery] = None,

View File

@@ -4,7 +4,7 @@ This module manages webhook communication
import logging
import time
from typing import Any, Dict, Optional
from typing import Any, Optional
from requests import RequestException, post
@@ -44,7 +44,7 @@ class Webhook(RPCHandler):
"""
pass
def _get_value_dict(self, msg: RPCSendMsg) -> Optional[Dict[str, Any]]:
def _get_value_dict(self, msg: RPCSendMsg) -> Optional[dict[str, Any]]:
whconfig = self._config["webhook"]
if msg["type"].value in whconfig:
# Explicit types should have priority