Merge branch 'develop' into feature_keyval_storage

This commit is contained in:
Matthias
2024-02-07 07:13:41 +01:00
546 changed files with 75515 additions and 29640 deletions

View File

@@ -1,3 +1,2 @@
# flake8: noqa: F401
from .rpc import RPC, RPCException, RPCHandler
from .rpc_manager import RPCManager
from .rpc import RPC, RPCException, RPCHandler # noqa: F401
from .rpc_manager import RPCManager # noqa: F401

View File

@@ -1,2 +1 @@
# flake8: noqa: F401
from .webserver import ApiServer
from .webserver import ApiServer # noqa: F401

View File

@@ -1,8 +1,10 @@
import logging
import secrets
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Union
import jwt
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, Query, WebSocket, status
from fastapi.security import OAuth2PasswordBearer
from fastapi.security.http import HTTPBasic, HTTPBasicCredentials
@@ -10,6 +12,8 @@ from freqtrade.rpc.api_server.api_schemas import AccessAndRefreshToken, AccessTo
from freqtrade.rpc.api_server.deps import get_api_config
logger = logging.getLogger(__name__)
ALGORITHM = "HS256"
router_login = APIRouter()
@@ -25,7 +29,7 @@ httpbasic = HTTPBasic(auto_error=False)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
def get_user_from_token(token, secret_key: str, token_type: str = "access"):
def get_user_from_token(token, secret_key: str, token_type: str = "access") -> str:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
@@ -44,17 +48,54 @@ def get_user_from_token(token, secret_key: str, token_type: str = "access"):
return username
# This should be reimplemented to better realign with the existing tools provided
# by FastAPI regarding API Tokens
# https://github.com/tiangolo/fastapi/blob/master/fastapi/security/api_key.py
async def validate_ws_token(
ws: WebSocket,
ws_token: Union[str, None] = Query(default=None, alias="token"),
api_config: Dict[str, Any] = Depends(get_api_config)
):
secret_ws_token = api_config.get('ws_token', None)
secret_jwt_key = api_config.get('jwt_secret_key', 'super-secret')
# Check if ws_token is/in secret_ws_token
if ws_token and secret_ws_token:
is_valid_ws_token = False
if isinstance(secret_ws_token, str):
is_valid_ws_token = secrets.compare_digest(secret_ws_token, ws_token)
elif isinstance(secret_ws_token, list):
is_valid_ws_token = any([
secrets.compare_digest(potential, ws_token)
for potential in secret_ws_token
])
if is_valid_ws_token:
return ws_token
# Check if ws_token is a JWT
try:
user = get_user_from_token(ws_token, secret_jwt_key)
return user
# If the token is a jwt, and it's valid return the user
except HTTPException:
pass
# If it doesn't match, close the websocket connection
await ws.close(code=status.WS_1008_POLICY_VIOLATION)
def create_token(data: dict, secret_key: str, token_type: str = "access") -> str:
to_encode = data.copy()
if token_type == "access":
expire = datetime.utcnow() + timedelta(minutes=15)
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
elif token_type == "refresh":
expire = datetime.utcnow() + timedelta(days=30)
expire = datetime.now(timezone.utc) + timedelta(days=30)
else:
raise ValueError()
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"iat": datetime.now(timezone.utc),
"type": token_type,
})
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)

View File

@@ -0,0 +1,146 @@
import logging
from copy import deepcopy
from fastapi import APIRouter, BackgroundTasks, Depends
from fastapi.exceptions import HTTPException
from freqtrade.constants import Config
from freqtrade.enums import CandleType
from freqtrade.exceptions import OperationalException
from freqtrade.persistence import FtNoDBContext
from freqtrade.rpc.api_server.api_schemas import (BackgroundTaskStatus, BgJobStarted,
ExchangeModePayloadMixin, PairListsPayload,
PairListsResponse, WhitelistEvaluateResponse)
from freqtrade.rpc.api_server.deps import get_config, get_exchange
from freqtrade.rpc.api_server.webserver_bgwork import ApiBG
logger = logging.getLogger(__name__)
# Private API, protected by authentication and webserver_mode dependency
router = APIRouter()
@router.get('/background/{jobid}', response_model=BackgroundTaskStatus, tags=['webserver'])
def background_job(jobid: str):
if not (job := ApiBG.jobs.get(jobid)):
raise HTTPException(status_code=404, detail='Job not found.')
return {
'job_id': jobid,
'job_category': job['category'],
'status': job['status'],
'running': job['is_running'],
'progress': job.get('progress'),
# 'job_error': job['error'],
}
@router.get('/pairlists/available',
response_model=PairListsResponse, tags=['pairlists', 'webserver'])
def list_pairlists(config=Depends(get_config)):
from freqtrade.resolvers import PairListResolver
pairlists = PairListResolver.search_all_objects(
config, False)
pairlists = sorted(pairlists, key=lambda x: x['name'])
return {'pairlists': [{
"name": x['name'],
"is_pairlist_generator": x['class'].is_pairlist_generator,
"params": x['class'].available_parameters(),
"description": x['class'].description(),
} for x in pairlists
]}
def __run_pairlist(job_id: str, config_loc: Config):
try:
ApiBG.jobs[job_id]['is_running'] = True
from freqtrade.plugins.pairlistmanager import PairListManager
with FtNoDBContext():
exchange = get_exchange(config_loc)
pairlists = PairListManager(exchange, config_loc)
pairlists.refresh_pairlist()
ApiBG.jobs[job_id]['result'] = {
'method': pairlists.name_list,
'length': len(pairlists.whitelist),
'whitelist': pairlists.whitelist
}
ApiBG.jobs[job_id]['status'] = 'success'
except (OperationalException, Exception) as e:
logger.exception(e)
ApiBG.jobs[job_id]['error'] = str(e)
ApiBG.jobs[job_id]['status'] = 'failed'
finally:
ApiBG.jobs[job_id]['is_running'] = False
ApiBG.pairlist_running = False
@router.post('/pairlists/evaluate', response_model=BgJobStarted, tags=['pairlists', 'webserver'])
def pairlists_evaluate(payload: PairListsPayload, background_tasks: BackgroundTasks,
config=Depends(get_config)):
if ApiBG.pairlist_running:
raise HTTPException(status_code=400, detail='Pairlist evaluation is already running.')
config_loc = deepcopy(config)
config_loc['stake_currency'] = payload.stake_currency
config_loc['pairlists'] = payload.pairlists
handleExchangePayload(payload, config_loc)
# TODO: overwrite blacklist? make it optional and fall back to the one in config?
# Outcome depends on the UI approach.
config_loc['exchange']['pair_blacklist'] = payload.blacklist
# Random job id
job_id = ApiBG.get_job_id()
ApiBG.jobs[job_id] = {
'category': 'pairlist',
'status': 'pending',
'progress': None,
'is_running': False,
'result': {},
'error': None,
}
background_tasks.add_task(__run_pairlist, job_id, config_loc)
ApiBG.pairlist_running = True
return {
'status': 'Pairlist evaluation started in background.',
'job_id': job_id,
}
def handleExchangePayload(payload: ExchangeModePayloadMixin, config_loc: Config):
"""
Handle exchange and trading mode payload.
Updates the configuration with the payload values.
"""
if payload.exchange:
config_loc['exchange']['name'] = payload.exchange
if payload.trading_mode:
config_loc['trading_mode'] = payload.trading_mode
config_loc['candle_type_def'] = CandleType.get_default(
config_loc.get('trading_mode', 'spot') or 'spot')
if payload.margin_mode:
config_loc['margin_mode'] = payload.margin_mode
@router.get('/pairlists/evaluate/{jobid}', response_model=WhitelistEvaluateResponse,
tags=['pairlists', 'webserver'])
def pairlists_evaluate_get(jobid: str):
if not (job := ApiBG.jobs.get(jobid)):
raise HTTPException(status_code=404, detail='Job not found.')
if job['is_running']:
raise HTTPException(status_code=400, detail='Job not finished yet.')
if error := job['error']:
return {
'status': 'failed',
'error': error,
}
return {
'status': 'success',
'result': job['result'],
}

View File

