This commit is contained in:
ManishMadan2882
2025-04-02 15:57:45 +05:30
17 changed files with 731 additions and 368 deletions

View File

View File

@@ -1,9 +1,11 @@
from application.agents.classic_agent import ClassicAgent
from application.agents.react_agent import ReActAgent
class AgentCreator:
agents = {
"classic": ClassicAgent,
"react": ReActAgent,
}
@classmethod

View File

@@ -1,4 +1,6 @@
from typing import Dict, Generator
import uuid
from abc import ABC, abstractmethod
from typing import Dict, Generator, List, Optional
from application.agents.llm_handler import get_llm_handler
from application.agents.tools.tool_action_parser import ToolActionParser
@@ -6,20 +8,35 @@ from application.agents.tools.tool_manager import ToolManager
from application.core.mongo_db import MongoDB
from application.llm.llm_creator import LLMCreator
from application.logging import build_stack_data, log_activity, LogContext
from application.retriever.base import BaseRetriever
class BaseAgent:
class BaseAgent(ABC):
def __init__(
self,
endpoint,
llm_name,
gpt_model,
api_key,
user_api_key=None,
decoded_token=None,
attachments=None,
endpoint: str,
llm_name: str,
gpt_model: str,
api_key: str,
user_api_key: Optional[str] = None,
prompt: str = "",
chat_history: Optional[List[Dict]] = None,
decoded_token: Optional[Dict] = None,
attachments: Optional[str]=None,
):
self.endpoint = endpoint
self.llm_name = llm_name
self.gpt_model = gpt_model
self.api_key = api_key
self.user_api_key = user_api_key
self.prompt = prompt
self.decoded_token = decoded_token or {}
self.user: str = decoded_token.get("sub")
self.tool_config: Dict = {}
self.tools: List[Dict] = []
self.tool_calls: List[Dict] = []
self.chat_history: List[Dict] = chat_history if chat_history is not None else []
self.llm = LLMCreator.create_llm(
llm_name,
api_key=api_key,
@@ -27,14 +44,19 @@ class BaseAgent:
decoded_token=decoded_token,
)
self.llm_handler = get_llm_handler(llm_name)
self.gpt_model = gpt_model
self.tools = []
self.tool_config = {}
self.tool_calls = []
self.attachments = attachments or []
set.attachments = attachments or []
def gen(self, *args, **kwargs) -> Generator[Dict, None, None]:
raise NotImplementedError('Method "gen" must be implemented in the child class')
@log_activity()
def gen(
self, query: str, retriever: BaseRetriever, log_context: LogContext = None
) -> Generator[Dict, None, None]:
yield from self._gen_inner(query, retriever, log_context)
@abstractmethod
def _gen_inner(
self, query: str, retriever: BaseRetriever, log_context: LogContext
) -> Generator[Dict, None, None]:
pass
def _get_user_tools(self, user="local"):
mongo = MongoDB.get_client()
@@ -111,14 +133,12 @@ class BaseAgent:
for param, details in action_data[param_type]["properties"].items():
if param not in call_args and "value" in details:
target_dict[param] = details["value"]
for param, value in call_args.items():
for param_type, target_dict in param_types.items():
if param_type in action_data and param in action_data[param_type].get(
"properties", {}
):
target_dict[param] = value
tm = ToolManager(config={})
tool = tm.load_tool(
tool_data["name"],
@@ -153,3 +173,79 @@ class BaseAgent:
self.tool_calls.append(tool_call_data)
return result, call_id
def _build_messages(
self,
system_prompt: str,
query: str,
retrieved_data: List[Dict],
) -> List[Dict]:
docs_together = "\n".join([doc["text"] for doc in retrieved_data])
p_chat_combine = system_prompt.replace("{summaries}", docs_together)
messages_combine = [{"role": "system", "content": p_chat_combine}]
for i in self.chat_history:
if "prompt" in i and "response" in i:
messages_combine.append({"role": "user", "content": i["prompt"]})
messages_combine.append({"role": "assistant", "content": i["response"]})
if "tool_calls" in i:
for tool_call in i["tool_calls"]:
call_id = tool_call.get("call_id") or str(uuid.uuid4())
function_call_dict = {
"function_call": {
"name": tool_call.get("action_name"),
"args": tool_call.get("arguments"),
"call_id": call_id,
}
}
function_response_dict = {
"function_response": {
"name": tool_call.get("action_name"),
"response": {"result": tool_call.get("result")},
"call_id": call_id,
}
}
messages_combine.append(
{"role": "assistant", "content": [function_call_dict]}
)
messages_combine.append(
{"role": "tool", "content": [function_response_dict]}
)
messages_combine.append({"role": "user", "content": query})
return messages_combine
def _retriever_search(
self,
retriever: BaseRetriever,
query: str,
log_context: Optional[LogContext] = None,
) -> List[Dict]:
retrieved_data = retriever.search(query)
if log_context:
data = build_stack_data(retriever, exclude_attributes=["llm"])
log_context.stacks.append({"component": "retriever", "data": data})
return retrieved_data
def _llm_gen(self, messages: List[Dict], log_context: Optional[LogContext] = None):
resp = self.llm.gen_stream(
model=self.gpt_model, messages=messages, tools=self.tools
)
if log_context:
data = build_stack_data(self.llm)
log_context.stacks.append({"component": "llm", "data": data})
return resp
def _llm_handler(
self,
resp,
tools_dict: Dict,
messages: List[Dict],
log_context: Optional[LogContext] = None,
):
resp = self.llm_handler.handle_response(self, resp, tools_dict, messages)
if log_context:
data = build_stack_data(self.llm_handler)
log_context.stacks.append({"component": "llm_handler", "data": data})
return resp

View File

@@ -1,88 +1,26 @@
import uuid
from typing import Dict, Generator
from application.agents.base import BaseAgent
from application.logging import build_stack_data, log_activity, LogContext
from application.logging import LogContext
from application.retriever.base import BaseRetriever
import logging
logger = logging.getLogger(__name__)
class ClassicAgent(BaseAgent):
def __init__(
self,
endpoint,
llm_name,
gpt_model,
api_key,
user_api_key=None,
prompt="",
chat_history=None,
decoded_token=None,
attachments=None,
):
super().__init__(
endpoint, llm_name, gpt_model, api_key, user_api_key, decoded_token, attachments
)
self.user = decoded_token.get("sub")
self.prompt = prompt
self.chat_history = chat_history if chat_history is not None else []
@log_activity()
def gen(
self, query: str, retriever: BaseRetriever, log_context: LogContext = None
) -> Generator[Dict, None, None]:
yield from self._gen_inner(query, retriever, log_context)
def _gen_inner(
self, query: str, retriever: BaseRetriever, log_context: LogContext
) -> Generator[Dict, None, None]:
retrieved_data = self._retriever_search(retriever, query, log_context)
docs_together = "\n".join([doc["text"] for doc in retrieved_data])
p_chat_combine = self.prompt.replace("{summaries}", docs_together)
messages_combine = [{"role": "system", "content": p_chat_combine}]
if len(self.chat_history) > 0:
for i in self.chat_history:
if "prompt" in i and "response" in i:
messages_combine.append({"role": "user", "content": i["prompt"]})
messages_combine.append(
{"role": "assistant", "content": i["response"]}
)
if "tool_calls" in i:
for tool_call in i["tool_calls"]:
call_id = tool_call.get("call_id")
if call_id is None or call_id == "None":
call_id = str(uuid.uuid4())
function_call_dict = {
"function_call": {
"name": tool_call.get("action_name"),
"args": tool_call.get("arguments"),
"call_id": call_id,
}
}
function_response_dict = {
"function_response": {
"name": tool_call.get("action_name"),
"response": {"result": tool_call.get("result")},
"call_id": call_id,
}
}
messages_combine.append(
{"role": "assistant", "content": [function_call_dict]}
)
messages_combine.append(
{"role": "tool", "content": [function_response_dict]}
)
messages_combine.append({"role": "user", "content": query})
tools_dict = self._get_user_tools(self.user)
self._prepare_tools(tools_dict)
resp = self._llm_gen(messages_combine, log_context)
messages = self._build_messages(self.prompt, query, retrieved_data)
resp = self._llm_gen(messages, log_context)
attachments = self.attachments
if isinstance(resp, str):
yield {"answer": resp}
@@ -95,7 +33,7 @@ class ClassicAgent(BaseAgent):
yield {"answer": resp.message.content}
return
resp = self._llm_handler(resp, tools_dict, messages_combine, log_context, self.attachments)
resp = self._llm_handler(resp, tools_dict, messages, log_context,attachments)
if isinstance(resp, str):
yield {"answer": resp}
@@ -107,7 +45,7 @@ class ClassicAgent(BaseAgent):
yield {"answer": resp.message.content}
else:
completion = self.llm.gen_stream(
model=self.gpt_model, messages=messages_combine, tools=self.tools
model=self.gpt_model, messages=messages, tools=self.tools
)
for line in completion:
if isinstance(line, str):
@@ -115,29 +53,3 @@ class ClassicAgent(BaseAgent):
yield {"sources": retrieved_data}
yield {"tool_calls": self.tool_calls.copy()}
def _retriever_search(self, retriever, query, log_context):
retrieved_data = retriever.search(query)
if log_context:
data = build_stack_data(retriever, exclude_attributes=["llm"])
log_context.stacks.append({"component": "retriever", "data": data})
return retrieved_data
def _llm_gen(self, messages_combine, log_context):
resp = self.llm.gen_stream(
model=self.gpt_model, messages=messages_combine, tools=self.tools
)
if log_context:
data = build_stack_data(self.llm)
log_context.stacks.append({"component": "llm", "data": data})
return resp
def _llm_handler(self, resp, tools_dict, messages_combine, log_context, attachments=None):
logger.info(f"Handling LLM response with {len(attachments) if attachments else 0} attachments")
resp = self.llm_handler.handle_response(
self, resp, tools_dict, messages_combine, attachments=attachments
)
if log_context:
data = build_stack_data(self.llm_handler)
log_context.stacks.append({"component": "llm_handler", "data": data})
return resp

View File

@@ -0,0 +1,121 @@
import os
from typing import Dict, Generator, List
from application.agents.base import BaseAgent
from application.logging import build_stack_data, LogContext
from application.retriever.base import BaseRetriever
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
with open(
os.path.join(current_dir, "application/prompts", "react_planning_prompt.txt"), "r"
) as f:
planning_prompt = f.read()
with open(
os.path.join(current_dir, "application/prompts", "react_final_prompt.txt"),
"r",
) as f:
final_prompt = f.read()
class ReActAgent(BaseAgent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.plan = ""
self.observations: List[str] = []
def _gen_inner(
self, query: str, retriever: BaseRetriever, log_context: LogContext
) -> Generator[Dict, None, None]:
retrieved_data = self._retriever_search(retriever, query, log_context)
tools_dict = self._get_user_tools(self.user)
self._prepare_tools(tools_dict)
docs_together = "\n".join([doc["text"] for doc in retrieved_data])
plan = self._create_plan(query, docs_together, log_context)
for line in plan:
if isinstance(line, str):
self.plan += line
yield {"thought": line}
prompt = self.prompt + f"\nFollow this plan: {self.plan}"
messages = self._build_messages(prompt, query, retrieved_data)
resp = self._llm_gen(messages, log_context)
if isinstance(resp, str):
self.observations.append(resp)
if (
hasattr(resp, "message")
and hasattr(resp.message, "content")
and resp.message.content is not None
):
self.observations.append(resp.message.content)
resp = self._llm_handler(resp, tools_dict, messages, log_context)
for tool_call in self.tool_calls:
observation = (
f"Action '{tool_call['action_name']}' of tool '{tool_call['tool_name']}' "
f"with arguments '{tool_call['arguments']}' returned: '{tool_call['result']}'"
)
self.observations.append(observation)
if isinstance(resp, str):
self.observations.append(resp)
elif (
hasattr(resp, "message")
and hasattr(resp.message, "content")
and resp.message.content is not None
):
self.observations.append(resp.message.content)
else:
completion = self.llm.gen_stream(
model=self.gpt_model, messages=messages, tools=self.tools
)
for line in completion:
if isinstance(line, str):
self.observations.append(line)
yield {"sources": retrieved_data}
yield {"tool_calls": self.tool_calls.copy()}
final_answer = self._create_final_answer(query, self.observations, log_context)
for line in final_answer:
if isinstance(line, str):
yield {"answer": line}
def _create_plan(
self, query: str, docs_data: str, log_context: LogContext = None
) -> Generator[str, None, None]:
plan_prompt = planning_prompt.replace("{query}", query)
if "{summaries}" in planning_prompt:
summaries = docs_data
plan_prompt = plan_prompt.replace("{summaries}", summaries)
messages = [{"role": "user", "content": plan_prompt}]
print(self.tools)
plan = self.llm.gen_stream(
model=self.gpt_model, messages=messages, tools=self.tools
)
if log_context:
data = build_stack_data(self.llm)
log_context.stacks.append({"component": "planning_llm", "data": data})
return plan
def _create_final_answer(
self, query: str, observations: List[str], log_context: LogContext = None
) -> str:
observation_string = "\n".join(observations)
final_answer_prompt = final_prompt.format(
query=query, observations=observation_string
)
messages = [{"role": "user", "content": final_answer_prompt}]
final_answer = self.llm.gen_stream(model=self.gpt_model, messages=messages)
if log_context:
data = build_stack_data(self.llm)
log_context.stacks.append({"component": "final_answer_llm", "data": data})
return final_answer