feat: add agent webhook endpoint and implement related functionality

This commit is contained in:
Siddhant Rai
2025-04-26 12:00:29 +05:30
parent 3e1ec23409
commit 8289b02ab0
12 changed files with 424 additions and 100 deletions

View File

@@ -1,3 +1,4 @@
import json
import logging
import os
import shutil
@@ -7,15 +8,20 @@ from collections import Counter
from urllib.parse import urljoin
import requests
from bson.dbref import DBRef
from bson.objectid import ObjectId
from application.agents.agent_creator import AgentCreator
from application.api.answer.routes import get_prompt
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.chunking import Chunker
from application.parser.embedding_pipeline import embed_and_store_documents
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.schema.base import Document
from application.parser.chunking import Chunker
from application.retriever.retriever_creator import RetrieverCreator
from application.utils import count_tokens_docs
mongo = MongoDB.get_client()
@@ -27,18 +33,22 @@ MIN_TOKENS = 150
MAX_TOKENS = 1250
RECURSION_DEPTH = 2
# Define a function to extract metadata from a given filename.
def metadata_from_filename(title):
return {"title": title}
# Define a function to generate a random string of a given length.
def generate_random_string(length):
return "".join([string.ascii_letters[i % 52] for i in range(length)])
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
"""
Recursively extract zip files with a limit on recursion depth.
@@ -69,6 +79,7 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
file_path = os.path.join(root, file)
extract_zip_recursive(file_path, root, current_depth + 1, max_depth)
def download_file(url, params, dest_path):
try:
response = requests.get(url, params=params)
@@ -79,6 +90,7 @@ def download_file(url, params, dest_path):
logging.error(f"Error downloading file: {e}")
raise
def upload_index(full_path, file_data):
try:
if settings.VECTOR_STORE == "faiss":
@@ -87,7 +99,9 @@ def upload_index(full_path, file_data):
"file_pkl": open(full_path + "/index.pkl", "rb"),
}
response = requests.post(
urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
urljoin(settings.API_URL, "/api/upload_index"),
files=files,
data=file_data,
)
else:
response = requests.post(
@@ -102,6 +116,75 @@ def upload_index(full_path, file_data):
for file in files.values():
file.close()
def run_agent_logic(agent_config, input_data):
try:
source = agent_config.get("source")
retriever = agent_config.get("retriever", "classic")
if isinstance(source, DBRef):
source_doc = db.dereference(source)
source = str(source_doc["_id"])
retriever = source_doc.get("retriever", agent_config.get("retriever"))
else:
source = {}
source = {"active_docs": source}
chunks = int(agent_config.get("chunks", 2))
prompt_id = agent_config.get("prompt_id", "default")
user_api_key = agent_config["key"]
agent_type = agent_config.get("agent_type", "classic")
decoded_token = {"sub": agent_config.get("user")}
prompt = get_prompt(prompt_id)
agent = AgentCreator.create_agent(
agent_type,
endpoint="webhook",
llm_name=settings.LLM_NAME,
gpt_model=settings.MODEL_NAME,
api_key=settings.API_KEY,
user_api_key=user_api_key,
prompt=prompt,
chat_history=[],
decoded_token=decoded_token,
attachments=[],
)
retriever = RetrieverCreator.create_retriever(
retriever,
source=source,
chat_history=[],
prompt=prompt,
chunks=chunks,
token_limit=settings.DEFAULT_MAX_HISTORY,
gpt_model=settings.MODEL_NAME,
user_api_key=user_api_key,
decoded_token=decoded_token,
)
answer = agent.gen(query=input_data, retriever=retriever)
response_full = ""
thought = ""
source_log_docs = []
tool_calls = []
for line in answer:
if "answer" in line:
response_full += str(line["answer"])
elif "sources" in line:
source_log_docs.extend(line["sources"])
elif "tool_calls" in line:
tool_calls.extend(line["tool_calls"])
elif "thought" in line:
thought += line["thought"]
result = {
"answer": response_full,
"sources": source_log_docs,
"tool_calls": tool_calls,
"thought": thought,
}
return result
except Exception as e:
logging.error(f"Error in run_agent_logic: {e}", exc_info=True)
raise
# Define the main function for ingesting and processing documents.
def ingest_worker(
self, directory, formats, name_job, filename, user, retriever="classic"
@@ -133,7 +216,11 @@ def ingest_worker(
if not os.path.exists(full_path):
os.makedirs(full_path)
download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
download_file(
urljoin(settings.API_URL, "/api/download"),
file_data,
os.path.join(full_path, filename),
)
# check if file is .zip and extract it
if filename.endswith(".zip"):
@@ -157,7 +244,7 @@ def ingest_worker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False
duplicate_headers=False,
)
raw_docs = chunker.chunk(documents=raw_docs)
@@ -172,12 +259,14 @@ def ingest_worker(
for i in range(min(5, len(raw_docs))):
logging.info(f"Sample document {i}: {raw_docs[i]}")
file_data.update({
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": "local",
})
file_data.update(
{
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": "local",
}
)
upload_index(full_path, file_data)
# delete local
@@ -192,6 +281,7 @@ def ingest_worker(
"limited": False,
}
def remote_worker(
self,
source_data,
@@ -203,7 +293,7 @@ def remote_worker(
sync_frequency="never",
operation_mode="upload",
doc_id=None,
):
):
full_path = os.path.join(directory, user, name_job)
if not os.path.exists(full_path):
os.makedirs(full_path)
@@ -218,7 +308,7 @@ def remote_worker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False
duplicate_headers=False,
)
docs = chunker.chunk(documents=raw_docs)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
@@ -260,6 +350,7 @@ def remote_worker(
logging.info("remote_worker task completed successfully")
return {"urls": source_data, "name_job": name_job, "user": user, "limited": False}
def sync(
self,
source_data,
@@ -289,6 +380,7 @@ def sync(
return {"status": "error", "error": str(e)}
return {"status": "success"}
def sync_worker(self, frequency):
sync_counts = Counter()
sources = sources_collection.find()
@@ -313,84 +405,137 @@ def sync_worker(self, frequency):
for key in ["total_sync_count", "sync_success", "sync_failure"]
}
def attachment_worker(self, directory, file_info, user):
"""
Process and store a single attachment without vectorization.
Args:
self: Reference to the instance of the task.
directory (str): Base directory for storing files.
file_info (dict): Dictionary with folder and filename info.
user (str): User identifier.
Returns:
dict: Information about processed attachment.
"""
import datetime
import os
import mimetypes
import os
from application.utils import num_tokens_from_string
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
attachments_collection = db["attachments"]
filename = file_info["filename"]
attachment_id = file_info["attachment_id"]
logging.info(f"Processing attachment: {attachment_id}/{filename}", extra={"user": user})
logging.info(
f"Processing attachment: {attachment_id}/{filename}", extra={"user": user}
)
self.update_state(state="PROGRESS", meta={"current": 10})
file_path = os.path.join(directory, filename)
if not os.path.exists(file_path):
logging.warning(f"File not found: {file_path}", extra={"user": user})
raise FileNotFoundError(f"File not found: {file_path}")
try:
reader = SimpleDirectoryReader(
input_files=[file_path]
)
reader = SimpleDirectoryReader(input_files=[file_path])
documents = reader.load_data()
self.update_state(state="PROGRESS", meta={"current": 50})
if documents:
content = documents[0].text
token_count = num_tokens_from_string(content)
file_path_relative = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{attachment_id}/{filename}"
mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
doc_id = ObjectId(attachment_id)
attachments_collection.insert_one({
"_id": doc_id,
"user": user,
"path": file_path_relative,
"content": content,
"token_count": token_count,
"mime_type": mime_type,
"date": datetime.datetime.now(),
})
logging.info(f"Stored attachment with ID: {attachment_id}",
extra={"user": user})
attachments_collection.insert_one(
{
"_id": doc_id,
"user": user,
"path": file_path_relative,
"content": content,
"token_count": token_count,
"mime_type": mime_type,
"date": datetime.datetime.now(),
}
)
logging.info(
f"Stored attachment with ID: {attachment_id}", extra={"user": user}
)
self.update_state(state="PROGRESS", meta={"current": 100})
return {
"filename": filename,
"path": file_path_relative,
"token_count": token_count,
"attachment_id": attachment_id,
"mime_type": mime_type
"mime_type": mime_type,
}
else:
logging.warning("No content was extracted from the file",
extra={"user": user})
logging.warning(
"No content was extracted from the file", extra={"user": user}
)
raise ValueError("No content was extracted from the file")
except Exception as e:
logging.error(f"Error processing file {filename}: {e}", extra={"user": user}, exc_info=True)
logging.error(
f"Error processing file {filename}: {e}",
extra={"user": user},
exc_info=True,
)
raise
def agent_webhook_worker(self, agent_id, payload):
"""
Process the webhook payload for an agent.
Args:
self: Reference to the instance of the task.
agent_id (str): Unique identifier for the agent.
payload (dict): The payload data from the webhook.
Returns:
dict: Information about the processed webhook.
"""
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
agents_collection = db["agents"]
self.update_state(state="PROGRESS", meta={"current": 1})
try:
agent_oid = ObjectId(agent_id)
agent_config = agents_collection.find_one({"_id": agent_oid})
if not agent_config:
raise ValueError(f"Agent with ID {agent_id} not found.")
input_data = payload.get("query", "")
if input_data is None or not isinstance(input_data, str):
input_data = json.dumps(payload)
except Exception as e:
logging.error(f"Error processing agent webhook: {e}", exc_info=True)
return {"status": "error", "error": str(e)}
self.update_state(state="PROGRESS", meta={"current": 50})
try:
result = run_agent_logic(agent_config, input_data)
except Exception as e:
logging.error(f"Error running agent logic: {e}", exc_info=True)
return {"status": "error", "error": str(e)}
finally:
self.update_state(state="PROGRESS", meta={"current": 100})
logging.info(
f"Webhook processed for agent {agent_id}", extra={"agent_id": agent_id}
)
return {"status": "success", "result": result}