@@ -2,41 +2,130 @@ import asyncio
import logging
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
from fastapi import APIRouter, BackgroundTasks, Depends
from fastapi.exceptions import HTTPException
from freqtrade.configuration.config_validation import validate_config_consistency
from freqtrade.data.btanalysis import get_backtest_resultlist, load_and_merge_backtest_result
from freqtrade.constants import Config
from freqtrade.data.btanalysis import (delete_backtest_result, get_backtest_result,
get_backtest_resultlist, load_and_merge_backtest_result,
update_backtest_metadata)
from freqtrade.enums import BacktestState
from freqtrade.exceptions import DependencyException
from freqtrade.rpc.api_server.api_schemas import (BacktestHistoryEntry, BacktestRequest,
BacktestResponse)
from freqtrade.rpc.api_server.deps import get_config, is_webserver_mode
from freqtrade.rpc.api_server.webserver import ApiServer
from freqtrade.exceptions import DependencyException, OperationalException
from freqtrade.exchange.common import remove_exchange_credentials
from freqtrade.misc import deep_merge_dicts, is_file_in_dir
from freqtrade.rpc.api_server.api_schemas import (BacktestHistoryEntry, BacktestMetadataUpdate,
BacktestRequest, BacktestResponse)
from freqtrade.rpc.api_server.deps import get_config
from freqtrade.rpc.api_server.webserver_bgwork import ApiBG
from freqtrade.rpc.rpc import RPCException
from freqtrade.types import get_BacktestResultType_default
logger = logging.getLogger(__name__)
# Private API, protected by authentication
# Private API, protected by authentication and webserver_mode dependency
router = APIRouter()
def __run_backtest_bg(btconfig: Config):
from freqtrade.optimize.optimize_reports import generate_backtest_stats, store_backtest_stats
from freqtrade.resolvers import StrategyResolver
asyncio.set_event_loop(asyncio.new_event_loop())
try:
# Reload strategy
lastconfig = ApiBG.bt['last_config']
strat = StrategyResolver.load_strategy(btconfig)
validate_config_consistency(btconfig)
if (
not ApiBG.bt['bt']
or lastconfig.get('timeframe') != strat.timeframe
or lastconfig.get('timeframe_detail') != btconfig.get('timeframe_detail')
or lastconfig.get('timerange') != btconfig['timerange']
):
from freqtrade.optimize.backtesting import Backtesting
ApiBG.bt['bt'] = Backtesting(btconfig)
ApiBG.bt['bt'].load_bt_data_detail()
else:
ApiBG.bt['bt'].config = btconfig
ApiBG.bt['bt'].init_backtest()
# Only reload data if timeframe changed.
if (
not ApiBG.bt['data']
or not ApiBG.bt['timerange']
or lastconfig.get('timeframe') != strat.timeframe
or lastconfig.get('timerange') != btconfig['timerange']
):
ApiBG.bt['data'], ApiBG.bt['timerange'] = ApiBG.bt[
'bt'].load_bt_data()
lastconfig['timerange'] = btconfig['timerange']
lastconfig['timeframe'] = strat.timeframe
lastconfig['protections'] = btconfig.get('protections', [])
lastconfig['enable_protections'] = btconfig.get('enable_protections')
lastconfig['dry_run_wallet'] = btconfig.get('dry_run_wallet')
ApiBG.bt['bt'].enable_protections = btconfig.get('enable_protections', False)
ApiBG.bt['bt'].strategylist = [strat]
ApiBG.bt['bt'].results = get_BacktestResultType_default()
ApiBG.bt['bt'].load_prior_backtest()
ApiBG.bt['bt'].abort = False
strategy_name = strat.get_strategy_name()
if (ApiBG.bt['bt'].results and
strategy_name in ApiBG.bt['bt'].results['strategy']):
# When previous result hash matches - reuse that result and skip backtesting.
logger.info(f'Reusing result of previous backtest for {strategy_name}')
else:
min_date, max_date = ApiBG.bt['bt'].backtest_one_strategy(
strat, ApiBG.bt['data'], ApiBG.bt['timerange'])
ApiBG.bt['bt'].results = generate_backtest_stats(
ApiBG.bt['data'], ApiBG.bt['bt'].all_results,
min_date=min_date, max_date=max_date)
if btconfig.get('export', 'none') == 'trades':
fn = store_backtest_stats(
btconfig['exportfilename'], ApiBG.bt['bt'].results,
datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
)
ApiBG.bt['bt'].results['metadata'][strategy_name]['filename'] = str(fn.name)
ApiBG.bt['bt'].results['metadata'][strategy_name]['strategy'] = strategy_name
logger.info("Backtest finished.")
except (Exception, OperationalException, DependencyException) as e:
logger.exception(f"Backtesting caused an error: {e}")
ApiBG.bt['bt_error'] = str(e)
pass
finally:
ApiBG.bgtask_running = False
@router.post('/backtest', response_model=BacktestResponse, tags=['webserver', 'backtest'])
# flake8: noqa: C901
async def api_start_backtest(bt_settings: BacktestRequest, background_tasks: BackgroundTasks,
config=Depends(get_config), ws_mode=Depends(is_webserver_mode)):
async def api_start_backtest(
bt_settings: BacktestRequest, background_tasks: BackgroundTasks,
config=Depends(get_config)):
ApiBG.bt['bt_error'] = None
"""Start backtesting if not done so already"""
if ApiServer._bgtask_running:
if ApiBG.bgtask_running:
raise RPCException('Bot Background task already running')
if ':' in bt_settings.strategy:
raise HTTPException(status_code=500, detail="base64 encoded strategies are not allowed.")
btconfig = deepcopy(config)
remove_exchange_credentials(btconfig['exchange'], True)
settings = dict(bt_settings)
if settings.get('freqai', None) is not None:
settings['freqai'] = dict(settings['freqai'])
# Pydantic models will contain all keys, but non-provided ones are None
for setting in settings.keys():
if settings[setting] is not None:
btconfig[setting] = settings[setting]
btconfig = deep_merge_dicts(settings, btconfig, allow_null_overrides=False)
try:
btconfig['stake_amount'] = float(btconfig['stake_amount'])
except ValueError:
@@ -47,77 +136,9 @@ async def api_start_backtest(bt_settings: BacktestRequest, background_tasks: Bac
# Start backtesting
# Initialize backtesting object
def run_backtest():
from freqtrade.optimize.optimize_reports import (generate_backtest_stats,
store_backtest_stats)
from freqtrade.resolvers import StrategyResolver
asyncio.set_event_loop(asyncio.new_event_loop())
try:
# Reload strategy
lastconfig = ApiServer._bt_last_config
strat = StrategyResolver.load_strategy(btconfig)
validate_config_consistency(btconfig)
if (
not ApiServer._bt
or lastconfig.get('timeframe') != strat.timeframe
or lastconfig.get('timeframe_detail') != btconfig.get('timeframe_detail')
or lastconfig.get('timerange') != btconfig['timerange']
):
from freqtrade.optimize.backtesting import Backtesting
ApiServer._bt = Backtesting(btconfig)
ApiServer._bt.load_bt_data_detail()
else:
ApiServer._bt.config = btconfig
ApiServer._bt.init_backtest()
# Only reload data if timeframe changed.
if (
not ApiServer._bt_data
or not ApiServer._bt_timerange
or lastconfig.get('timeframe') != strat.timeframe
or lastconfig.get('timerange') != btconfig['timerange']
):
ApiServer._bt_data, ApiServer._bt_timerange = ApiServer._bt.load_bt_data()
lastconfig['timerange'] = btconfig['timerange']
lastconfig['timeframe'] = strat.timeframe
lastconfig['protections'] = btconfig.get('protections', [])
lastconfig['enable_protections'] = btconfig.get('enable_protections')
lastconfig['dry_run_wallet'] = btconfig.get('dry_run_wallet')
ApiServer._bt.strategylist = [strat]
ApiServer._bt.results = {}
ApiServer._bt.load_prior_backtest()
ApiServer._bt.abort = False
if (ApiServer._bt.results and
strat.get_strategy_name() in ApiServer._bt.results['strategy']):
# When previous result hash matches - reuse that result and skip backtesting.
logger.info(f'Reusing result of previous backtest for {strat.get_strategy_name()}')
else:
min_date, max_date = ApiServer._bt.backtest_one_strategy(
strat, ApiServer._bt_data, ApiServer._bt_timerange)
ApiServer._bt.results = generate_backtest_stats(
ApiServer._bt_data, ApiServer._bt.all_results,
min_date=min_date, max_date=max_date)
if btconfig.get('export', 'none') == 'trades':
store_backtest_stats(
btconfig['exportfilename'], ApiServer._bt.results,
datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
)
logger.info("Backtest finished.")
except DependencyException as e:
logger.info(f"Backtesting caused an error: {e}")
pass
finally:
ApiServer._bgtask_running = False
background_tasks.add_task(run_backtest)
ApiServer._bgtask_running = True
background_tasks.add_task(__run_backtest_bg, btconfig=btconfig)
ApiBG.bgtask_running = True
return {
"status": "running",
@@ -129,23 +150,24 @@ async def api_start_backtest(bt_settings: BacktestRequest, background_tasks: Bac
@router.get('/backtest', response_model=BacktestResponse, tags=['webserver', 'backtest'])
def api_get_backtest(ws_mode=Depends(is_webserver_mode)):
def api_get_backtest():
"""
Get backtesting result.
Returns Result after backtesting has been ran.
"""
from freqtrade.persistence import LocalTrade
if ApiServer._bgtask_running:
if ApiBG.bgtask_running:
return {
"status": "running",
"running": True,
"step": ApiServer._bt.progress.action if ApiServer._bt else str(BacktestState.STARTUP),
"progress": ApiServer._bt.progress.progress if ApiServer._bt else 0,
"step": (ApiBG.bt['bt'].progress.action if ApiBG.bt['bt']
else str(BacktestState.STARTUP)),
"progress": ApiBG.bt['bt'].progress.progress if ApiBG.bt['bt'] else 0,
"trade_count": len(LocalTrade.trades),
"status_msg": "Backtest running",
}
if not ApiServer._bt:
if not ApiBG.bt['bt']:
return {
"status": "not_started",
"running": False,
@@ -153,6 +175,14 @@ def api_get_backtest(ws_mode=Depends(is_webserver_mode)):
"progress": 0,
"status_msg": "Backtest not yet executed"
}
if ApiBG.bt['bt_error']:
return {
"status": "error",
"running": False,
"step": "",
"progress": 0,
"status_msg": f"Backtest failed with {ApiBG.bt['bt_error']}"
}
return {
"status": "ended",
@@ -160,14 +190,14 @@ def api_get_backtest(ws_mode=Depends(is_webserver_mode)):
"status_msg": "Backtest ended",
"step": "finished",
"progress": 1,
"backtest_result": ApiServer._bt.results,
"backtest_result": ApiBG.bt['bt'].results,
}
@router.delete('/backtest', response_model=BacktestResponse, tags=['webserver', 'backtest'])
def api_delete_backtest(ws_mode=Depends(is_webserver_mode)):
def api_delete_backtest():
"""Reset backtesting"""
if ApiServer._bgtask_running:
if ApiBG.bgtask_running:
return {
"status": "running",
"running": True,
@@ -175,12 +205,12 @@ def api_delete_backtest(ws_mode=Depends(is_webserver_mode)):
"progress": 0,
"status_msg": "Backtest running",
}
if ApiServer._bt:
ApiServer._bt.cleanup()
del ApiServer._bt
ApiServer._bt = None
del ApiServer._bt_data
ApiServer._bt_data = None
if ApiBG.bt['bt']:
ApiBG.bt['bt'].cleanup()
del ApiBG.bt['bt']
ApiBG.bt['bt'] = None
del ApiBG.bt['data']
ApiBG.bt['data'] = None
logger.info("Backtesting reset")
return {
"status": "reset",
@@ -192,8 +222,8 @@ def api_delete_backtest(ws_mode=Depends(is_webserver_mode)):
@router.get('/backtest/abort', response_model=BacktestResponse, tags=['webserver', 'backtest'])
def api_backtest_abort(ws_mode=Depends(is_webserver_mode)):
if not ApiServer._bgtask_running:
def api_backtest_abort():
if not ApiBG.bgtask_running:
return {
"status": "not_running",
"running": False,
@@ -201,7 +231,7 @@ def api_backtest_abort(ws_mode=Depends(is_webserver_mode)):
"progress": 0,
"status_msg": "Backtest ended",
}
ApiServer._bt.abort = True
ApiBG.bt['bt'].abort = True
return {
"status": "stopping",
"running": False,
@@ -211,22 +241,27 @@ def api_backtest_abort(ws_mode=Depends(is_webserver_mode)):
}
@router.get('/backtest/history', response_model=List[BacktestHistoryEntry], tags=['webserver', 'backtest'])
def api_backtest_history(config=Depends(get_config), ws_mode=Depends(is_webserver_mode)):
@router.get('/backtest/history', response_model=List[BacktestHistoryEntry],
tags=['webserver', 'backtest'])
def api_backtest_history(config=Depends(get_config)):
# Get backtest result history, read from metadata files
return get_backtest_resultlist(config['user_data_dir'] / 'backtest_results')
@router.get('/backtest/history/result', response_model=BacktestResponse, tags=['webserver', 'backtest'])
def api_backtest_history_result(filename: str, strategy: str, config=Depends(get_config), ws_mode=Depends(is_webserver_mode)):
@router.get('/backtest/history/result', response_model=BacktestResponse,
tags=['webserver', 'backtest'])
def api_backtest_history_result(filename: str, strategy: str, config=Depends(get_config)):
# Get backtest result history, read from metadata files
fn = config['user_data_dir'] / 'backtest_results' / filename
bt_results_base: Path = config['user_data_dir'] / 'backtest_results'
fn = (bt_results_base / filename).with_suffix('.json')
results: Dict[str, Any] = {
'metadata': {},
'strategy': {},
'strategy_comparison': [],
}
if not is_file_in_dir(fn, bt_results_base):
raise HTTPException(status_code=404, detail="File not found.")
load_and_merge_backtest_result(strategy, fn, results)
return {
"status": "ended",
@@ -236,3 +271,38 @@ def api_backtest_history_result(filename: str, strategy: str, config=Depends(get
"status_msg": "Historic result",
"backtest_result": results,
}
@router.delete('/backtest/history/{file}', response_model=List[BacktestHistoryEntry],
tags=['webserver', 'backtest'])
def api_delete_backtest_history_entry(file: str, config=Depends(get_config)):
# Get backtest result history, read from metadata files
bt_results_base: Path = config['user_data_dir'] / 'backtest_results'
file_abs = (bt_results_base / file).with_suffix('.json')
# Ensure file is in backtest_results directory
if not is_file_in_dir(file_abs, bt_results_base):
raise HTTPException(status_code=404, detail="File not found.")
delete_backtest_result(file_abs)
return get_backtest_resultlist(config['user_data_dir'] / 'backtest_results')
@router.patch('/backtest/history/{file}', response_model=List[BacktestHistoryEntry],
tags=['webserver', 'backtest'])
def api_update_backtest_history_entry(file: str, body: BacktestMetadataUpdate,
config=Depends(get_config)):
# Get backtest result history, read from metadata files
bt_results_base: Path = config['user_data_dir'] / 'backtest_results'
file_abs = (bt_results_base / file).with_suffix('.json')
# Ensure file is in backtest_results directory
if not is_file_in_dir(file_abs, bt_results_base):
raise HTTPException(status_code=404, detail="File not found.")
content = {
'notes': body.notes
}
try:
update_backtest_metadata(file_abs, body.strategy, content)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return get_backtest_result(file_abs)

View File

@@ -1,10 +1,17 @@
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from pydantic import BaseModel, RootModel, SerializeAsAny
from freqtrade.constants import DATETIME_PRINT_FORMAT
from freqtrade.enums import OrderTypeValues, SignalDirection, TradingMode
from freqtrade.constants import IntOrInf
from freqtrade.enums import MarginMode, OrderTypeValues, SignalDirection, TradingMode
from freqtrade.types import ValidExchangesType
class ExchangeModePayloadMixin(BaseModel):
trading_mode: Optional[TradingMode] = None
margin_mode: Optional[MarginMode] = None
exchange: Optional[str] = None
class Ping(BaseModel):
@@ -27,6 +34,23 @@ class StatusMsg(BaseModel):
status: str
class BgJobStarted(StatusMsg):
job_id: str
class BackgroundTaskStatus(BaseModel):
job_id: str
job_category: str
status: str
running: bool
progress: Optional[float] = None
class BackgroundTaskResult(BaseModel):
error: Optional[str] = None
status: str
class ResultMsg(BaseModel):
result: str
@@ -36,20 +60,25 @@ class Balance(BaseModel):
free: float
balance: float
used: float
bot_owned: Optional[float] = None
est_stake: float
est_stake_bot: Optional[float] = None
stake: str
# Starting with 2.x
side: str
leverage: float
is_position: bool
position: float
is_bot_managed: bool
class Balances(BaseModel):
currencies: List[Balance]
total: float
total_bot: float
symbol: str
value: float
value_bot: float
stake: str
note: str
starting_capital: float
@@ -66,15 +95,30 @@ class Count(BaseModel):
total_stake: float
class PerformanceEntry(BaseModel):
pair: str
profit: float
class __BaseStatsModel(BaseModel):
profit_ratio: float
profit_pct: float
profit_abs: float
count: int
class Entry(__BaseStatsModel):
enter_tag: str
class Exit(__BaseStatsModel):
exit_reason: str
class MixTag(__BaseStatsModel):
mix_tag: str
class PerformanceEntry(__BaseStatsModel):
pair: str
profit: float
class Profit(BaseModel):
profit_closed_coin: float
profit_closed_percent_mean: float
@@ -95,8 +139,10 @@ class Profit(BaseModel):
trade_count: int
closed_trade_count: int
first_trade_date: str
first_trade_humanized: str
first_trade_timestamp: int
latest_trade_date: str
latest_trade_humanized: str
latest_trade_timestamp: int
avg_duration: str
best_pair: str
@@ -105,9 +151,18 @@ class Profit(BaseModel):
winning_trades: int
losing_trades: int
profit_factor: float
winrate: float
expectancy: float
expectancy_ratio: float
max_drawdown: float
max_drawdown_abs: float
trading_volume: Optional[float]
max_drawdown_start: str
max_drawdown_start_timestamp: int
max_drawdown_end: str
max_drawdown_end_timestamp: int
trading_volume: Optional[float] = None
bot_start_timestamp: int
bot_start_date: str
class SellReason(BaseModel):
@@ -121,7 +176,7 @@ class Stats(BaseModel):
durations: Dict[str, Optional[float]]
class DailyRecord(BaseModel):
class DailyWeeklyMonthlyRecord(BaseModel):
date: date
abs_profit: float
rel_profit: float
@@ -130,56 +185,57 @@ class DailyRecord(BaseModel):
trade_count: int
class Daily(BaseModel):
data: List[DailyRecord]
class DailyWeeklyMonthly(BaseModel):
data: List[DailyWeeklyMonthlyRecord]
fiat_display_currency: str
stake_currency: str
class UnfilledTimeout(BaseModel):
entry: Optional[int]
exit: Optional[int]
unit: Optional[str]
exit_timeout_count: Optional[int]
entry: Optional[int] = None
exit: Optional[int] = None
unit: Optional[str] = None
exit_timeout_count: Optional[int] = None
class OrderTypes(BaseModel):
entry: OrderTypeValues
exit: OrderTypeValues
emergency_exit: Optional[OrderTypeValues]
force_exit: Optional[OrderTypeValues]
force_entry: Optional[OrderTypeValues]
emergency_exit: Optional[OrderTypeValues] = None
force_exit: Optional[OrderTypeValues] = None
force_entry: Optional[OrderTypeValues] = None
stoploss: OrderTypeValues
stoploss_on_exchange: bool
stoploss_on_exchange_interval: Optional[int]
stoploss_on_exchange_interval: Optional[int] = None
class ShowConfig(BaseModel):
version: str
strategy_version: Optional[str]
strategy_version: Optional[str] = None
api_version: float
dry_run: bool
trading_mode: str
short_allowed: bool
stake_currency: str
stake_amount: str
available_capital: Optional[float]
available_capital: Optional[float] = None
stake_currency_decimals: int
max_open_trades: int
max_open_trades: IntOrInf
minimal_roi: Dict[str, Any]
stoploss: Optional[float]
trailing_stop: Optional[bool]
trailing_stop_positive: Optional[float]
trailing_stop_positive_offset: Optional[float]
trailing_only_offset_is_reached: Optional[bool]
unfilledtimeout: Optional[UnfilledTimeout] # Empty in webserver mode
order_types: Optional[OrderTypes]
use_custom_stoploss: Optional[bool]
timeframe: Optional[str]
stoploss: Optional[float] = None
stoploss_on_exchange: bool
trailing_stop: Optional[bool] = None
trailing_stop_positive: Optional[float] = None
trailing_stop_positive_offset: Optional[float] = None
trailing_only_offset_is_reached: Optional[bool] = None
unfilledtimeout: Optional[UnfilledTimeout] = None # Empty in webserver mode
order_types: Optional[OrderTypes] = None
use_custom_stoploss: Optional[bool] = None
timeframe: Optional[str] = None
timeframe_ms: int
timeframe_min: int
exchange: str
strategy: Optional[str]
strategy: Optional[str] = None
force_entry_enable: bool
exit_pricing: Dict[str, Any]
entry_pricing: Dict[str, Any]
@@ -194,16 +250,18 @@ class OrderSchema(BaseModel):
pair: str
order_id: str
status: str
remaining: Optional[float]
remaining: Optional[float] = None
amount: float
safe_price: float
cost: float
filled: Optional[float]
filled: Optional[float] = None
ft_order_side: str
order_type: str
is_open: bool
order_timestamp: Optional[int]
order_filled_timestamp: Optional[int]
order_timestamp: Optional[int] = None
order_filled_timestamp: Optional[int] = None
ft_fee_base: Optional[float] = None
ft_order_tag: Optional[str] = None
class TradeSchema(BaseModel):
@@ -217,67 +275,78 @@ class TradeSchema(BaseModel):
amount: float
amount_requested: float
stake_amount: float
max_stake_amount: Optional[float] = None
strategy: str
buy_tag: Optional[str] # Deprecated
enter_tag: Optional[str]
enter_tag: Optional[str] = None
timeframe: int
fee_open: Optional[float]
fee_open_cost: Optional[float]
fee_open_currency: Optional[str]
fee_close: Optional[float]
fee_close_cost: Optional[float]
fee_close_currency: Optional[str]
fee_open: Optional[float] = None
fee_open_cost: Optional[float] = None
fee_open_currency: Optional[str] = None
fee_close: Optional[float] = None
fee_close_cost: Optional[float] = None
fee_close_currency: Optional[str] = None
open_date: str
open_timestamp: int
open_rate: float
open_rate_requested: Optional[float]
open_rate_requested: Optional[float] = None
open_trade_value: float
close_date: Optional[str]
close_timestamp: Optional[int]
close_rate: Optional[float]
close_rate_requested: Optional[float]
close_profit: Optional[float]
close_profit_pct: Optional[float]
close_profit_abs: Optional[float]
profit_ratio: Optional[float]
profit_pct: Optional[float]
profit_abs: Optional[float]
profit_fiat: Optional[float]
sell_reason: Optional[str] # Deprecated
exit_reason: Optional[str]
exit_order_status: Optional[str]
stop_loss_abs: Optional[float]
stop_loss_ratio: Optional[float]
stop_loss_pct: Optional[float]
stoploss_order_id: Optional[str]
stoploss_last_update: Optional[str]
stoploss_last_update_timestamp: Optional[int]
initial_stop_loss_abs: Optional[float]
initial_stop_loss_ratio: Optional[float]
initial_stop_loss_pct: Optional[float]
min_rate: Optional[float]
max_rate: Optional[float]
open_order_id: Optional[str]
close_date: Optional[str] = None
close_timestamp: Optional[int] = None
close_rate: Optional[float] = None
close_rate_requested: Optional[float] = None
close_profit: Optional[float] = None
close_profit_pct: Optional[float] = None
close_profit_abs: Optional[float] = None
profit_ratio: Optional[float] = None
profit_pct: Optional[float] = None
profit_abs: Optional[float] = None
profit_fiat: Optional[float] = None
realized_profit: float
realized_profit_ratio: Optional[float] = None
exit_reason: Optional[str] = None
exit_order_status: Optional[str] = None
stop_loss_abs: Optional[float] = None
stop_loss_ratio: Optional[float] = None
stop_loss_pct: Optional[float] = None
stoploss_last_update: Optional[str] = None
stoploss_last_update_timestamp: Optional[int] = None
initial_stop_loss_abs: Optional[float] = None
initial_stop_loss_ratio: Optional[float] = None
initial_stop_loss_pct: Optional[float] = None
min_rate: Optional[float] = None
max_rate: Optional[float] = None
has_open_orders: bool
orders: List[OrderSchema]
leverage: Optional[float]
interest_rate: Optional[float]
liquidation_price: Optional[float]
funding_fees: Optional[float]
trading_mode: Optional[TradingMode]
leverage: Optional[float] = None
interest_rate: Optional[float] = None
liquidation_price: Optional[float] = None
funding_fees: Optional[float] = None
trading_mode: Optional[TradingMode] = None
amount_precision: Optional[float] = None
price_precision: Optional[float] = None
precision_mode: Optional[int] = None
class OpenTradeSchema(TradeSchema):
stoploss_current_dist: Optional[float]
stoploss_current_dist_pct: Optional[float]
stoploss_current_dist_ratio: Optional[float]
stoploss_entry_dist: Optional[float]
stoploss_entry_dist_ratio: Optional[float]
current_profit: float
current_profit_abs: float
current_profit_pct: float
stoploss_current_dist: Optional[float] = None
stoploss_current_dist_pct: Optional[float] = None
stoploss_current_dist_ratio: Optional[float] = None
stoploss_entry_dist: Optional[float] = None
stoploss_entry_dist_ratio: Optional[float] = None
current_rate: float
open_order: Optional[str]
total_profit_abs: float
total_profit_fiat: Optional[float] = None
total_profit_ratio: Optional[float] = None
class TradeResponse(BaseModel):
@@ -287,8 +356,7 @@ class TradeResponse(BaseModel):
total_trades: int
class ForceEnterResponse(BaseModel):
__root__: Union[TradeSchema, StatusMsg]
ForceEnterResponse = RootModel[Union[TradeSchema, StatusMsg]]
class LockModel(BaseModel):
@@ -300,7 +368,7 @@ class LockModel(BaseModel):
lock_timestamp: int
pair: str
side: str
reason: str
reason: Optional[str] = None
class Locks(BaseModel):
@@ -309,8 +377,8 @@ class Locks(BaseModel):
class DeleteLockRequest(BaseModel):
pair: Optional[str]
lockid: Optional[int]
pair: Optional[str] = None
lockid: Optional[int] = None
class Logs(BaseModel):
@@ -321,17 +389,17 @@ class Logs(BaseModel):
class ForceEnterPayload(BaseModel):
pair: str
side: SignalDirection = SignalDirection.LONG
price: Optional[float]
ordertype: Optional[OrderTypeValues]
stakeamount: Optional[float]
entry_tag: Optional[str]
leverage: Optional[float]
price: Optional[float] = None
ordertype: Optional[OrderTypeValues] = None
stakeamount: Optional[float] = None
entry_tag: Optional[str] = None
leverage: Optional[float] = None
class ForceExitPayload(BaseModel):
tradeid: str
ordertype: Optional[OrderTypeValues]
amount: Optional[float]
ordertype: Optional[OrderTypeValues] = None
amount: Optional[float] = None
class BlacklistPayload(BaseModel):
@@ -352,6 +420,10 @@ class WhitelistResponse(BaseModel):
method: List[str]
class WhitelistEvaluateResponse(BackgroundTaskResult):
result: Optional[WhitelistResponse] = None
class DeleteTrade(BaseModel):
cancel_order_count: int
result: str
@@ -364,17 +436,42 @@ class PlotConfig_(BaseModel):
subplots: Dict[str, Any]
class PlotConfig(BaseModel):
__root__: Union[PlotConfig_, Dict]
PlotConfig = RootModel[Union[PlotConfig_, Dict]]
class StrategyListResponse(BaseModel):
strategies: List[str]
class ExchangeListResponse(BaseModel):
exchanges: List[ValidExchangesType]
class PairListResponse(BaseModel):
name: str
description: str
is_pairlist_generator: bool
params: Dict[str, Any]
class PairListsResponse(BaseModel):
pairlists: List[PairListResponse]
class PairListsPayload(ExchangeModePayloadMixin, BaseModel):
pairlists: List[Dict[str, Any]]
blacklist: List[str]
stake_currency: str
class FreqAIModelListResponse(BaseModel):
freqaimodels: List[str]
class StrategyResponse(BaseModel):
strategy: str
code: str
timeframe: Optional[str]
class AvailablePairs(BaseModel):
@@ -389,7 +486,7 @@ class PairHistory(BaseModel):
timeframe: str
timeframe_ms: int
columns: List[str]
data: List[Any]
data: SerializeAsAny[List[Any]]
length: int
buy_signals: int
sell_signals: int
@@ -404,21 +501,23 @@ class PairHistory(BaseModel):
data_stop: str
data_stop_ts: int
class Config:
json_encoders = {
datetime: lambda v: v.strftime(DATETIME_PRINT_FORMAT),
}
class BacktestFreqAIInputs(BaseModel):
identifier: str
class BacktestRequest(BaseModel):
strategy: str
timeframe: Optional[str]
timeframe_detail: Optional[str]
timerange: Optional[str]
max_open_trades: Optional[int]
stake_amount: Optional[str]
timeframe: Optional[str] = None
timeframe_detail: Optional[str] = None
timerange: Optional[str] = None
max_open_trades: Optional[IntOrInf] = None
stake_amount: Optional[Union[str, float]] = None
enable_protections: bool
dry_run_wallet: Optional[float]
dry_run_wallet: Optional[float] = None
backtest_cache: Optional[str] = None
freqaimodel: Optional[str] = None
freqai: Optional[BacktestFreqAIInputs] = None
class BacktestResponse(BaseModel):
@@ -427,16 +526,27 @@ class BacktestResponse(BaseModel):
status_msg: str
step: str
progress: float
trade_count: Optional[float]
trade_count: Optional[float] = None
# TODO: Properly type backtestresult...
backtest_result: Optional[Dict[str, Any]]
backtest_result: Optional[Dict[str, Any]] = None
# TODO: This is a copy of BacktestHistoryEntryType
class BacktestHistoryEntry(BaseModel):
filename: str
strategy: str
run_id: str
backtest_start_time: int
notes: Optional[str] = ''
backtest_start_ts: Optional[int] = None
backtest_end_ts: Optional[int] = None
timeframe: Optional[str] = None
timeframe_detail: Optional[str] = None
class BacktestMetadataUpdate(BaseModel):
strategy: str
notes: str = ''
class SysInfo(BaseModel):
@@ -445,5 +555,5 @@ class SysInfo(BaseModel):
class Health(BaseModel):
last_process: datetime
last_process_ts: int
last_process: Optional[datetime] = None
last_process_ts: Optional[int] = None

View File

@@ -1,22 +1,22 @@
import logging
from copy import deepcopy
from pathlib import Path
from typing import List, Optional
from fastapi import APIRouter, Depends, Query
from fastapi.exceptions import HTTPException
from freqtrade import __version__
from freqtrade.constants import USERPATH_STRATEGIES
from freqtrade.data.history import get_datahandler
from freqtrade.enums import CandleType, TradingMode
from freqtrade.exceptions import OperationalException
from freqtrade.rpc import RPC
from freqtrade.rpc.api_server.api_schemas import (AvailablePairs, Balances, BlacklistPayload,
BlacklistResponse, Count, Daily,
DeleteLockRequest, DeleteTrade, ForceEnterPayload,
ForceEnterResponse, ForceExitPayload, Health,
Locks, Logs, OpenTradeSchema, PairHistory,
BlacklistResponse, Count, DailyWeeklyMonthly,
DeleteLockRequest, DeleteTrade, Entry,
ExchangeListResponse, Exit, ForceEnterPayload,
ForceEnterResponse, ForceExitPayload,
FreqAIModelListResponse, Health, Locks, Logs,
MixTag, OpenTradeSchema, PairHistory,
PerformanceEntry, Ping, PlotConfig, Profit,
ResultMsg, ShowConfig, Stats, StatusMsg,
StrategyListResponse, StrategyResponse, SysInfo,
@@ -38,7 +38,22 @@ logger = logging.getLogger(__name__)
# 2.15: Add backtest history endpoints
# 2.16: Additional daily metrics
# 2.17: Forceentry - leverage, partial force_exit
API_VERSION = 2.17
# 2.20: Add websocket endpoints
# 2.21: Add new_candle messagetype
# 2.22: Add FreqAI to backtesting
# 2.23: Allow plot config request in webserver mode
# 2.24: Add cancel_open_order endpoint
# 2.25: Add several profit values to /status endpoint
# 2.26: increase /balance output
# 2.27: Add /trades/<id>/reload endpoint
# 2.28: Switch reload endpoint to Post
# 2.29: Add /exchanges endpoint
# 2.30: new /pairlists endpoint
# 2.31: new /backtest/history/ delete endpoint
# 2.32: new /backtest/history/ patch endpoint
# 2.33: Additional weekly/monthly metrics
# 2.34: new entries/exits/mix_tags endpoints
API_VERSION = 2.34
# Public API, requires no auth.
router_public = APIRouter()
@@ -69,6 +84,21 @@ def count(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_count()
@router.get('/entries', response_model=List[Entry], tags=['info'])
def entries(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_enter_tag_performance(pair)
@router.get('/exits', response_model=List[Exit], tags=['info'])
def exits(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_exit_reason_performance(pair)
@router.get('/mix_tags', response_model=List[MixTag], tags=['info'])
def mix_tags(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_mix_tag_performance(pair)
@router.get('/performance', response_model=List[PerformanceEntry], tags=['info'])
def performance(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_performance()
@@ -86,12 +116,24 @@ def stats(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_stats()
@router.get('/daily', response_model=Daily, tags=['info'])
@router.get('/daily', response_model=DailyWeeklyMonthly, tags=['info'])
def daily(timescale: int = 7, rpc: RPC = Depends(get_rpc), config=Depends(get_config)):
return rpc._rpc_timeunit_profit(timescale, config['stake_currency'],
config.get('fiat_display_currency', ''))
@router.get('/weekly', response_model=DailyWeeklyMonthly, tags=['info'])
def weekly(timescale: int = 4, rpc: RPC = Depends(get_rpc), config=Depends(get_config)):
return rpc._rpc_timeunit_profit(timescale, config['stake_currency'],
config.get('fiat_display_currency', ''), 'weeks')
@router.get('/monthly', response_model=DailyWeeklyMonthly, tags=['info'])
def monthly(timescale: int = 3, rpc: RPC = Depends(get_rpc), config=Depends(get_config)):
return rpc._rpc_timeunit_profit(timescale, config['stake_currency'],
config.get('fiat_display_currency', ''), 'months')
@router.get('/status', response_model=List[OpenTradeSchema], tags=['info'])
def status(rpc: RPC = Depends(get_rpc)):
try:
@@ -120,6 +162,18 @@ def trades_delete(tradeid: int, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_delete(tradeid)
@router.delete('/trades/{tradeid}/open-order', response_model=OpenTradeSchema, tags=['trading'])
def trade_cancel_open_order(tradeid: int, rpc: RPC = Depends(get_rpc)):
rpc._rpc_cancel_open_order(tradeid)
return rpc._rpc_trade_status([tradeid])[0]
@router.post('/trades/{tradeid}/reload', response_model=OpenTradeSchema, tags=['trading'])
def trade_reload(tradeid: int, rpc: RPC = Depends(get_rpc)):
rpc._rpc_reload_trade_from_exchange(tradeid)
return rpc._rpc_trade_status([tradeid])[0]
# TODO: Missing response model
@router.get('/edge', tags=['info'])
def edge(rpc: RPC = Depends(get_rpc)):
@@ -150,9 +204,9 @@ def force_entry(payload: ForceEnterPayload, rpc: RPC = Depends(get_rpc)):
leverage=payload.leverage)
if trade:
return ForceEnterResponse.parse_obj(trade.to_json())
return ForceEnterResponse.model_validate(trade.to_json())
else:
return ForceEnterResponse.parse_obj(
return ForceEnterResponse.model_validate(
{"status": f"Error entering {payload.side} trade for pair {payload.pair}."})
@@ -216,9 +270,10 @@ def stop(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_stop()
@router.post('/stopentry', response_model=StatusMsg, tags=['botcontrol'])
@router.post('/stopbuy', response_model=StatusMsg, tags=['botcontrol'])
def stop_buy(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_stopbuy()
return rpc._rpc_stopentry()
@router.post('/reload_config', response_model=StatusMsg, tags=['botcontrol'])
@@ -234,28 +289,45 @@ def pair_candles(
@router.get('/pair_history', response_model=PairHistory, tags=['candle data'])
def pair_history(pair: str, timeframe: str, timerange: str, strategy: str,
freqaimodel: Optional[str] = None,
config=Depends(get_config), exchange=Depends(get_exchange)):
# The initial call to this endpoint can be slow, as it may need to initialize
# the exchange class.
config = deepcopy(config)
config.update({
'strategy': strategy,
'timerange': timerange,
'freqaimodel': freqaimodel if freqaimodel else config.get('freqaimodel'),
})
return RPC._rpc_analysed_history_full(config, pair, timeframe, timerange, exchange)
try:
return RPC._rpc_analysed_history_full(config, pair, timeframe, exchange)
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@router.get('/plot_config', response_model=PlotConfig, tags=['candle data'])
def plot_config(rpc: RPC = Depends(get_rpc)):
return PlotConfig.parse_obj(rpc._rpc_plot_config())
def plot_config(strategy: Optional[str] = None, config=Depends(get_config),
rpc: Optional[RPC] = Depends(get_rpc_optional)):
if not strategy:
if not rpc:
raise RPCException("Strategy is mandatory in webserver mode.")
return PlotConfig.model_validate(rpc._rpc_plot_config())
else:
config1 = deepcopy(config)
config1.update({
'strategy': strategy
})
try:
return PlotConfig.model_validate(RPC._rpc_plot_config_with_strategy(config1))
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@router.get('/strategies', response_model=StrategyListResponse, tags=['strategy'])
def list_strategies(config=Depends(get_config)):
directory = Path(config.get(
'strategy_path', config['user_data_dir'] / USERPATH_STRATEGIES))
from freqtrade.resolvers.strategy_resolver import StrategyResolver
strategies = StrategyResolver.search_all_objects(
directory, False, config.get('recursive_strategy_search', False))
config, False, config.get('recursive_strategy_search', False))
strategies = sorted(strategies, key=lambda x: x['name'])
return {'strategies': [x['name'] for x in strategies]}
@@ -263,6 +335,8 @@ def list_strategies(config=Depends(get_config)):
@router.get('/strategy/{strategy}', response_model=StrategyResponse, tags=['strategy'])
def get_strategy(strategy: str, config=Depends(get_config)):
if ":" in strategy:
raise HTTPException(status_code=500, detail="base64 encoded strategies are not allowed.")
config_ = deepcopy(config)
from freqtrade.resolvers.strategy_resolver import StrategyResolver
@@ -271,13 +345,34 @@ def get_strategy(strategy: str, config=Depends(get_config)):
extra_dir=config_.get('strategy_path'))
except OperationalException:
raise HTTPException(status_code=404, detail='Strategy not found')
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
return {
'strategy': strategy_obj.get_strategy_name(),
'code': strategy_obj.__source__,
'timeframe': getattr(strategy_obj, 'timeframe', None),
}
@router.get('/exchanges', response_model=ExchangeListResponse, tags=[])
def list_exchanges(config=Depends(get_config)):
from freqtrade.exchange import list_available_exchanges
exchanges = list_available_exchanges(config)
return {
'exchanges': exchanges,
}
@router.get('/freqaimodels', response_model=FreqAIModelListResponse, tags=['freqai'])
def list_freqaimodels(config=Depends(get_config)):
from freqtrade.resolvers.freqaimodel_resolver import FreqaiModelResolver
models = FreqaiModelResolver.search_all_objects(
config, False)
models = sorted(models, key=lambda x: x['name'])
return {'freqaimodels': [x['name'] for x in models]}
@router.get('/available_pairs', response_model=AvailablePairs, tags=['candle data'])
def list_available_pairs(timeframe: Optional[str] = None, stake_currency: Optional[str] = None,
candletype: Optional[CandleType] = None, config=Depends(get_config)):
@@ -315,4 +410,4 @@ def sysinfo():
@router.get('/health', response_model=Health, tags=['info'])
def health(rpc: RPC = Depends(get_rpc)):
return rpc._health()
return rpc.health()

View File

@@ -0,0 +1,123 @@
import logging
import time
from typing import Any, Dict
from fastapi import APIRouter, Depends
from fastapi.websockets import WebSocket
from pydantic import ValidationError
from freqtrade.enums import RPCMessageType, RPCRequestType
from freqtrade.exceptions import FreqtradeException
from freqtrade.rpc.api_server.api_auth import validate_ws_token
from freqtrade.rpc.api_server.deps import get_message_stream, get_rpc
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSErrorMessage,
WSMessageSchema, WSRequestSchema,
WSWhitelistMessage)
from freqtrade.rpc.rpc import RPC
logger = logging.getLogger(__name__)
# Private router, protected by API Key authentication
router = APIRouter()
async def channel_reader(channel: WebSocketChannel, rpc: RPC):
"""
Iterate over the messages from the channel and process the request
"""
async for message in channel:
try:
await _process_consumer_request(message, channel, rpc)
except FreqtradeException:
logger.exception(f"Error processing request from {channel}")
response = WSErrorMessage(data='Error processing request')
await channel.send(response.dict(exclude_none=True))
async def channel_broadcaster(channel: WebSocketChannel, message_stream: MessageStream):
"""
Iterate over messages in the message stream and send them
"""
async for message, ts in message_stream:
if channel.subscribed_to(message.get('type')):
# Log a warning if this channel is behind
# on the message stream by a lot
if (time.time() - ts) > 60:
logger.warning(f"Channel {channel} is behind MessageStream by 1 minute,"
" this can cause a memory leak if you see this message"
" often, consider reducing pair list size or amount of"
" consumers.")
await channel.send(message, timeout=True)
async def _process_consumer_request(
request: Dict[str, Any],
channel: WebSocketChannel,
rpc: RPC
):
"""
Validate and handle a request from a websocket consumer
"""
# Validate the request, makes sure it matches the schema
try:
websocket_request = WSRequestSchema.model_validate(request)
except ValidationError as e:
logger.error(f"Invalid request from {channel}: {e}")
return
type_, data = websocket_request.type, websocket_request.data
response: WSMessageSchema
logger.debug(f"Request of type {type_} from {channel}")
# If we have a request of type SUBSCRIBE, set the topics in this channel
if type_ == RPCRequestType.SUBSCRIBE:
# If the request is empty, do nothing
if not data:
return
# If all topics passed are a valid RPCMessageType, set subscriptions on channel
if all([any(x.value == topic for x in RPCMessageType) for topic in data]):
channel.set_subscriptions(data)
# We don't send a response for subscriptions
return
elif type_ == RPCRequestType.WHITELIST:
# Get whitelist
whitelist = rpc._ws_request_whitelist()
# Format response
response = WSWhitelistMessage(data=whitelist)
await channel.send(response.model_dump(exclude_none=True))
elif type_ == RPCRequestType.ANALYZED_DF:
# Limit the amount of candles per dataframe to 'limit' or 1500
limit = int(min(data.get('limit', 1500), 1500)) if data else None
pair = data.get('pair', None) if data else None
# For every pair in the generator, send a separate message
for message in rpc._ws_request_analyzed_df(limit, pair):
# Format response
response = WSAnalyzedDFMessage(data=message)
await channel.send(response.model_dump(exclude_none=True))
@router.websocket("/message/ws")
async def message_endpoint(
websocket: WebSocket,
token: str = Depends(validate_ws_token),
rpc: RPC = Depends(get_rpc),
message_stream: MessageStream = Depends(get_message_stream)
):
if token:
async with create_channel(websocket) as channel:
await channel.run_channel_tasks(
channel_reader(channel, rpc),
channel_broadcaster(channel, message_stream)
)

View File

@@ -1,9 +1,13 @@
from typing import Any, Dict, Iterator, Optional
from typing import Any, AsyncIterator, Dict, Optional
from uuid import uuid4
from fastapi import Depends
from fastapi import Depends, HTTPException
from freqtrade.constants import Config
from freqtrade.enums import RunMode
from freqtrade.persistence import Trade
from freqtrade.persistence.models import _request_id_ctx_var
from freqtrade.rpc.api_server.webserver_bgwork import ApiBG
from freqtrade.rpc.rpc import RPC, RPCException
from .webserver import ApiServer
@@ -15,12 +19,19 @@ def get_rpc_optional() -> Optional[RPC]:
return None
def get_rpc() -> Optional[Iterator[RPC]]:
async def get_rpc() -> Optional[AsyncIterator[RPC]]:
_rpc = get_rpc_optional()
if _rpc:
request_id = str(uuid4())
ctx_token = _request_id_ctx_var.set(request_id)
Trade.rollback()
yield _rpc
Trade.rollback()
try:
yield _rpc
finally:
Trade.session.remove()
_request_id_ctx_var.reset(ctx_token)
else:
raise RPCException('Bot is not in the correct state')
@@ -33,15 +44,29 @@ def get_api_config() -> Dict[str, Any]:
return ApiServer._config['api_server']
def _generate_exchange_key(config: Config) -> str:
"""
Exchange key - used for caching the exchange object.
"""
return f"{config['exchange']['name']}_{config.get('trading_mode', 'spot')}"
def get_exchange(config=Depends(get_config)):
if not ApiServer._exchange:
exchange_key = _generate_exchange_key(config)
if not (exchange := ApiBG.exchanges.get(exchange_key)):
from freqtrade.resolvers import ExchangeResolver
ApiServer._exchange = ExchangeResolver.load_exchange(
config['exchange']['name'], config, load_leverage_tiers=False)
return ApiServer._exchange
exchange = ExchangeResolver.load_exchange(
config, validate=False, load_leverage_tiers=False)
ApiBG.exchanges[exchange_key] = exchange
return exchange
def get_message_stream():
return ApiServer._message_stream
def is_webserver_mode(config=Depends(get_config)):
if config['runmode'] != RunMode.WEBSERVER:
raise RPCException('Bot is not in the correct state')
raise HTTPException(status_code=503,
detail='Bot is not in the correct state.')
return None

View File

@@ -55,7 +55,7 @@ class UvicornServer(uvicorn.Server):
@contextlib.contextmanager
def run_in_thread(self):
self.thread = threading.Thread(target=self.run)
self.thread = threading.Thread(target=self.run, name='FTUvicorn')
self.thread.start()
while not self.started:
time.sleep(1e-3)

View File

@@ -30,7 +30,7 @@ async def ui_version():
}
def is_relative_to(path, base) -> bool:
def is_relative_to(path: Path, base: Path) -> bool:
# Helper function simulating behaviour of is_relative_to, which was only added in python 3.9
try:
path.relative_to(base)

View File

@@ -1,6 +1,6 @@
import logging
from ipaddress import IPv4Address
from typing import Any, Dict
from typing import Any, Optional
import orjson
import uvicorn
@@ -8,9 +8,13 @@ from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse
from freqtrade.configuration import running_in_docker
from freqtrade.constants import Config
from freqtrade.exceptions import OperationalException
from freqtrade.rpc.api_server.uvicorn_threaded import UvicornServer
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
from freqtrade.rpc.rpc import RPC, RPCException, RPCHandler
from freqtrade.rpc.rpc_types import RPCSendMsg
logger = logging.getLogger(__name__)
@@ -33,16 +37,10 @@ class ApiServer(RPCHandler):
__initialized = False
_rpc: RPC
# Backtesting type: Backtesting
_bt = None
_bt_data = None
_bt_timerange = None
_bt_last_config: Dict[str, Any] = {}
_has_rpc: bool = False
_bgtask_running: bool = False
_config: Dict[str, Any] = {}
# Exchange - only available in webserver mode.
_exchange = None
_config: Config = {}
# websocket message stuff
_message_stream: Optional[MessageStream] = None
def __new__(cls, *args, **kwargs):
"""
@@ -54,12 +52,13 @@ class ApiServer(RPCHandler):
ApiServer.__initialized = False
return ApiServer.__instance
def __init__(self, config: Dict[str, Any], standalone: bool = False) -> None:
def __init__(self, config: Config, standalone: bool = False) -> None:
ApiServer._config = config
if self.__initialized and (standalone or self._standalone):
return
self._standalone: bool = standalone
self._server = None
ApiServer.__initialized = True
api_config = self._config['api_server']
@@ -70,14 +69,13 @@ class ApiServer(RPCHandler):
default_response_class=FTJSONResponse,
)
self.configure_app(self.app, self._config)
self.start_api()
def add_rpc_handler(self, rpc: RPC):
"""
Attach rpc handler
"""
if not self._has_rpc:
if not ApiServer._has_rpc:
ApiServer._rpc = rpc
ApiServer._has_rpc = True
else:
@@ -90,6 +88,7 @@ class ApiServer(RPCHandler):
del ApiServer._rpc
if self._server and not self._standalone:
logger.info("Stopping API Server")
# self._server.force_exit, self._server.should_exit = True, True
self._server.cleanup()
@classmethod
@@ -100,11 +99,15 @@ class ApiServer(RPCHandler):
cls._has_rpc = False
cls._rpc = None
def send_msg(self, msg: Dict[str, str]) -> None:
pass
def send_msg(self, msg: RPCSendMsg) -> None:
"""
Publish the message to the message stream
"""
if ApiServer._message_stream:
ApiServer._message_stream.publish(msg)
def handle_rpc_exception(self, request, exc):
logger.exception(f"API Error calling: {exc}")
logger.error(f"API Error calling: {exc}")
return JSONResponse(
status_code=502,
content={'error': f"Error querying {request.url.path}: {exc.message}"}
@@ -112,9 +115,12 @@ class ApiServer(RPCHandler):
def configure_app(self, app: FastAPI, config):
from freqtrade.rpc.api_server.api_auth import http_basic_or_jwt_token, router_login
from freqtrade.rpc.api_server.api_background_tasks import router as api_bg_tasks
from freqtrade.rpc.api_server.api_backtest import router as api_backtest
from freqtrade.rpc.api_server.api_v1 import router as api_v1
from freqtrade.rpc.api_server.api_v1 import router_public as api_v1_public
from freqtrade.rpc.api_server.api_ws import router as ws_router
from freqtrade.rpc.api_server.deps import is_webserver_mode
from freqtrade.rpc.api_server.web_ui import router_ui
app.include_router(api_v1_public, prefix="/api/v1")
@@ -123,8 +129,14 @@ class ApiServer(RPCHandler):
dependencies=[Depends(http_basic_or_jwt_token)],
)
app.include_router(api_backtest, prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token)],
dependencies=[Depends(http_basic_or_jwt_token),
Depends(is_webserver_mode)],
)
app.include_router(api_bg_tasks, prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token),
Depends(is_webserver_mode)],
)
app.include_router(ws_router, prefix="/api/v1")
app.include_router(router_login, prefix="/api/v1", tags=["auth"])
# UI Router MUST be last!
app.include_router(router_ui, prefix='')
@@ -138,6 +150,30 @@ class ApiServer(RPCHandler):
)
app.add_exception_handler(RPCException, self.handle_rpc_exception)
app.add_event_handler(
event_type="startup",
func=self._api_startup_event
)
app.add_event_handler(
event_type="shutdown",
func=self._api_shutdown_event
)
async def _api_startup_event(self):
"""
Creates the MessageStream class on startup
so it has access to the same event loop
as uvicorn
"""
if not ApiServer._message_stream:
ApiServer._message_stream = MessageStream()
async def _api_shutdown_event(self):
"""
Removes the MessageStream class on shutdown
"""
if ApiServer._message_stream:
ApiServer._message_stream = None
def start_api(self):
"""
@@ -147,7 +183,7 @@ class ApiServer(RPCHandler):
rest_port = self._config['api_server']['listen_port']
logger.info(f'Starting HTTP Server at {rest_ip}:{rest_port}')
if not IPv4Address(rest_ip).is_loopback:
if not IPv4Address(rest_ip).is_loopback and not running_in_docker():
logger.warning("SECURITY WARNING - Local Rest Server listening to external connections")
logger.warning("SECURITY WARNING - This is insecure please set to your loopback,"
"e.g 127.0.0.1 in config.json")
@@ -170,6 +206,7 @@ class ApiServer(RPCHandler):
use_colors=False,
log_config=None,
access_log=True if verbosity != 'error' else False,
ws_ping_interval=None # We do this explicitly ourselves
)
try:
self._server = UvicornServer(uvconfig)

View File

@@ -0,0 +1,39 @@
from typing import Any, Dict, Literal, Optional, TypedDict
from uuid import uuid4
from freqtrade.exchange.exchange import Exchange
class JobsContainer(TypedDict):
category: Literal['pairlist']
is_running: bool
status: str
progress: Optional[float]
result: Any
error: Optional[str]
class ApiBG:
# Backtesting type: Backtesting
bt: Dict[str, Any] = {
'bt': None,
'data': None,
'timerange': None,
'last_config': {},
'bt_error': None,
}
bgtask_running: bool = False
# Exchange - only available in webserver mode.
exchanges: Dict[str, Exchange] = {}
# Generic background jobs
# TODO: Change this to TTLCache
jobs: Dict[str, JobsContainer] = {}
# Pairlist evaluate things
pairlist_running: bool = False
@staticmethod
def get_job_id() -> str:
return str(uuid4())

View File

@@ -0,0 +1,6 @@
# isort: off
from freqtrade.rpc.api_server.ws.types import WebSocketType # noqa: F401
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy # noqa: F401
from freqtrade.rpc.api_server.ws.serializer import HybridJSONWebSocketSerializer # noqa: F401
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel # noqa: F401
from freqtrade.rpc.api_server.ws.message_stream import MessageStream # noqa: F401

View File

@@ -0,0 +1,243 @@
import asyncio
import logging
import time
from collections import deque
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Deque, Dict, List, Optional, Type, Union
from uuid import uuid4
from fastapi import WebSocketDisconnect
from websockets.exceptions import ConnectionClosed
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer,
WebSocketSerializer)
from freqtrade.rpc.api_server.ws.types import WebSocketType
from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType
logger = logging.getLogger(__name__)
class WebSocketChannel:
"""
Object to help facilitate managing a websocket connection
"""
def __init__(
self,
websocket: WebSocketType,
channel_id: Optional[str] = None,
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer,
send_throttle: float = 0.01
):
self.channel_id = channel_id if channel_id else uuid4().hex[:8]
self._websocket = WebSocketProxy(websocket)
# Internal event to signify a closed websocket
self._closed = asyncio.Event()
# The async tasks created for the channel
self._channel_tasks: List[asyncio.Task] = []
# Deque for average send times
self._send_times: Deque[float] = deque([], maxlen=10)
# High limit defaults to 3 to start
self._send_high_limit = 3
self._send_throttle = send_throttle
# The subscribed message types
self._subscriptions: List[str] = []
# Wrap the WebSocket in the Serializing class
self._wrapped_ws = serializer_cls(self._websocket)
def __repr__(self):
return f"WebSocketChannel({self.channel_id}, {self.remote_addr})"
@property
def raw_websocket(self):
return self._websocket.raw_websocket
@property
def remote_addr(self):
return self._websocket.remote_addr
@property
def avg_send_time(self):
return sum(self._send_times) / len(self._send_times)
def _calc_send_limit(self):
"""
Calculate the send high limit for this channel
"""
# Only update if we have enough data
if len(self._send_times) == self._send_times.maxlen:
# At least 1s or twice the average of send times, with a
# maximum of 3 seconds per message
self._send_high_limit = min(max(self.avg_send_time * 2, 1), 3)
async def send(
self,
message: Union[WSMessageSchemaType, Dict[str, Any]],
timeout: bool = False
):
"""
Send a message on the wrapped websocket. If the sending
takes too long, it will raise a TimeoutError and
disconnect the connection.
:param message: The message to send
:param timeout: Enforce send high limit, defaults to False
"""
try:
_ = time.time()
# If the send times out, it will raise
# a TimeoutError and bubble up to the
# message_endpoint to close the connection
await asyncio.wait_for(
self._wrapped_ws.send(message),
timeout=self._send_high_limit if timeout else None
)
total_time = time.time() - _
self._send_times.append(total_time)
self._calc_send_limit()
except asyncio.TimeoutError:
logger.info(f"Connection for {self} timed out, disconnecting")
raise
# Explicitly give control back to event loop as
# websockets.send does not
# Also throttles how fast we send
await asyncio.sleep(self._send_throttle)
async def recv(self):
"""
Receive a message on the wrapped websocket
"""
return await self._wrapped_ws.recv()
async def ping(self):
"""
Ping the websocket
"""
return await self._websocket.ping()
async def accept(self):
"""
Accept the underlying websocket connection,
if the connection has been closed before we can
accept, just close the channel.
"""
try:
return await self._websocket.accept()
except RuntimeError:
await self.close()
async def close(self):
"""
Close the WebSocketChannel
"""
self._closed.set()
try:
await self._websocket.close()
except RuntimeError:
pass
def is_closed(self) -> bool:
"""
Closed flag
"""
return self._closed.is_set()
def set_subscriptions(self, subscriptions: List[str] = []) -> None:
"""
Set which subscriptions this channel is subscribed to
:param subscriptions: List of subscriptions, List[str]
"""
self._subscriptions = subscriptions
def subscribed_to(self, message_type: str) -> bool:
"""
Check if this channel is subscribed to the message_type
:param message_type: The message type to check
"""
return message_type in self._subscriptions
async def run_channel_tasks(self, *tasks, **kwargs):
"""
Create and await on the channel tasks unless an exception
was raised, then cancel them all.
:params *tasks: All coros or tasks to be run concurrently
:param **kwargs: Any extra kwargs to pass to gather
"""
if not self.is_closed():
# Wrap the coros into tasks if they aren't already
self._channel_tasks = [
task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
for task in tasks
]
try:
return await asyncio.gather(*self._channel_tasks, **kwargs)
except Exception:
# If an exception occurred, cancel the rest of the tasks
await self.cancel_channel_tasks()
async def cancel_channel_tasks(self):
"""
Cancel and wait on all channel tasks
"""
for task in self._channel_tasks:
task.cancel()
# Wait for tasks to finish cancelling
try:
await task
except (
asyncio.CancelledError,
asyncio.TimeoutError,
WebSocketDisconnect,
ConnectionClosed,
RuntimeError
):
pass
except Exception as e:
logger.info(f"Encountered unknown exception: {e}", exc_info=e)
self._channel_tasks = []
async def __aiter__(self):
"""
Generator for received messages
"""
# We can not catch any errors here as websocket.recv is
# the first to catch any disconnects and bubble it up
# so the connection is garbage collected right away
while not self.is_closed():
yield await self.recv()
@asynccontextmanager
async def create_channel(
websocket: WebSocketType,
**kwargs
) -> AsyncIterator[WebSocketChannel]:
"""
Context manager for safely opening and closing a WebSocketChannel
"""
channel = WebSocketChannel(websocket, **kwargs)
try:
await channel.accept()
logger.info(f"Connected to channel - {channel}")
yield channel
finally:
await channel.close()
logger.info(f"Disconnected from channel - {channel}")

View File

@@ -0,0 +1,31 @@
import asyncio
import time
class MessageStream:
"""
A message stream for consumers to subscribe to,
and for producers to publish to.
"""
def __init__(self):
self._loop = asyncio.get_running_loop()
self._waiter = self._loop.create_future()
def publish(self, message):
"""
Publish a message to this MessageStream
:param message: The message to publish
"""
waiter, self._waiter = self._waiter, self._loop.create_future()
waiter.set_result((message, time.time(), self._waiter))
async def __aiter__(self):
"""
Iterate over the messages in the message stream
"""
waiter = self._waiter
while True:
# Shield the future from being cancelled by a task waiting on it
message, ts, waiter = await asyncio.shield(waiter)
yield message, ts

View File

@@ -0,0 +1,73 @@
from typing import Any, Tuple, Union
from fastapi import WebSocket as FastAPIWebSocket
from websockets.client import WebSocketClientProtocol as WebSocket
from freqtrade.rpc.api_server.ws.types import WebSocketType
class WebSocketProxy:
"""
WebSocketProxy object to bring the FastAPIWebSocket and websockets.WebSocketClientProtocol
under the same API
"""
def __init__(self, websocket: WebSocketType):
self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket
@property
def raw_websocket(self):
return self._websocket
@property
def remote_addr(self) -> Tuple[Any, ...]:
if isinstance(self._websocket, WebSocket):
return self._websocket.remote_address
elif isinstance(self._websocket, FastAPIWebSocket):
if self._websocket.client:
client, port = self._websocket.client.host, self._websocket.client.port
return (client, port)
return ("unknown", 0)
async def send(self, data):
"""
Send data on the wrapped websocket
"""
if hasattr(self._websocket, "send_text"):
await self._websocket.send_text(data)
else:
await self._websocket.send(data)
async def recv(self):
"""
Receive data on the wrapped websocket
"""
if hasattr(self._websocket, "receive_text"):
return await self._websocket.receive_text()
else:
return await self._websocket.recv()
async def ping(self):
"""
Ping the websocket, not supported by FastAPI WebSockets
"""
if hasattr(self._websocket, "ping"):
return await self._websocket.ping()
return False
async def close(self, code: int = 1000):
"""
Close the websocket connection, only supported by FastAPI WebSockets
"""
if hasattr(self._websocket, "close"):
try:
return await self._websocket.close(code)
except RuntimeError:
pass
async def accept(self):
"""
Accept the WebSocket connection, only support by FastAPI WebSockets
"""
if hasattr(self._websocket, "accept"):
return await self._websocket.accept()

View File

@@ -0,0 +1,60 @@
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Union
import orjson
import rapidjson
from pandas import DataFrame
from freqtrade.misc import dataframe_to_json, json_to_dataframe
from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy
from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType
logger = logging.getLogger(__name__)
class WebSocketSerializer(ABC):
def __init__(self, websocket: WebSocketProxy):
self._websocket: WebSocketProxy = websocket
@abstractmethod
def _serialize(self, data):
raise NotImplementedError()
@abstractmethod
def _deserialize(self, data):
raise NotImplementedError()
async def send(self, data: Union[WSMessageSchemaType, Dict[str, Any]]):
await self._websocket.send(self._serialize(data))
async def recv(self) -> bytes:
data = await self._websocket.recv()
return self._deserialize(data)
class HybridJSONWebSocketSerializer(WebSocketSerializer):
def _serialize(self, data) -> str:
return str(orjson.dumps(data, default=_json_default), "utf-8")
def _deserialize(self, data: str):
# RapidJSON expects strings
return rapidjson.loads(data, object_hook=_json_object_hook)
# Support serializing pandas DataFrames
def _json_default(z):
if isinstance(z, DataFrame):
return {
'__type__': 'dataframe',
'__value__': dataframe_to_json(z)
}
raise TypeError
# Support deserializing JSON to pandas DataFrames
def _json_object_hook(z):
if z.get('__type__') == 'dataframe':
return json_to_dataframe(z.get('__value__'))
return z

View File

@@ -0,0 +1,8 @@
from typing import Any, Dict, TypeVar
from fastapi import WebSocket as FastAPIWebSocket
from websockets.client import WebSocketClientProtocol as WebSocket
WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket)
MessageType = Dict[str, Any]

View File

@@ -0,0 +1,71 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, TypedDict
from pandas import DataFrame
from pydantic import BaseModel, ConfigDict
from freqtrade.constants import PairWithTimeframe
from freqtrade.enums import RPCMessageType, RPCRequestType
class BaseArbitraryModel(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
class WSRequestSchema(BaseArbitraryModel):
type: RPCRequestType
data: Optional[Any] = None
class WSMessageSchemaType(TypedDict):
# Type for typing to avoid doing pydantic typechecks.
type: RPCMessageType
data: Optional[Dict[str, Any]]
class WSMessageSchema(BaseArbitraryModel):
type: RPCMessageType
data: Optional[Any] = None
model_config = ConfigDict(extra='allow')
# ------------------------------ REQUEST SCHEMAS ----------------------------
class WSSubscribeRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.SUBSCRIBE
data: List[RPCMessageType]
class WSWhitelistRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.WHITELIST
data: None = None
class WSAnalyzedDFRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.ANALYZED_DF
data: Dict[str, Any] = {"limit": 1500, "pair": None}
# ------------------------------ MESSAGE SCHEMAS ----------------------------
class WSWhitelistMessage(WSMessageSchema):
type: RPCMessageType = RPCMessageType.WHITELIST
data: List[str]
class WSAnalyzedDFMessage(WSMessageSchema):
class AnalyzedDFData(BaseArbitraryModel):
key: PairWithTimeframe
df: DataFrame
la: datetime
type: RPCMessageType = RPCMessageType.ANALYZED_DF
data: AnalyzedDFData
class WSErrorMessage(WSMessageSchema):
type: RPCMessageType = RPCMessageType.EXCEPTION
data: str
# --------------------------------------------------------------------------

View File

@@ -1,7 +1,7 @@
import logging
from typing import Any, Dict
from freqtrade.enums.rpcmessagetype import RPCMessageType
from freqtrade.constants import Config
from freqtrade.enums import RPCMessageType
from freqtrade.rpc import RPC
from freqtrade.rpc.webhook import Webhook
@@ -10,17 +10,18 @@ logger = logging.getLogger(__name__)
class Discord(Webhook):
def __init__(self, rpc: 'RPC', config: Dict[str, Any]):
# super().__init__(rpc, config)
def __init__(self, rpc: 'RPC', config: Config):
self._config = config
self.rpc = rpc
self.config = config
self.strategy = config.get('strategy', '')
self.timeframe = config.get('timeframe', '')
self.bot_name = config.get('bot_name', '')
self._url = self.config['discord']['webhook_url']
self._url = config['discord']['webhook_url']
self._format = 'json'
self._retries = 1
self._retry_delay = 0.1
self._timeout = self._config['discord'].get('timeout', 10)
def cleanup(self) -> None:
"""
@@ -30,20 +31,22 @@ class Discord(Webhook):
pass
def send_msg(self, msg) -> None:
logger.info(f"Sending discord message: {msg}")
if msg['type'].value in self.config['discord']:
if (fields := self._config['discord'].get(msg['type'].value)):
logger.info(f"Sending discord message: {msg}")
msg['strategy'] = self.strategy
msg['timeframe'] = self.timeframe
fields = self.config['discord'].get(msg['type'].value)
msg['bot_name'] = self.bot_name
color = 0x0000FF
if msg['type'] in (RPCMessageType.EXIT, RPCMessageType.EXIT_FILL):
profit_ratio = msg.get('profit_ratio')
color = (0x00FF00 if profit_ratio > 0 else 0xFF0000)
title = msg['type'].value
if 'pair' in msg:
title = f"Trade: {msg['pair']} {msg['type'].value}"
embeds = [{
'title': f"Trade: {msg['pair']} {msg['type'].value}",
'title': title,
'color': color,
'fields': [],
@@ -51,7 +54,7 @@ class Discord(Webhook):
for f in fields:
for k, v in f.items():
v = v.format(**msg)
embeds[0]['fields'].append( # type: ignore
embeds[0]['fields'].append(
{'name': k, 'value': v, 'inline': True})
# Send the message to discord channel

View File

@@ -0,0 +1,411 @@
"""
ExternalMessageConsumer module
Main purpose is to connect to external bot's message websocket to consume data
from it
"""
import asyncio
import logging
import socket
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union
import websockets
from pydantic import ValidationError
from freqtrade.constants import FULL_DATAFRAME_THRESHOLD
from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import RPCMessageType
from freqtrade.misc import remove_entry_exit_signals
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest,
WSMessageSchema, WSRequestSchema,
WSSubscribeRequest, WSWhitelistMessage,
WSWhitelistRequest)
if TYPE_CHECKING:
import websockets.connect
class Producer(TypedDict):
name: str
host: str
port: int
secure: bool
ws_token: str
logger = logging.getLogger(__name__)
def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]):
return schema.model_dump(exclude_none=True)
class ExternalMessageConsumer:
"""
The main controller class for consuming external messages from
other freqtrade bot's
"""
def __init__(
self,
config: Dict[str, Any],
dataprovider: DataProvider
):
self._config = config
self._dp = dataprovider
self._running = False
self._thread = None
self._loop = None
self._main_task = None
self._sub_tasks = None
self._emc_config = self._config.get('external_message_consumer', {})
self.enabled = self._emc_config.get('enabled', False)
self.producers: List[Producer] = self._emc_config.get('producers', [])
self.wait_timeout = self._emc_config.get('wait_timeout', 30) # in seconds
self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds
self.sleep_time = self._emc_config.get('sleep_time', 10) # in seconds
# The amount of candles per dataframe on the initial request
self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 1500)
# Message size limit, in megabytes. Default 8mb, Use bitwise operator << 20 to convert
# as the websockets client expects bytes.
self.message_size_limit = (self._emc_config.get('message_size_limit', 8) << 20)
# Setting these explicitly as they probably shouldn't be changed by a user
# Unless we somehow integrate this with the strategy to allow creating
# callbacks for the messages
self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]
# Allow setting data for each initial request
self._initial_requests: List[WSRequestSchema] = [
WSSubscribeRequest(data=self.topics),
WSWhitelistRequest(),
WSAnalyzedDFRequest()
]
# Specify which function to use for which RPCMessageType
self._message_handlers: Dict[str, Callable[[str, WSMessageSchema], None]] = {
RPCMessageType.WHITELIST: self._consume_whitelist_message,
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
}
self._channel_streams: Dict[str, MessageStream] = {}
self.start()
def start(self):
"""
Start the main internal loop in another thread to run coroutines
"""
if self._thread and self._loop:
return
logger.info("Starting ExternalMessageConsumer")
self._loop = asyncio.new_event_loop()
self._thread = Thread(target=self._loop.run_forever)
self._running = True
self._thread.start()
self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop)
def shutdown(self):
"""
Shutdown the loop, thread, and tasks
"""
if self._thread and self._loop:
logger.info("Stopping ExternalMessageConsumer")
self._running = False
self._channel_streams = {}
if self._sub_tasks:
# Cancel sub tasks
for task in self._sub_tasks:
task.cancel()
if self._main_task:
# Cancel the main task
self._main_task.cancel()
self._thread.join()
self._thread = None
self._loop = None
self._sub_tasks = None
self._main_task = None
async def _main(self):
"""
The main task coroutine
"""
lock = asyncio.Lock()
try:
# Create a connection to each producer
self._sub_tasks = [
self._loop.create_task(self._handle_producer_connection(producer, lock))
for producer in self.producers
]
await asyncio.gather(*self._sub_tasks)
except asyncio.CancelledError:
pass
finally:
# Stop the loop once we are done
self._loop.stop()
async def _handle_producer_connection(self, producer: Producer, lock: asyncio.Lock):
"""
Main connection loop for the consumer
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
try:
await self._create_connection(producer, lock)
except asyncio.CancelledError:
# Exit silently
pass
async def _create_connection(self, producer: Producer, lock: asyncio.Lock):
"""
Actually creates and handles the websocket connection, pinging on timeout
and handling connection errors.
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
while self._running:
try:
host, port = producer['host'], producer['port']
token = producer['ws_token']
name = producer['name']
scheme = 'wss' if producer.get('secure', False) else 'ws'
ws_url = f"{scheme}://{host}:{port}/api/v1/message/ws?token={token}"
# This will raise InvalidURI if the url is bad
async with websockets.connect(
ws_url,
max_size=self.message_size_limit,
ping_interval=None
) as ws:
async with create_channel(
ws,
channel_id=name,
send_throttle=0.5
) as channel:
# Create the message stream for this channel
self._channel_streams[name] = MessageStream()
# Run the channel tasks while connected
await channel.run_channel_tasks(
self._receive_messages(channel, producer, lock),
self._send_requests(channel, self._channel_streams[name])
)
except (websockets.exceptions.InvalidURI, ValueError) as e:
logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
break
except (
socket.gaierror,
ConnectionRefusedError,
websockets.exceptions.InvalidStatusCode,
websockets.exceptions.InvalidMessage
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
continue
except (
websockets.exceptions.ConnectionClosedError,
websockets.exceptions.ConnectionClosedOK
):
# Just keep trying to connect again indefinitely
await asyncio.sleep(self.sleep_time)
continue
except Exception as e:
# An unforseen error has occurred, log and continue
logger.error("Unexpected error has occurred:")
logger.exception(e)
await asyncio.sleep(self.sleep_time)
continue
async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream):
# Send the initial requests
for init_request in self._initial_requests:
await channel.send(schema_to_dict(init_request))
# Now send any subsequent requests published to
# this channel's stream
async for request, _ in channel_stream:
logger.debug(f"Sending request to channel - {channel} - {request}")
await channel.send(request)
async def _receive_messages(
self,
channel: WebSocketChannel,
producer: Producer,
lock: asyncio.Lock
):
"""
Loop to handle receiving messages from a Producer
:param channel: The WebSocketChannel object for the WebSocket
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
while self._running:
try:
message = await asyncio.wait_for(
channel.recv(),
timeout=self.wait_timeout
)
try:
async with lock:
# Handle the message
self.handle_producer_message(producer, message)
except Exception as e:
logger.exception(f"Error handling producer message: {e}")
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
# We haven't received data yet. Check the connection and continue.
try:
# ping
pong = await channel.ping()
latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000)
logger.info(f"Connection to {channel} still alive, latency: {latency}ms")
continue
except Exception as e:
# Just eat the error and continue reconnecting
logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
logger.debug(e, exc_info=e)
raise
def send_producer_request(
self,
producer_name: str,
request: Union[WSRequestSchema, Dict[str, Any]]
):
"""
Publish a message to the producer's message stream to be
sent by the channel task.
:param producer_name: The name of the producer to publish the message to
:param request: The request to send to the producer
"""
if isinstance(request, WSRequestSchema):
request = schema_to_dict(request)
if channel_stream := self._channel_streams.get(producer_name):
channel_stream.publish(request)
def handle_producer_message(self, producer: Producer, message: Dict[str, Any]):
"""
Handles external messages from a Producer
"""
producer_name = producer.get('name', 'default')
try:
producer_message = WSMessageSchema.model_validate(message)
except ValidationError as e:
logger.error(f"Invalid message from `{producer_name}`: {e}")
return
if not producer_message.data:
logger.error(f"Empty message received from `{producer_name}`")
return
logger.debug(f"Received message of type `{producer_message.type}` from `{producer_name}`")
message_handler = self._message_handlers.get(producer_message.type)
if not message_handler:
logger.info(f"Received unhandled message: `{producer_message.data}`, ignoring...")
return
message_handler(producer_name, producer_message)
def _consume_whitelist_message(self, producer_name: str, message: WSMessageSchema):
try:
# Validate the message
whitelist_message = WSWhitelistMessage.model_validate(message.model_dump())
except ValidationError as e:
logger.error(f"Invalid message from `{producer_name}`: {e}")
return
# Add the pairlist data to the DataProvider
self._dp._set_producer_pairs(whitelist_message.data, producer_name=producer_name)
logger.debug(f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`")
def _consume_analyzed_df_message(self, producer_name: str, message: WSMessageSchema):
try:
df_message = WSAnalyzedDFMessage.model_validate(message.model_dump())
except ValidationError as e:
logger.error(f"Invalid message from `{producer_name}`: {e}")
return
key = df_message.data.key
df = df_message.data.df
la = df_message.data.la
pair, timeframe, candle_type = key
if df.empty:
logger.debug(f"Received Empty Dataframe for {key}")
return
# If set, remove the Entry and Exit signals from the Producer
if self._emc_config.get('remove_entry_exit_signals', False):
df = remove_entry_exit_signals(df)
logger.debug(f"Received {len(df)} candle(s) for {key}")
did_append, n_missing = self._dp._add_external_df(
pair,
df,
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
if not did_append:
# We want an overlap in candles incase some data has changed
n_missing += 1
# Set to None for all candles if we missed a full df's worth of candles
n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500
logger.warning(f"Holes in data or no existing df, requesting {n_missing} candles "
f"for {key} from `{producer_name}`")
self.send_producer_request(
producer_name,
WSAnalyzedDFRequest(
data={
"limit": n_missing,
"pair": pair
}
)
)
return
logger.debug(
f"Consumed message from `{producer_name}` "
f"of type `RPCMessageType.ANALYZED_DF` for {key}")

View File

@@ -3,8 +3,8 @@ Module that define classes to convert Crypto-currency to FIAT
e.g BTC to USD
"""
import datetime
import logging
from datetime import datetime
from typing import Dict, List
from cachetools import TTLCache
@@ -25,6 +25,10 @@ coingecko_mapping = {
'bnb': 'binancecoin',
'sol': 'solana',
'usdt': 'tether',
'busd': 'binance-usd',
'tusd': 'true-usd',
'usdc': 'usd-coin',
'btc': 'bitcoin'
}
@@ -46,7 +50,9 @@ class CryptoToFiatConverter(LoggingMixin):
if CryptoToFiatConverter.__instance is None:
CryptoToFiatConverter.__instance = object.__new__(cls)
try:
CryptoToFiatConverter._coingekko = CoinGeckoAPI()
# Limit retires to 1 (0 and 1)
# otherwise we risk bot impact if coingecko is down.
CryptoToFiatConverter._coingekko = CoinGeckoAPI(retries=1)
except BaseException:
CryptoToFiatConverter._coingekko = None
return CryptoToFiatConverter.__instance
@@ -67,7 +73,7 @@ class CryptoToFiatConverter(LoggingMixin):
logger.warning(
"Too many requests for CoinGecko API, backing off and trying again later.")
# Set backoff timestamp to 60 seconds in the future
self._backoff = datetime.datetime.now().timestamp() + 60
self._backoff = datetime.now().timestamp() + 60
return
# If the request is not a 429 error we want to raise the normal error
logger.error(
@@ -81,7 +87,7 @@ class CryptoToFiatConverter(LoggingMixin):
def _get_gekko_id(self, crypto_symbol):
if not self._coinlistings:
if self._backoff <= datetime.datetime.now().timestamp():
if self._backoff <= datetime.now().timestamp():
self._load_cryptomap()
# Still not loaded.
if not self._coinlistings:

File diff suppressed because it is too large Load Diff

View File

@@ -3,10 +3,12 @@ This module contains class to manage RPC communications (Telegram, API, ...)
"""
import logging
from collections import deque
from typing import Any, Dict, List
from typing import List
from freqtrade.enums import RPCMessageType
from freqtrade.constants import Config
from freqtrade.enums import NO_ECHO_MESSAGES, RPCMessageType
from freqtrade.rpc import RPC, RPCHandler
from freqtrade.rpc.rpc_types import RPCSendMsg
logger = logging.getLogger(__name__)
@@ -57,7 +59,7 @@ class RPCManager:
mod.cleanup()
del mod
def send_msg(self, msg: Dict[str, Any]) -> None:
def send_msg(self, msg: RPCSendMsg) -> None:
"""
Send given message to all registered rpc modules.
A message consists of one or more key value pairs of strings.
@@ -66,17 +68,16 @@ class RPCManager:
'status': 'stopping bot'
}
"""
logger.info('Sending rpc message: %s', msg)
if 'pair' in msg:
msg.update({
'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair'])
})
if msg.get('type') not in NO_ECHO_MESSAGES:
logger.info('Sending rpc message: %s', msg)
for mod in self.registered_modules:
logger.debug('Forwarding message to rpc.%s', mod.name)
try:
mod.send_msg(msg)
except NotImplementedError:
logger.error(f"Message type '{msg['type']}' not implemented by handler {mod.name}.")
except Exception:
logger.exception('Exception occurred within RPC module %s', mod.name)
def process_msg_queue(self, queue: deque) -> None:
"""
@@ -84,12 +85,15 @@ class RPCManager:
"""
while queue:
msg = queue.popleft()
self.send_msg({
'type': RPCMessageType.STRATEGY_MSG,
'msg': msg,
})
logger.info('Sending rpc strategy_msg: %s', msg)
for mod in self.registered_modules:
if mod._config.get(mod.name, {}).get('allow_custom_messages', False):
mod.send_msg({
'type': RPCMessageType.STRATEGY_MSG,
'msg': msg,
})
def startup_messages(self, config: Dict[str, Any], pairlist, protections) -> None:
def startup_messages(self, config: Config, pairlist, protections) -> None:
if config['dry_run']:
self.send_msg({
'type': RPCMessageType.WARNING,

135
freqtrade/rpc/rpc_types.py Normal file
View File

@@ -0,0 +1,135 @@
from datetime import datetime
from typing import Any, List, Literal, Optional, TypedDict, Union
from freqtrade.constants import PairWithTimeframe
from freqtrade.enums import RPCMessageType
ProfitLossStr = Literal["profit", "loss"]
class RPCSendMsgBase(TypedDict):
pass
# ty1pe: Literal[RPCMessageType]
class RPCStatusMsg(RPCSendMsgBase):
"""Used for Status, Startup and Warning messages"""
type: Literal[RPCMessageType.STATUS, RPCMessageType.STARTUP, RPCMessageType.WARNING]
status: str
class RPCStrategyMsg(RPCSendMsgBase):
"""Used for Status, Startup and Warning messages"""
type: Literal[RPCMessageType.STRATEGY_MSG]
msg: str
class RPCProtectionMsg(RPCSendMsgBase):
type: Literal[RPCMessageType.PROTECTION_TRIGGER, RPCMessageType.PROTECTION_TRIGGER_GLOBAL]
id: int
pair: str
base_currency: Optional[str]
lock_time: str
lock_timestamp: int
lock_end_time: str
lock_end_timestamp: int
reason: str
side: str
active: bool
class RPCWhitelistMsg(RPCSendMsgBase):
type: Literal[RPCMessageType.WHITELIST]
data: List[str]
class __RPCEntryExitMsgBase(RPCSendMsgBase):
trade_id: int
buy_tag: Optional[str]
enter_tag: Optional[str]
exchange: str
pair: str
base_currency: str
quote_currency: str
leverage: Optional[float]
direction: str
limit: float
open_rate: float
order_type: str
stake_amount: float
stake_currency: str
fiat_currency: Optional[str]
amount: float
open_date: datetime
current_rate: Optional[float]
sub_trade: bool
class RPCEntryMsg(__RPCEntryExitMsgBase):
type: Literal[RPCMessageType.ENTRY, RPCMessageType.ENTRY_FILL]
class RPCCancelMsg(__RPCEntryExitMsgBase):
type: Literal[RPCMessageType.ENTRY_CANCEL]
reason: str
class RPCExitMsg(__RPCEntryExitMsgBase):
type: Literal[RPCMessageType.EXIT, RPCMessageType.EXIT_FILL]
cumulative_profit: float
gain: ProfitLossStr
close_rate: float
profit_amount: float
profit_ratio: float
exit_reason: Optional[str]
close_date: datetime
# current_rate: Optional[float]
order_rate: Optional[float]
final_profit_ratio: Optional[float]
is_final_exit: bool
class RPCExitCancelMsg(__RPCEntryExitMsgBase):
type: Literal[RPCMessageType.EXIT_CANCEL]
reason: str
gain: ProfitLossStr
profit_amount: float
profit_ratio: float
exit_reason: Optional[str]
close_date: datetime
class _AnalyzedDFData(TypedDict):
key: PairWithTimeframe
df: Any
la: datetime
class RPCAnalyzedDFMsg(RPCSendMsgBase):
"""New Analyzed dataframe message"""
type: Literal[RPCMessageType.ANALYZED_DF]
data: _AnalyzedDFData
class RPCNewCandleMsg(RPCSendMsgBase):
"""New candle ping message, issued once per new candle/pair"""
type: Literal[RPCMessageType.NEW_CANDLE]
data: PairWithTimeframe
RPCOrderMsg = Union[RPCEntryMsg, RPCExitMsg, RPCExitCancelMsg, RPCCancelMsg]
RPCSendMsg = Union[
RPCStatusMsg,
RPCStrategyMsg,
RPCProtectionMsg,
RPCWhitelistMsg,
RPCEntryMsg,
RPCCancelMsg,
RPCExitMsg,
RPCExitCancelMsg,
RPCAnalyzedDFMsg,
RPCNewCandleMsg
]

File diff suppressed because it is too large Load Diff

View File

@@ -3,12 +3,14 @@ This module manages webhook communication
"""
import logging
import time
from typing import Any, Dict
from typing import Any, Dict, Optional
from requests import RequestException, post
from freqtrade.constants import Config
from freqtrade.enums import RPCMessageType
from freqtrade.rpc import RPC, RPCHandler
from freqtrade.rpc.rpc_types import RPCSendMsg
logger = logging.getLogger(__name__)
@@ -19,7 +21,7 @@ logger.debug('Included module rpc.webhook ...')
class Webhook(RPCHandler):
""" This class handles all webhook communication """
def __init__(self, rpc: RPC, config: Dict[str, Any]) -> None:
def __init__(self, rpc: RPC, config: Config) -> None:
"""
Init the Webhook class, and init the super class RPCHandler
:param rpc: instance of RPC Helper class
@@ -32,6 +34,7 @@ class Webhook(RPCHandler):
self._format = self._config['webhook'].get('format', 'form')
self._retries = self._config['webhook'].get('retries', 0)
self._retry_delay = self._config['webhook'].get('retry_delay', 0.1)
self._timeout = self._config['webhook'].get('timeout', 10)
def cleanup(self) -> None:
"""
@@ -40,30 +43,48 @@ class Webhook(RPCHandler):
"""
pass
def send_msg(self, msg: Dict[str, Any]) -> None:
def _get_value_dict(self, msg: RPCSendMsg) -> Optional[Dict[str, Any]]:
whconfig = self._config['webhook']
if msg['type'].value in whconfig:
# Explicit types should have priority
valuedict = whconfig.get(msg['type'].value)
# Deprecated 2022.10 - only keep generic method.
elif msg['type'] in [RPCMessageType.ENTRY]:
valuedict = whconfig.get('webhookentry')
elif msg['type'] in [RPCMessageType.ENTRY_CANCEL]:
valuedict = whconfig.get('webhookentrycancel')
elif msg['type'] in [RPCMessageType.ENTRY_FILL]:
valuedict = whconfig.get('webhookentryfill')
elif msg['type'] == RPCMessageType.EXIT:
valuedict = whconfig.get('webhookexit')
elif msg['type'] == RPCMessageType.EXIT_FILL:
valuedict = whconfig.get('webhookexitfill')
elif msg['type'] == RPCMessageType.EXIT_CANCEL:
valuedict = whconfig.get('webhookexitcancel')
elif msg['type'] in (RPCMessageType.STATUS,
RPCMessageType.STARTUP,
RPCMessageType.EXCEPTION,
RPCMessageType.WARNING):
valuedict = whconfig.get('webhookstatus')
elif msg['type'] in (
RPCMessageType.PROTECTION_TRIGGER,
RPCMessageType.PROTECTION_TRIGGER_GLOBAL,
RPCMessageType.WHITELIST,
RPCMessageType.ANALYZED_DF,
RPCMessageType.NEW_CANDLE,
RPCMessageType.STRATEGY_MSG):
# Don't fail for non-implemented types
return None
return valuedict
def send_msg(self, msg: RPCSendMsg) -> None:
""" Send a message to telegram channel """
try:
whconfig = self._config['webhook']
if msg['type'] in [RPCMessageType.ENTRY]:
valuedict = whconfig.get('webhookentry')
elif msg['type'] in [RPCMessageType.ENTRY_CANCEL]:
valuedict = whconfig.get('webhookentrycancel')
elif msg['type'] in [RPCMessageType.ENTRY_FILL]:
valuedict = whconfig.get('webhookentryfill')
elif msg['type'] == RPCMessageType.EXIT:
valuedict = whconfig.get('webhookexit')
elif msg['type'] == RPCMessageType.EXIT_FILL:
valuedict = whconfig.get('webhookexitfill')
elif msg['type'] == RPCMessageType.EXIT_CANCEL:
valuedict = whconfig.get('webhookexitcancel')
elif msg['type'] in (RPCMessageType.STATUS,
RPCMessageType.STARTUP,
RPCMessageType.WARNING):
valuedict = whconfig.get('webhookstatus')
else:
raise NotImplementedError('Unknown message type: {}'.format(msg['type']))
valuedict = self._get_value_dict(msg)
if not valuedict:
logger.info("Message type '%s' not configured for webhooks", msg['type'])
logger.debug("Message type '%s' not configured for webhooks", msg['type'])
return
payload = {key: value.format(**msg) for (key, value) in valuedict.items()}
@@ -87,14 +108,15 @@ class Webhook(RPCHandler):
try:
if self._format == 'form':
response = post(self._url, data=payload)
response = post(self._url, data=payload, timeout=self._timeout)
elif self._format == 'json':
response = post(self._url, json=payload)
response = post(self._url, json=payload, timeout=self._timeout)
elif self._format == 'raw':
response = post(self._url, data=payload['data'],
headers={'Content-Type': 'text/plain'})
headers={'Content-Type': 'text/plain'},
timeout=self._timeout)
else:
raise NotImplementedError('Unknown format: {}'.format(self._format))
raise NotImplementedError(f'Unknown format: {self._format}')
# Throw a RequestException if the post was not successful
response.raise_for_status()