Feat: Agent Token or Request Limiting (#2041)

* Update routes.py, added token and request limits to create/update agent operations

* added usage limit check to api endpoints

cannot create agents with usage limit right now that will be implemented

* implemented api limiting as either token limiting or request limiting modes

* minor typo & bug fix
This commit is contained in:
Ali Arda Fincan
2025-10-13 21:32:46 +03:00
committed by GitHub
parent 72bc24a490
commit ce32dd2907
9 changed files with 352 additions and 5 deletions

View File

@@ -72,6 +72,9 @@ class AnswerResource(Resource, BaseAnswerResource):
agent = processor.create_agent()
retriever = processor.create_retriever()
if error := self.check_usage(processor.agent_config):
return error
stream = self.complete_stream(
question=data["question"],
agent=agent,

View File

@@ -3,7 +3,7 @@ import json
import logging
from typing import Any, Dict, Generator, List, Optional
from flask import Response
from flask import Response, make_response, jsonify
from flask_restx import Namespace
from application.api.answer.services.conversation_service import ConversationService
@@ -25,6 +25,7 @@ class BaseAnswerResource:
def __init__(self):
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
self.db = db
self.user_logs_collection = db["user_logs"]
self.gpt_model = get_gpt_model()
self.conversation_service = ConversationService()
@@ -40,6 +41,88 @@ class BaseAnswerResource:
return missing_fields
return None
def check_usage(
self, agent_config: Dict
) -> Optional[Response]:
"""Check if there is a usage limit and if it is exceeded
Args:
agent_config: The config dict of agent instance
Returns:
None or Response if either of limits exceeded.
"""
api_key = agent_config.get("user_api_key")
if not api_key:
return None
agents_collection = self.db["agents"]
agent = agents_collection.find_one({"key": api_key})
if not agent:
return make_response(
jsonify(
{
"success": False,
"message": "Invalid API key."
}
),
401
)
limited_token_mode = agent.get("limited_token_mode", False)
limited_request_mode = agent.get("limited_request_mode", False)
token_limit = int(agent.get("token_limit", settings.DEFAULT_AGENT_LIMITS["token_limit"]))
request_limit = int(agent.get("request_limit", settings.DEFAULT_AGENT_LIMITS["request_limit"]))
token_usage_collection = self.db["token_usage"]
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(hours=24)
match_query = {
"timestamp": {"$gte": start_date, "$lte": end_date},
"api_key": api_key
}
if limited_token_mode:
token_pipeline = [
{"$match": match_query},
{
"$group": {
"_id": None,
"total_tokens": {"$sum": {"$add": ["$prompt_tokens", "$generated_tokens"]}}
}
}
]
token_result = list(token_usage_collection.aggregate(token_pipeline))
daily_token_usage = token_result[0]["total_tokens"] if token_result else 0
else:
daily_token_usage = 0
if limited_request_mode:
daily_request_usage = token_usage_collection.count_documents(match_query)
else:
daily_request_usage = 0
if not limited_token_mode and not limited_request_mode:
return None
elif limited_token_mode and token_limit > daily_token_usage:
return None
elif limited_request_mode and request_limit > daily_request_usage:
return None
return make_response(
jsonify(
{
"success": False,
"message": "Exceeding usage limit, please try again later."
}
),
429, # too many requests
)
def complete_stream(
self,
question: str,

View File

@@ -76,6 +76,9 @@ class StreamResource(Resource, BaseAnswerResource):
agent = processor.create_agent()
retriever = processor.create_retriever()
if error := self.check_usage(processor.agent_config):
return error
return Response(
self.complete_stream(
question=data["question"],