diff --git a/README.md b/README.md index 598cfa1d..bc4113ee 100644 --- a/README.md +++ b/README.md @@ -49,10 +49,10 @@ - [x] Manually updating chunks in the app UI (Feb 2025) - [x] Devcontainer for easy development (Feb 2025) - [x] ReACT agent (March 2025) -- [ ] Chatbots menu re-design to handle tools, agent types, and more (April 2025) -- [ ] New input box in the conversation menu (April 2025) -- [ ] Anthropic Tool compatibility (April 2025) -- [ ] Add triggerable actions / tools (webhook) (April 2025) +- [x] Chatbots menu re-design to handle tools, agent types, and more (April 2025) +- [x] New input box in the conversation menu (April 2025) +- [x] Add triggerable actions / tools (webhook) (April 2025) +- [ ] Anthropic Tool compatibility (May 2025) - [ ] Add OAuth 2.0 authentication for tools and sources - [ ] Agent scheduling diff --git a/application/Dockerfile b/application/Dockerfile index 308b721b..e33721a2 100644 --- a/application/Dockerfile +++ b/application/Dockerfile @@ -84,4 +84,4 @@ EXPOSE 7091 USER appuser # Start Gunicorn -CMD ["gunicorn", "-w", "2", "--timeout", "120", "--bind", "0.0.0.0:7091", "application.wsgi:app"] \ No newline at end of file +CMD ["gunicorn", "-w", "1", "--timeout", "120", "--bind", "0.0.0.0:7091", "--preload", "application.wsgi:app"] diff --git a/application/agents/base.py b/application/agents/base.py index 64fac17b..d44244cf 100644 --- a/application/agents/base.py +++ b/application/agents/base.py @@ -10,6 +10,7 @@ from application.core.mongo_db import MongoDB from application.llm.llm_creator import LLMCreator from application.logging import build_stack_data, log_activity, LogContext from application.retriever.base import BaseRetriever +from application.core.settings import settings from bson.objectid import ObjectId @@ -61,7 +62,7 @@ class BaseAgent(ABC): def _get_tools(self, api_key: str = None) -> Dict[str, Dict]: mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] agents_collection = db["agents"] tools_collection = db["user_tools"] @@ -82,7 +83,7 @@ class BaseAgent(ABC): def _get_user_tools(self, user="local"): mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] user_tools_collection = db["user_tools"] user_tools = user_tools_collection.find({"user": user, "status": True}) user_tools = list(user_tools) @@ -255,7 +256,7 @@ class BaseAgent(ABC): model=self.gpt_model, messages=messages, tools=self.tools ) if log_context: - data = build_stack_data(self.llm) + data = build_stack_data(self.llm, exclude_attributes=["client"]) log_context.stacks.append({"component": "llm", "data": data}) return resp @@ -271,6 +272,6 @@ class BaseAgent(ABC): self, resp, tools_dict, messages, attachments ) if log_context: - data = build_stack_data(self.llm_handler) + data = build_stack_data(self.llm_handler, exclude_attributes=["tool_calls"]) log_context.stacks.append({"component": "llm_handler", "data": data}) return resp diff --git a/application/agents/classic_agent.py b/application/agents/classic_agent.py index bf472cd0..b96a77fc 100644 --- a/application/agents/classic_agent.py +++ b/application/agents/classic_agent.py @@ -48,15 +48,13 @@ class ClassicAgent(BaseAgent): ): yield {"answer": resp.message.content} else: - # completion = self.llm.gen_stream( - # model=self.gpt_model, messages=messages, tools=self.tools - # ) - # log type of resp - logger.info(f"Response type: {type(resp)}") - logger.info(f"Response: {resp}") for line in resp: if isinstance(line, str): yield {"answer": line} + log_context.stacks.append( + {"component": "agent", "data": {"tool_calls": self.tool_calls.copy()}} + ) + yield {"sources": retrieved_data} yield {"tool_calls": self.tool_calls.copy()} diff --git a/application/agents/llm_handler.py b/application/agents/llm_handler.py index 7fe794f8..1b995f71 100644 --- a/application/agents/llm_handler.py +++ b/application/agents/llm_handler.py @@ -15,95 +15,86 @@ class LLMHandler(ABC): @abstractmethod def handle_response(self, agent, resp, tools_dict, messages, attachments=None, **kwargs): pass - + def prepare_messages_with_attachments(self, agent, messages, attachments=None): """ Prepare messages with attachment content if available. - + Args: agent: The current agent instance. messages (list): List of message dictionaries. attachments (list): List of attachment dictionaries with content. - + Returns: list: Messages with attachment context added to the system prompt. """ if not attachments: return messages - + logger.info(f"Preparing messages with {len(attachments)} attachments") - + supported_types = agent.llm.get_supported_attachment_types() - + supported_attachments = [] unsupported_attachments = [] - + for attachment in attachments: mime_type = attachment.get('mime_type') - if not mime_type: - import mimetypes - file_path = attachment.get('path') - if file_path: - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - else: - unsupported_attachments.append(attachment) - continue - if mime_type in supported_types: supported_attachments.append(attachment) else: unsupported_attachments.append(attachment) - + # Process supported attachments with the LLM's custom method prepared_messages = messages if supported_attachments: logger.info(f"Processing {len(supported_attachments)} supported attachments with {agent.llm.__class__.__name__}'s method") prepared_messages = agent.llm.prepare_messages_with_attachments(messages, supported_attachments) - + # Process unsupported attachments with the default method if unsupported_attachments: logger.info(f"Processing {len(unsupported_attachments)} unsupported attachments with default method") prepared_messages = self._append_attachment_content_to_system(prepared_messages, unsupported_attachments) - + return prepared_messages - + def _append_attachment_content_to_system(self, messages, attachments): """ Default method to append attachment content to the system prompt. - + Args: messages (list): List of message dictionaries. attachments (list): List of attachment dictionaries with content. - + Returns: list: Messages with attachment context added to the system prompt. """ prepared_messages = messages.copy() - + attachment_texts = [] for attachment in attachments: logger.info(f"Adding attachment {attachment.get('id')} to context") if 'content' in attachment: attachment_texts.append(f"Attached file content:\n\n{attachment['content']}") - + if attachment_texts: combined_attachment_text = "\n\n".join(attachment_texts) - + system_found = False for i in range(len(prepared_messages)): if prepared_messages[i].get("role") == "system": prepared_messages[i]["content"] += f"\n\n{combined_attachment_text}" system_found = True break - + if not system_found: prepared_messages.insert(0, {"role": "system", "content": combined_attachment_text}) - + return prepared_messages class OpenAILLMHandler(LLMHandler): def handle_response(self, agent, resp, tools_dict, messages, attachments=None, stream: bool = True): - + messages = self.prepare_messages_with_attachments(agent, messages, attachments) logger.info(f"Messages with attachments: {messages}") if not stream: @@ -146,6 +137,7 @@ class OpenAILLMHandler(LLMHandler): messages = self.prepare_messages_with_attachments(agent, messages, attachments) except Exception as e: + logging.error(f"Error executing tool: {str(e)}", exc_info=True) messages.append( { "role": "tool", @@ -167,7 +159,7 @@ class OpenAILLMHandler(LLMHandler): if isinstance(chunk, str) and len(chunk) > 0: yield chunk continue - elif hasattr(chunk, "delta"): + elif hasattr(chunk, "delta"): chunk_delta = chunk.delta if ( @@ -238,6 +230,7 @@ class OpenAILLMHandler(LLMHandler): ) except Exception as e: + logging.error(f"Error executing tool: {str(e)}", exc_info=True) messages.append( { "role": "assistant", @@ -258,7 +251,7 @@ class OpenAILLMHandler(LLMHandler): return resp elif isinstance(chunk, str) and len(chunk) == 0: continue - + logger.info(f"Regenerating with messages: {messages}") resp = agent.llm.gen_stream( model=agent.gpt_model, messages=messages, tools=agent.tools @@ -269,9 +262,9 @@ class OpenAILLMHandler(LLMHandler): class GoogleLLMHandler(LLMHandler): def handle_response(self, agent, resp, tools_dict, messages, attachments=None, stream: bool = True): from google.genai import types - + messages = self.prepare_messages_with_attachments(agent, messages, attachments) - + while True: if not stream: response = agent.llm.gen( diff --git a/application/agents/react_agent.py b/application/agents/react_agent.py index 3fae1fda..a5d47850 100644 --- a/application/agents/react_agent.py +++ b/application/agents/react_agent.py @@ -82,6 +82,10 @@ class ReActAgent(BaseAgent): if isinstance(line, str): self.observations.append(line) + log_context.stacks.append( + {"component": "agent", "data": {"tool_calls": self.tool_calls.copy()}} + ) + yield {"sources": retrieved_data} yield {"tool_calls": self.tool_calls.copy()} diff --git a/application/agents/tools/api_tool.py b/application/agents/tools/api_tool.py index 092c3569..063313c4 100644 --- a/application/agents/tools/api_tool.py +++ b/application/agents/tools/api_tool.py @@ -25,8 +25,8 @@ class APITool(Tool): def _make_api_call(self, url, method, headers, query_params, body): if query_params: url = f"{url}?{requests.compat.urlencode(query_params)}" - if isinstance(body, dict): - body = json.dumps(body) + # if isinstance(body, dict): + # body = json.dumps(body) try: print(f"Making API call: {method} {url} with body: {body}") if body == "{}": diff --git a/application/api/answer/routes.py b/application/api/answer/routes.py index 8f44385b..abc1f9ba 100644 --- a/application/api/answer/routes.py +++ b/application/api/answer/routes.py @@ -23,7 +23,7 @@ from application.utils import check_required_fields, limit_chat_history logger = logging.getLogger(__name__) mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] conversations_collection = db["conversations"] sources_collection = db["sources"] prompts_collection = db["prompts"] @@ -105,7 +105,7 @@ def get_agent_key(agent_id, user_id): raise Exception("Unauthorized access to the agent", 403) except Exception as e: - logger.error(f"Error in get_agent_key: {str(e)}") + logger.error(f"Error in get_agent_key: {str(e)}", exc_info=True) raise @@ -351,8 +351,7 @@ def complete_stream( data = json.dumps({"type": "end"}) yield f"data: {data}\n\n" except Exception as e: - logger.error(f"Error in stream: {str(e)}") - logger.error(traceback.format_exc()) + logger.error(f"Error in stream: {str(e)}", exc_info=True) data = json.dumps( { "type": "error", @@ -882,6 +881,6 @@ def get_attachments_content(attachment_ids, user): if attachment_doc: attachments.append(attachment_doc) except Exception as e: - logger.error(f"Error retrieving attachment {attachment_id}: {e}") + logger.error(f"Error retrieving attachment {attachment_id}: {e}", exc_info=True) return attachments diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index c8e32d11..80759593 100755 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -3,12 +3,15 @@ import datetime from flask import Blueprint, request, send_from_directory from werkzeug.utils import secure_filename from bson.objectid import ObjectId - +import logging from application.core.mongo_db import MongoDB from application.core.settings import settings +from application.storage.storage_creator import StorageCreator + +logger = logging.getLogger(__name__) mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] conversations_collection = db["conversations"] sources_collection = db["sources"] @@ -45,26 +48,26 @@ def upload_index_files(): remote_data = request.form["remote_data"] if "remote_data" in request.form else None sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None - save_dir = os.path.join(current_dir, "indexes", str(id)) + storage = StorageCreator.get_storage() + index_base_path = f"indexes/{id}" + if settings.VECTOR_STORE == "faiss": if "file_faiss" not in request.files: - print("No file part") + logger.error("No file_faiss part") return {"status": "no file"} file_faiss = request.files["file_faiss"] if file_faiss.filename == "": return {"status": "no file name"} if "file_pkl" not in request.files: - print("No file part") + logger.error("No file_pkl part") return {"status": "no file"} file_pkl = request.files["file_pkl"] if file_pkl.filename == "": return {"status": "no file name"} - # saves index files - - if not os.path.exists(save_dir): - os.makedirs(save_dir) - file_faiss.save(os.path.join(save_dir, "index.faiss")) - file_pkl.save(os.path.join(save_dir, "index.pkl")) + + # Save index files to storage + storage.save_file(file_faiss, f"{index_base_path}/index.faiss") + storage.save_file(file_pkl, f"{index_base_path}/index.pkl") existing_entry = sources_collection.find_one({"_id": ObjectId(id)}) if existing_entry: diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 8876be6b..e10082d3 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -2,8 +2,10 @@ import datetime import json import math import os +import secrets import shutil import uuid +from functools import wraps from bson.binary import Binary, UuidRepresentation from bson.dbref import DBRef @@ -14,7 +16,12 @@ from werkzeug.utils import secure_filename from application.agents.tools.tool_manager import ToolManager -from application.api.user.tasks import ingest, ingest_remote, store_attachment +from application.api.user.tasks import ( + ingest, + ingest_remote, + process_agent_webhook, + store_attachment, +) from application.core.mongo_db import MongoDB from application.core.settings import settings from application.extensions import api @@ -23,7 +30,7 @@ from application.utils import check_required_fields, validate_function_name from application.vectorstore.vector_creator import VectorCreator mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] conversations_collection = db["conversations"] sources_collection = db["sources"] prompts_collection = db["prompts"] @@ -103,7 +110,7 @@ class DeleteConversation(Resource): {"_id": ObjectId(conversation_id), "user": decoded_token["sub"]} ) except Exception as err: - current_app.logger.error(f"Error deleting conversation: {err}") + current_app.logger.error(f"Error deleting conversation: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -121,7 +128,7 @@ class DeleteAllConversations(Resource): try: conversations_collection.delete_many({"user": user_id}) except Exception as err: - current_app.logger.error(f"Error deleting all conversations: {err}") + current_app.logger.error(f"Error deleting all conversations: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -159,7 +166,7 @@ class GetConversations(Resource): for conversation in conversations ] except Exception as err: - current_app.logger.error(f"Error retrieving conversations: {err}") + current_app.logger.error(f"Error retrieving conversations: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify(list_conversations), 200) @@ -187,7 +194,7 @@ class GetSingleConversation(Resource): if not conversation: return make_response(jsonify({"status": "not found"}), 404) except Exception as err: - current_app.logger.error(f"Error retrieving conversation: {err}") + current_app.logger.error(f"Error retrieving conversation: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) data = { @@ -229,7 +236,7 @@ class UpdateConversationName(Resource): {"$set": {"name": data["name"]}}, ) except Exception as err: - current_app.logger.error(f"Error updating conversation name: {err}") + current_app.logger.error(f"Error updating conversation name: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -307,7 +314,7 @@ class SubmitFeedback(Resource): ) except Exception as err: - current_app.logger.error(f"Error submitting feedback: {err}") + current_app.logger.error(f"Error submitting feedback: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -331,7 +338,7 @@ class DeleteByIds(Resource): if result: return make_response(jsonify({"success": True}), 200) except Exception as err: - current_app.logger.error(f"Error deleting indexes: {err}") + current_app.logger.error(f"Error deleting indexes: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": False}), 400) @@ -370,7 +377,7 @@ class DeleteOldIndexes(Resource): except FileNotFoundError: pass except Exception as err: - current_app.logger.error(f"Error deleting old indexes: {err}") + current_app.logger.error(f"Error deleting old indexes: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) sources_collection.delete_one({"_id": ObjectId(source_id)}) @@ -413,53 +420,105 @@ class UploadFile(Resource): user = secure_filename(decoded_token.get("sub")) job_name = secure_filename(request.form["name"]) + try: - save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name) - os.makedirs(save_dir, exist_ok=True) + from application.storage.storage_creator import StorageCreator + + storage = StorageCreator.get_storage() + + base_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}" if len(files) > 1: - temp_dir = os.path.join(save_dir, "temp") - os.makedirs(temp_dir, exist_ok=True) - + temp_files = [] for file in files: filename = secure_filename(file.filename) - file.save(os.path.join(temp_dir, filename)) + temp_path = f"{base_path}/temp/{filename}" + storage.save_file(file, temp_path) + temp_files.append(temp_path) print(f"Saved file: {filename}") - zip_path = shutil.make_archive( - base_name=os.path.join(save_dir, job_name), - format="zip", - root_dir=temp_dir, - ) - final_filename = os.path.basename(zip_path) - shutil.rmtree(temp_dir) - task = ingest.delay( - settings.UPLOAD_FOLDER, - [ - ".rst", - ".md", - ".pdf", - ".txt", - ".docx", - ".csv", - ".epub", - ".html", - ".mdx", - ".json", - ".xlsx", - ".pptx", - ".png", - ".jpg", - ".jpeg", - ], - job_name, - final_filename, - user, - ) - else: + + zip_filename = f"{job_name}.zip" + zip_path = f"{base_path}/{zip_filename}" + zip_temp_path = None + + def create_zip_archive(temp_paths, job_name, storage): + import tempfile + + with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_zip_file: + zip_output_path = temp_zip_file.name + + with tempfile.TemporaryDirectory() as stage_dir: + for path in temp_paths: + try: + file_data = storage.get_file(path) + with open(os.path.join(stage_dir, os.path.basename(path)), "wb") as f: + f.write(file_data.read()) + except Exception as e: + current_app.logger.error(f"Error processing file {path} for zipping: {e}", exc_info=True) + if os.path.exists(zip_output_path): + os.remove(zip_output_path) + raise + try: + shutil.make_archive( + base_name=zip_output_path.replace(".zip", ""), + format="zip", + root_dir=stage_dir, + ) + except Exception as e: + current_app.logger.error(f"Error creating zip archive: {e}", exc_info=True) + if os.path.exists(zip_output_path): + os.remove(zip_output_path) + raise + + return zip_output_path + + try: + zip_temp_path = create_zip_archive(temp_files, job_name, storage) + with open(zip_temp_path, "rb") as zip_file: + storage.save_file(zip_file, zip_path) + + task = ingest.delay( + settings.UPLOAD_FOLDER, + [ + ".rst", + ".md", + ".pdf", + ".txt", + ".docx", + ".csv", + ".epub", + ".html", + ".mdx", + ".json", + ".xlsx", + ".pptx", + ".png", + ".jpg", + ".jpeg", + ], + job_name, + zip_filename, + user, + ) + finally: + # Clean up temporary files + for temp_path in temp_files: + try: + storage.delete_file(temp_path) + except Exception as e: + current_app.logger.error(f"Error deleting temporary file {temp_path}: {e}", exc_info=True) + + # Clean up the zip file if it was created + if zip_temp_path and os.path.exists(zip_temp_path): + os.remove(zip_temp_path) + + else: # Keep this else block for single file upload + # For single file file = files[0] - final_filename = secure_filename(file.filename) - file_path = os.path.join(save_dir, final_filename) - file.save(file_path) + filename = secure_filename(file.filename) + file_path = f"{base_path}/{filename}" + + storage.save_file(file, file_path) task = ingest.delay( settings.UPLOAD_FOLDER, @@ -481,13 +540,14 @@ class UploadFile(Resource): ".jpeg", ], job_name, - final_filename, + filename, # Corrected variable for single-file case user, ) except Exception as err: - current_app.logger.error(f"Error uploading file: {err}") + current_app.logger.error(f"Error uploading file: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) + return make_response(jsonify({"success": True, "task_id": task.id}), 200) @@ -538,7 +598,7 @@ class UploadRemote(Resource): loader=data["source"], ) except Exception as err: - current_app.logger.error(f"Error uploading remote source: {err}") + current_app.logger.error(f"Error uploading remote source: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True, "task_id": task.id}), 200) @@ -571,7 +631,7 @@ class TaskStatus(Resource): ): task_meta = str(task_meta) # Convert to a string representation except Exception as err: - current_app.logger.error(f"Error getting task status: {err}") + current_app.logger.error(f"Error getting task status: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"status": task.status, "result": task_meta}), 200) @@ -650,7 +710,7 @@ class PaginatedSources(Resource): return make_response(jsonify(response), 200) except Exception as err: - current_app.logger.error(f"Error retrieving paginated sources: {err}") + current_app.logger.error(f"Error retrieving paginated sources: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) @@ -714,7 +774,7 @@ class CombinedJson(Resource): ) except Exception as err: - current_app.logger.error(f"Error retrieving sources: {err}") + current_app.logger.error(f"Error retrieving sources: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify(data), 200) @@ -741,7 +801,7 @@ class CheckDocs(Resource): if os.path.exists(vectorstore) or data["docs"] == "default": return {"status": "exists"}, 200 except Exception as err: - current_app.logger.error(f"Error checking document: {err}") + current_app.logger.error(f"Error checking document: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"status": "not found"}), 404) @@ -783,7 +843,7 @@ class CreatePrompt(Resource): ) new_id = str(resp.inserted_id) except Exception as err: - current_app.logger.error(f"Error creating prompt: {err}") + current_app.logger.error(f"Error creating prompt: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"id": new_id}), 200) @@ -814,7 +874,7 @@ class GetPrompts(Resource): } ) except Exception as err: - current_app.logger.error(f"Error retrieving prompts: {err}") + current_app.logger.error(f"Error retrieving prompts: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify(list_prompts), 200) @@ -862,7 +922,7 @@ class GetSinglePrompt(Resource): {"_id": ObjectId(prompt_id), "user": user} ) except Exception as err: - current_app.logger.error(f"Error retrieving prompt: {err}") + current_app.logger.error(f"Error retrieving prompt: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"content": prompt["content"]}), 200) @@ -891,7 +951,7 @@ class DeletePrompt(Resource): try: prompts_collection.delete_one({"_id": ObjectId(data["id"]), "user": user}) except Exception as err: - current_app.logger.error(f"Error deleting prompt: {err}") + current_app.logger.error(f"Error deleting prompt: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -929,7 +989,7 @@ class UpdatePrompt(Resource): {"$set": {"name": data["name"], "content": data["content"]}}, ) except Exception as err: - current_app.logger.error(f"Error updating prompt: {err}") + current_app.logger.error(f"Error updating prompt: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -958,12 +1018,8 @@ class GetAgent(Resource): data = { "id": str(agent["_id"]), "name": agent["name"], - "description": agent["description"], - "source": ( - str(db.dereference(agent["source"])["_id"]) - if "source" in agent and isinstance(agent["source"], DBRef) - else "" - ), + "description": agent.get("description", ""), + "source": (str(source_doc["_id"]) if isinstance(agent.get("source"), DBRef) and (source_doc := db.dereference(agent.get("source"))) else ""), "chunks": agent["chunks"], "retriever": agent.get("retriever", ""), "prompt_id": agent["prompt_id"], @@ -976,7 +1032,7 @@ class GetAgent(Resource): "key": f"{agent['key'][:4]}...{agent['key'][-4:]}", } except Exception as err: - current_app.logger.error(f"Error retrieving agent: {err}") + current_app.logger.error(f"Error retrieving agent: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify(data), 200) @@ -996,12 +1052,8 @@ class GetAgents(Resource): { "id": str(agent["_id"]), "name": agent["name"], - "description": agent["description"], - "source": ( - str(db.dereference(agent["source"])["_id"]) - if "source" in agent and isinstance(agent["source"], DBRef) - else "" - ), + "description": agent.get("description", ""), + "source": (str(source_doc["_id"]) if isinstance(agent.get("source"), DBRef) and (source_doc := db.dereference(agent.get("source"))) else ""), "chunks": agent["chunks"], "retriever": agent.get("retriever", ""), "prompt_id": agent["prompt_id"], @@ -1017,7 +1069,7 @@ class GetAgents(Resource): if "source" in agent or "retriever" in agent ] except Exception as err: - current_app.logger.error(f"Error retrieving agents: {err}") + current_app.logger.error(f"Error retrieving agents: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify(list_agents), 200) @@ -1106,7 +1158,7 @@ class CreateAgent(Resource): resp = agents_collection.insert_one(new_agent) new_id = str(resp.inserted_id) except Exception as err: - current_app.logger.error(f"Error creating agent: {err}") + current_app.logger.error(f"Error creating agent: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"id": new_id, "key": key}), 201) @@ -1157,7 +1209,7 @@ class UpdateAgent(Resource): existing_agent = agents_collection.find_one({"_id": oid, "user": user}) except Exception as err: return make_response( - current_app.logger.error(f"Error finding agent {agent_id}: {err}"), + current_app.logger.error(f"Error finding agent {agent_id}: {err}", exc_info=True), jsonify({"success": False, "message": "Database error finding agent"}), 500, ) @@ -1280,7 +1332,7 @@ class UpdateAgent(Resource): ) except Exception as err: - current_app.logger.error(f"Error updating agent {agent_id}: {err}") + current_app.logger.error(f"Error updating agent {agent_id}: {err}", exc_info=True) return make_response( jsonify({"success": False, "message": "Database error during update"}), 500, @@ -1323,12 +1375,143 @@ class DeleteAgent(Resource): deleted_id = str(deleted_agent["_id"]) except Exception as err: - current_app.logger.error(f"Error deleting agent: {err}") + current_app.logger.error(f"Error deleting agent: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"id": deleted_id}), 200) +@user_ns.route("/api/agent_webhook") +class AgentWebhook(Resource): + @api.doc( + params={"id": "ID of the agent"}, + description="Generate webhook URL for the agent", + ) + def get(self): + decoded_token = request.decoded_token + if not decoded_token: + return make_response(jsonify({"success": False}), 401) + user = decoded_token.get("sub") + agent_id = request.args.get("id") + if not agent_id: + return make_response( + jsonify({"success": False, "message": "ID is required"}), 400 + ) + + try: + agent = agents_collection.find_one( + {"_id": ObjectId(agent_id), "user": user} + ) + if not agent: + return make_response( + jsonify({"success": False, "message": "Agent not found"}), 404 + ) + + webhook_token = agent.get("incoming_webhook_token") + if not webhook_token: + webhook_token = secrets.token_urlsafe(32) + agents_collection.update_one( + {"_id": ObjectId(agent_id), "user": user}, + {"$set": {"incoming_webhook_token": webhook_token}}, + ) + base_url = settings.API_URL.rstrip("/") + full_webhook_url = f"{base_url}/api/webhooks/agents/{webhook_token}" + + except Exception as err: + current_app.logger.error(f"Error generating webhook URL: {err}", exc_info=True) + return make_response( + jsonify({"success": False, "message": "Error generating webhook URL"}), + 400, + ) + return make_response( + jsonify({"success": True, "webhook_url": full_webhook_url}), 200 + ) + + +def require_agent(func): + @wraps(func) + def wrapper(*args, **kwargs): + webhook_token = kwargs.get("webhook_token") + if not webhook_token: + return make_response( + jsonify({"success": False, "message": "Webhook token missing"}), 400 + ) + + agent = agents_collection.find_one( + {"incoming_webhook_token": webhook_token}, {"_id": 1} + ) + if not agent: + current_app.logger.warning( + f"Webhook attempt with invalid token: {webhook_token}" + ) + return make_response( + jsonify({"success": False, "message": "Agent not found"}), 404 + ) + + kwargs["agent"] = agent + kwargs["agent_id_str"] = str(agent["_id"]) + return func(*args, **kwargs) + + return wrapper + + +@user_ns.route("/api/webhooks/agents/") +class AgentWebhookListener(Resource): + method_decorators = [require_agent] + + def _enqueue_webhook_task(self, agent_id_str, payload, source_method): + if not payload: + current_app.logger.warning( + f"Webhook ({source_method}) received for agent {agent_id_str} with empty payload." + ) + + current_app.logger.info( + f"Incoming {source_method} webhook for agent {agent_id_str}. Enqueuing task with payload: {payload}" + ) + + try: + task = process_agent_webhook.delay( + agent_id=agent_id_str, + payload=payload, + ) + current_app.logger.info( + f"Task {task.id} enqueued for agent {agent_id_str} ({source_method})." + ) + return make_response(jsonify({"success": True, "task_id": task.id}), 200) + except Exception as err: + current_app.logger.error( + f"Error enqueuing webhook task ({source_method}) for agent {agent_id_str}: {err}", + exc_info=True, + ) + return make_response( + jsonify({"success": False, "message": "Error processing webhook"}), 500 + ) + + @api.doc( + description="Webhook listener for agent events (POST). Expects JSON payload, which is used to trigger processing.", + ) + def post(self, webhook_token, agent, agent_id_str): + payload = request.get_json() + if payload is None: + return make_response( + jsonify( + { + "success": False, + "message": "Invalid or missing JSON data in request body", + } + ), + 400, + ) + return self._enqueue_webhook_task(agent_id_str, payload, source_method="POST") + + @api.doc( + description="Webhook listener for agent events (GET). Uses URL query parameters as payload to trigger processing.", + ) + def get(self, webhook_token, agent, agent_id_str): + payload = request.args.to_dict(flat=True) + return self._enqueue_webhook_task(agent_id_str, payload, source_method="GET") + + @user_ns.route("/api/share") class ShareConversation(Resource): share_conversation_model = api.model( @@ -1524,7 +1707,7 @@ class ShareConversation(Resource): 201, ) except Exception as err: - current_app.logger.error(f"Error sharing conversation: {err}") + current_app.logger.error(f"Error sharing conversation: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) @@ -1580,7 +1763,7 @@ class GetPubliclySharedConversations(Resource): res["api_key"] = shared["api_key"] return make_response(jsonify(res), 200) except Exception as err: - current_app.logger.error(f"Error getting shared conversation: {err}") + current_app.logger.error(f"Error getting shared conversation: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) @@ -1625,7 +1808,7 @@ class GetMessageAnalytics(Resource): else None ) except Exception as err: - current_app.logger.error(f"Error getting API key: {err}") + current_app.logger.error(f"Error getting API key: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) end_date = datetime.datetime.now(datetime.timezone.utc) @@ -1700,7 +1883,7 @@ class GetMessageAnalytics(Resource): daily_messages[entry["_id"]] = entry["count"] except Exception as err: - current_app.logger.error(f"Error getting message analytics: {err}") + current_app.logger.error(f"Error getting message analytics: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response( @@ -1749,7 +1932,7 @@ class GetTokenAnalytics(Resource): else None ) except Exception as err: - current_app.logger.error(f"Error getting API key: {err}") + current_app.logger.error(f"Error getting API key: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) end_date = datetime.datetime.now(datetime.timezone.utc) @@ -1859,7 +2042,7 @@ class GetTokenAnalytics(Resource): daily_token_usage[entry["_id"]["day"]] = entry["total_tokens"] except Exception as err: - current_app.logger.error(f"Error getting token analytics: {err}") + current_app.logger.error(f"Error getting token analytics: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response( @@ -1908,7 +2091,7 @@ class GetFeedbackAnalytics(Resource): else None ) except Exception as err: - current_app.logger.error(f"Error getting API key: {err}") + current_app.logger.error(f"Error getting API key: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) end_date = datetime.datetime.now(datetime.timezone.utc) @@ -2024,7 +2207,7 @@ class GetFeedbackAnalytics(Resource): } except Exception as err: - current_app.logger.error(f"Error getting feedback analytics: {err}") + current_app.logger.error(f"Error getting feedback analytics: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response( @@ -2071,7 +2254,7 @@ class GetUserLogs(Resource): else None ) except Exception as err: - current_app.logger.error(f"Error getting API key: {err}") + current_app.logger.error(f"Error getting API key: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) query = {"user": user} @@ -2160,7 +2343,7 @@ class ManageSync(Resource): update_data, ) except Exception as err: - current_app.logger.error(f"Error updating sync frequency: {err}") + current_app.logger.error(f"Error updating sync frequency: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -2196,7 +2379,7 @@ class TextToSpeech(Resource): 200, ) except Exception as err: - current_app.logger.error(f"Error synthesizing audio: {err}") + current_app.logger.error(f"Error synthesizing audio: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) @@ -2221,7 +2404,7 @@ class AvailableTools(Resource): } ) except Exception as err: - current_app.logger.error(f"Error getting available tools: {err}") + current_app.logger.error(f"Error getting available tools: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True, "data": tools_metadata}), 200) @@ -2243,7 +2426,7 @@ class GetTools(Resource): tool.pop("_id") user_tools.append(tool) except Exception as err: - current_app.logger.error(f"Error getting user tools: {err}") + current_app.logger.error(f"Error getting user tools: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True, "tools": user_tools}), 200) @@ -2319,7 +2502,7 @@ class CreateTool(Resource): resp = user_tools_collection.insert_one(new_tool) new_id = str(resp.inserted_id) except Exception as err: - current_app.logger.error(f"Error creating tool: {err}") + current_app.logger.error(f"Error creating tool: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"id": new_id}), 200) @@ -2388,7 +2571,7 @@ class UpdateTool(Resource): {"$set": update_data}, ) except Exception as err: - current_app.logger.error(f"Error updating tool: {err}") + current_app.logger.error(f"Error updating tool: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -2425,7 +2608,7 @@ class UpdateToolConfig(Resource): {"$set": {"config": data["config"]}}, ) except Exception as err: - current_app.logger.error(f"Error updating tool config: {err}") + current_app.logger.error(f"Error updating tool config: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -2464,7 +2647,7 @@ class UpdateToolActions(Resource): {"$set": {"actions": data["actions"]}}, ) except Exception as err: - current_app.logger.error(f"Error updating tool actions: {err}") + current_app.logger.error(f"Error updating tool actions: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -2501,7 +2684,7 @@ class UpdateToolStatus(Resource): {"$set": {"status": data["status"]}}, ) except Exception as err: - current_app.logger.error(f"Error updating tool status: {err}") + current_app.logger.error(f"Error updating tool status: {err}", exc_info=True) return make_response(jsonify({"success": False}), 400) return make_response(jsonify({"success": True}), 200) @@ -2534,7 +2717,7 @@ class DeleteTool(Resource): if result.deleted_count == 0: return {"success": False, "message": "Tool not found"}, 404 except Exception as err: - current_app.logger.error(f"Error deleting tool: {err}") + current_app.logger.error(f"Error deleting tool: {err}", exc_info=True) return {"success": False}, 400 return {"success": True}, 200 @@ -2585,7 +2768,7 @@ class GetChunks(Resource): ) except Exception as e: - current_app.logger.error(f"Error getting chunks: {e}") + current_app.logger.error(f"Error getting chunks: {e}", exc_info=True) return make_response(jsonify({"success": False}), 500) @@ -2639,7 +2822,7 @@ class AddChunk(Resource): 201, ) except Exception as e: - current_app.logger.error(f"Error adding chunk: {e}") + current_app.logger.error(f"Error adding chunk: {e}", exc_info=True) return make_response(jsonify({"success": False}), 500) @@ -2679,7 +2862,7 @@ class DeleteChunk(Resource): 404, ) except Exception as e: - current_app.logger.error(f"Error deleting chunk: {e}") + current_app.logger.error(f"Error deleting chunk: {e}", exc_info=True) return make_response(jsonify({"success": False}), 500) @@ -2761,7 +2944,7 @@ class UpdateChunk(Resource): 200, ) except Exception as e: - current_app.logger.error(f"Error updating chunk: {e}") + current_app.logger.error(f"Error updating chunk: {e}", exc_info=True) return make_response(jsonify({"success": False}), 500) @@ -2781,7 +2964,6 @@ class StoreAttachment(Resource): if not decoded_token: return make_response(jsonify({"success": False}), 401) - # Get single file instead of list file = request.files.get("file") if not file or file.filename == "": @@ -2795,27 +2977,18 @@ class StoreAttachment(Resource): try: attachment_id = ObjectId() original_filename = secure_filename(file.filename) + relative_path = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{str(attachment_id)}/{original_filename}" - save_dir = os.path.join( - current_dir, - settings.UPLOAD_FOLDER, - user, - "attachments", - str(attachment_id), - ) - os.makedirs(save_dir, exist_ok=True) + file_content = file.read() - file_path = os.path.join(save_dir, original_filename) - - file.save(file_path) file_info = { "filename": original_filename, "attachment_id": str(attachment_id), + "path": relative_path, + "file_content": file_content, } - current_app.logger.info(f"Saved file: {file_path}") - # Start async task to process single file - task = store_attachment.delay(save_dir, file_info, user) + task = store_attachment.delay(file_info, user) return make_response( jsonify( @@ -2827,7 +3000,6 @@ class StoreAttachment(Resource): ), 200, ) - except Exception as err: - current_app.logger.error(f"Error storing attachment: {err}") + current_app.logger.error(f"Error storing attachment: {err}", exc_info=True) return make_response(jsonify({"success": False, "error": str(err)}), 400) diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py index 24cff3c6..fffa9ba9 100644 --- a/application/api/user/tasks.py +++ b/application/api/user/tasks.py @@ -1,7 +1,13 @@ from datetime import timedelta from application.celery_init import celery -from application.worker import ingest_worker, remote_worker, sync_worker, attachment_worker +from application.worker import ( + agent_webhook_worker, + attachment_worker, + ingest_worker, + remote_worker, + sync_worker, +) @celery.task(bind=True) @@ -23,8 +29,14 @@ def schedule_syncs(self, frequency): @celery.task(bind=True) -def store_attachment(self, directory, saved_files, user): - resp = attachment_worker(self, directory, saved_files, user) +def store_attachment(self, file_info, user): + resp = attachment_worker(self, file_info, user) + return resp + + +@celery.task(bind=True) +def process_agent_webhook(self, agent_id, payload): + resp = agent_webhook_worker(self, agent_id, payload) return resp diff --git a/application/cache.py b/application/cache.py index 117b444a..3fdb6b8d 100644 --- a/application/cache.py +++ b/application/cache.py @@ -61,14 +61,14 @@ def gen_cache(func): if cached_response: return cached_response.decode("utf-8") except Exception as e: - logger.error(f"Error getting cached response: {e}") + logger.error(f"Error getting cached response: {e}", exc_info=True) result = func(self, model, messages, stream, tools, *args, **kwargs) if redis_client and isinstance(result, str): try: redis_client.set(cache_key, result, ex=1800) except Exception as e: - logger.error(f"Error setting cache: {e}") + logger.error(f"Error setting cache: {e}", exc_info=True) return result @@ -100,7 +100,7 @@ def stream_cache(func): time.sleep(0.03) # Simulate streaming delay return except Exception as e: - logger.error(f"Error getting cached stream: {e}") + logger.error(f"Error getting cached stream: {e}", exc_info=True) stream_cache_data = [] for chunk in func(self, model, messages, stream, tools, *args, **kwargs): @@ -112,6 +112,6 @@ def stream_cache(func): redis_client.set(cache_key, json.dumps(stream_cache_data), ex=1800) logger.info(f"Stream cache saved for key: {cache_key}") except Exception as e: - logger.error(f"Error setting stream cache: {e}") + logger.error(f"Error setting stream cache: {e}", exc_info=True) return wrapper diff --git a/application/core/settings.py b/application/core/settings.py index 74bffe53..3be34242 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -19,6 +19,7 @@ class Settings(BaseSettings): CELERY_BROKER_URL: str = "redis://localhost:6379/0" CELERY_RESULT_BACKEND: str = "redis://localhost:6379/1" MONGO_URI: str = "mongodb://localhost:27017/docsgpt" + MONGO_DB_NAME: str = "docsgpt" MODEL_PATH: str = os.path.join(current_dir, "models/docsgpt-7b-f16.gguf") DEFAULT_MAX_HISTORY: int = 150 MODEL_TOKEN_LIMITS: dict = { @@ -98,6 +99,8 @@ class Settings(BaseSettings): BRAVE_SEARCH_API_KEY: Optional[str] = None FLASK_DEBUG_MODE: bool = False + STORAGE_TYPE: str = "local" # local or s3 + JWT_SECRET_KEY: str = "" diff --git a/application/llm/google_ai.py b/application/llm/google_ai.py index c049eaa2..b749431b 100644 --- a/application/llm/google_ai.py +++ b/application/llm/google_ai.py @@ -1,11 +1,11 @@ from google import genai from google.genai import types -import os import logging -import mimetypes import json from application.llm.base import BaseLLM +from application.storage.storage_creator import StorageCreator +from application.core.settings import settings class GoogleLLM(BaseLLM): @@ -14,11 +14,12 @@ class GoogleLLM(BaseLLM): self.api_key = api_key self.user_api_key = user_api_key self.client = genai.Client(api_key=self.api_key) + self.storage = StorageCreator.get_storage() def get_supported_attachment_types(self): """ Return a list of MIME types supported by Google Gemini for file uploads. - + Returns: list: List of supported MIME types """ @@ -30,35 +31,35 @@ class GoogleLLM(BaseLLM): 'image/webp', 'image/gif' ] - + def prepare_messages_with_attachments(self, messages, attachments=None): """ Process attachments using Google AI's file API for more efficient handling. - + Args: messages (list): List of message dictionaries. attachments (list): List of attachment dictionaries with content and metadata. - + Returns: list: Messages formatted with file references for Google AI API. """ if not attachments: return messages - + prepared_messages = messages.copy() - + # Find the user message to attach files to the last one user_message_index = None for i in range(len(prepared_messages) - 1, -1, -1): if prepared_messages[i].get("role") == "user": user_message_index = i break - + if user_message_index is None: user_message = {"role": "user", "content": []} prepared_messages.append(user_message) user_message_index = len(prepared_messages) - 1 - + if isinstance(prepared_messages[user_message_index].get("content"), str): text_content = prepared_messages[user_message_index]["content"] prepared_messages[user_message_index]["content"] = [ @@ -66,84 +67,71 @@ class GoogleLLM(BaseLLM): ] elif not isinstance(prepared_messages[user_message_index].get("content"), list): prepared_messages[user_message_index]["content"] = [] - + files = [] for attachment in attachments: mime_type = attachment.get('mime_type') - if not mime_type: - file_path = attachment.get('path') - if file_path: - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - + if mime_type in self.get_supported_attachment_types(): try: file_uri = self._upload_file_to_google(attachment) logging.info(f"GoogleLLM: Successfully uploaded file, got URI: {file_uri}") files.append({"file_uri": file_uri, "mime_type": mime_type}) except Exception as e: - logging.error(f"GoogleLLM: Error uploading file: {e}") + logging.error(f"GoogleLLM: Error uploading file: {e}", exc_info=True) if 'content' in attachment: prepared_messages[user_message_index]["content"].append({ - "type": "text", + "type": "text", "text": f"[File could not be processed: {attachment.get('path', 'unknown')}]" }) - + if files: logging.info(f"GoogleLLM: Adding {len(files)} files to message") prepared_messages[user_message_index]["content"].append({ "files": files }) - + return prepared_messages def _upload_file_to_google(self, attachment): """ Upload a file to Google AI and return the file URI. - + Args: attachment (dict): Attachment dictionary with path and metadata. - + Returns: str: Google AI file URI for the uploaded file. """ if 'google_file_uri' in attachment: return attachment['google_file_uri'] - + file_path = attachment.get('path') if not file_path: raise ValueError("No file path provided in attachment") - - if not os.path.isabs(file_path): - current_dir = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ) - file_path = os.path.join(current_dir, "application", file_path) - - if not os.path.exists(file_path): + + if not self.storage.file_exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") - - mime_type = attachment.get('mime_type') - if not mime_type: - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - + try: - response = self.client.files.upload(file=file_path) - - file_uri = response.uri - + file_uri = self.storage.process_file( + file_path, + lambda local_path, **kwargs: self.client.files.upload(file=local_path).uri + ) + from application.core.mongo_db import MongoDB mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] attachments_collection = db["attachments"] if '_id' in attachment: attachments_collection.update_one( {"_id": attachment['_id']}, {"$set": {"google_file_uri": file_uri}} ) - + return file_uri except Exception as e: - logging.error(f"Error uploading file to Google AI: {e}") + logging.error(f"Error uploading file to Google AI: {e}", exc_info=True) raise def _clean_messages_google(self, messages): @@ -289,7 +277,7 @@ class GoogleLLM(BaseLLM): if tools: cleaned_tools = self._clean_tools_format(tools) config.tools = cleaned_tools - + # Check if we have both tools and file attachments has_attachments = False for message in messages: @@ -299,16 +287,16 @@ class GoogleLLM(BaseLLM): break if has_attachments: break - + logging.info(f"GoogleLLM: Starting stream generation. Model: {model}, Messages: {json.dumps(messages, default=str)}, Has attachments: {has_attachments}") - + response = client.models.generate_content_stream( model=model, contents=messages, config=config, ) - - + + for chunk in response: if hasattr(chunk, "candidates") and chunk.candidates: for candidate in chunk.candidates: diff --git a/application/llm/openai.py b/application/llm/openai.py index 75bd37e0..c918768d 100644 --- a/application/llm/openai.py +++ b/application/llm/openai.py @@ -1,11 +1,10 @@ import json import base64 -import os -import mimetypes import logging from application.core.settings import settings from application.llm.base import BaseLLM +from application.storage.storage_creator import StorageCreator class OpenAILLM(BaseLLM): @@ -20,6 +19,7 @@ class OpenAILLM(BaseLLM): self.client = OpenAI(api_key=api_key) self.api_key = api_key self.user_api_key = user_api_key + self.storage = StorageCreator.get_storage() def _clean_messages_openai(self, messages): cleaned_messages = [] @@ -77,6 +77,8 @@ class OpenAILLM(BaseLLM): content_parts.append(item) elif "type" in item and item["type"] == "file" and "file" in item: content_parts.append(item) + elif "type" in item and item["type"] == "image_url" and "image_url" in item: + content_parts.append(item) cleaned_messages.append({"role": role, "content": content_parts}) else: raise ValueError( @@ -149,7 +151,7 @@ class OpenAILLM(BaseLLM): def get_supported_attachment_types(self): """ Return a list of MIME types supported by OpenAI for file uploads. - + Returns: list: List of supported MIME types """ @@ -161,35 +163,35 @@ class OpenAILLM(BaseLLM): 'image/webp', 'image/gif' ] - + def prepare_messages_with_attachments(self, messages, attachments=None): """ Process attachments using OpenAI's file API for more efficient handling. - + Args: messages (list): List of message dictionaries. attachments (list): List of attachment dictionaries with content and metadata. - + Returns: list: Messages formatted with file references for OpenAI API. """ if not attachments: return messages - + prepared_messages = messages.copy() - + # Find the user message to attach file_id to the last one user_message_index = None for i in range(len(prepared_messages) - 1, -1, -1): if prepared_messages[i].get("role") == "user": user_message_index = i break - + if user_message_index is None: user_message = {"role": "user", "content": []} prepared_messages.append(user_message) user_message_index = len(prepared_messages) - 1 - + if isinstance(prepared_messages[user_message_index].get("content"), str): text_content = prepared_messages[user_message_index]["content"] prepared_messages[user_message_index]["content"] = [ @@ -197,14 +199,10 @@ class OpenAILLM(BaseLLM): ] elif not isinstance(prepared_messages[user_message_index].get("content"), list): prepared_messages[user_message_index]["content"] = [] - + for attachment in attachments: mime_type = attachment.get('mime_type') - if not mime_type: - file_path = attachment.get('path') - if file_path: - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - + if mime_type and mime_type.startswith('image/'): try: base64_image = self._get_base64_image(attachment) @@ -215,119 +213,102 @@ class OpenAILLM(BaseLLM): } }) except Exception as e: - logging.error(f"Error processing image attachment: {e}") + logging.error(f"Error processing image attachment: {e}", exc_info=True) if 'content' in attachment: prepared_messages[user_message_index]["content"].append({ - "type": "text", + "type": "text", "text": f"[Image could not be processed: {attachment.get('path', 'unknown')}]" }) # Handle PDFs using the file API elif mime_type == 'application/pdf': try: file_id = self._upload_file_to_openai(attachment) - prepared_messages[user_message_index]["content"].append({ "type": "file", "file": {"file_id": file_id} }) except Exception as e: - logging.error(f"Error uploading PDF to OpenAI: {e}") + logging.error(f"Error uploading PDF to OpenAI: {e}", exc_info=True) if 'content' in attachment: prepared_messages[user_message_index]["content"].append({ - "type": "text", + "type": "text", "text": f"File content:\n\n{attachment['content']}" }) - + return prepared_messages def _get_base64_image(self, attachment): """ Convert an image file to base64 encoding. - + Args: attachment (dict): Attachment dictionary with path and metadata. - + Returns: str: Base64-encoded image data. """ file_path = attachment.get('path') if not file_path: raise ValueError("No file path provided in attachment") - - if not os.path.isabs(file_path): - current_dir = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ) - file_path = os.path.join(current_dir, "application", file_path) - - if not os.path.exists(file_path): - raise FileNotFoundError(f"File not found: {file_path}") - - with open(file_path, "rb") as image_file: - return base64.b64encode(image_file.read()).decode('utf-8') - def _upload_file_to_openai(self, attachment): ##pdfs + try: + with self.storage.get_file(file_path) as image_file: + return base64.b64encode(image_file.read()).decode('utf-8') + except FileNotFoundError: + raise FileNotFoundError(f"File not found: {file_path}") + + def _upload_file_to_openai(self, attachment): """ Upload a file to OpenAI and return the file_id. - + Args: attachment (dict): Attachment dictionary with path and metadata. Expected keys: - path: Path to the file - id: Optional MongoDB ID for caching - + Returns: str: OpenAI file_id for the uploaded file. """ - import os import logging - + if 'openai_file_id' in attachment: return attachment['openai_file_id'] - + file_path = attachment.get('path') - if not file_path: - raise ValueError("No file path provided in attachment") - - if not os.path.isabs(file_path): - current_dir = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ) - file_path = os.path.join(current_dir,"application", file_path) - - if not os.path.exists(file_path): + + if not self.storage.file_exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") - try: - with open(file_path, 'rb') as file: - response = self.client.files.create( - file=file, + file_id = self.storage.process_file( + file_path, + lambda local_path, **kwargs: self.client.files.create( + file=open(local_path, 'rb'), purpose="assistants" - ) - - file_id = response.id - + ).id + ) + from application.core.mongo_db import MongoDB mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] attachments_collection = db["attachments"] if '_id' in attachment: attachments_collection.update_one( {"_id": attachment['_id']}, {"$set": {"openai_file_id": file_id}} ) - + return file_id except Exception as e: - logging.error(f"Error uploading file to OpenAI: {e}") + logging.error(f"Error uploading file to OpenAI: {e}", exc_info=True) raise class AzureOpenAILLM(OpenAILLM): def __init__( - self, api_key, user_api_key, *args, **kwargs + self, api_key, user_api_key, *args, **kwargs ): super().__init__(api_key) diff --git a/application/logging.py b/application/logging.py index 1dd0d557..d48fb17e 100644 --- a/application/logging.py +++ b/application/logging.py @@ -7,6 +7,7 @@ import uuid from typing import Any, Callable, Dict, Generator, List from application.core.mongo_db import MongoDB +from application.core.settings import settings logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" @@ -29,6 +30,8 @@ def build_stack_data( exclude_attributes: List[str] = None, custom_data: Dict = None, ) -> Dict: + if obj is None: + raise ValueError("The 'obj' parameter cannot be None") data = {} if include_attributes is None: include_attributes = [] @@ -56,8 +59,8 @@ def build_stack_data( data[attr_name] = [str(item) for item in attr_value] elif isinstance(attr_value, dict): data[attr_name] = {k: str(v) for k, v in attr_value.items()} - else: - data[attr_name] = str(attr_value) + except AttributeError as e: + logging.warning(f"AttributeError while accessing {attr_name}: {e}") except AttributeError: pass if custom_data: @@ -131,7 +134,7 @@ def _log_to_mongodb( ) -> None: try: mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] user_logs_collection = db["stack_logs"] log_entry = { @@ -148,4 +151,4 @@ def _log_to_mongodb( logging.debug(f"Logged activity to MongoDB: {activity_id}") except Exception as e: - logging.error(f"Failed to log to MongoDB: {e}") + logging.error(f"Failed to log to MongoDB: {e}", exc_info=True) diff --git a/application/parser/embedding_pipeline.py b/application/parser/embedding_pipeline.py index 0435cd14..87d9a8d5 100755 --- a/application/parser/embedding_pipeline.py +++ b/application/parser/embedding_pipeline.py @@ -19,7 +19,7 @@ def add_text_to_store_with_retry(store, doc, source_id): doc.metadata["source_id"] = str(source_id) store.add_texts([doc.page_content], metadatas=[doc.metadata]) except Exception as e: - logging.error(f"Failed to add document with retry: {e}") + logging.error(f"Failed to add document with retry: {e}", exc_info=True) raise @@ -75,7 +75,7 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status): # Add document to vector store add_text_to_store_with_retry(store, doc, source_id) except Exception as e: - logging.error(f"Error embedding document {idx}: {e}") + logging.error(f"Error embedding document {idx}: {e}", exc_info=True) logging.info(f"Saving progress at document {idx} out of {total_docs}") store.save_local(folder_name) break diff --git a/application/parser/remote/crawler_loader.py b/application/parser/remote/crawler_loader.py index c2da230b..2ff6cf6f 100644 --- a/application/parser/remote/crawler_loader.py +++ b/application/parser/remote/crawler_loader.py @@ -1,3 +1,4 @@ +import logging import requests from urllib.parse import urlparse, urljoin from bs4 import BeautifulSoup @@ -42,7 +43,7 @@ class CrawlerLoader(BaseRemote): ) ) except Exception as e: - print(f"Error processing URL {current_url}: {e}") + logging.error(f"Error processing URL {current_url}: {e}", exc_info=True) continue # Parse the HTML content to extract all links @@ -61,4 +62,4 @@ class CrawlerLoader(BaseRemote): if self.limit is not None and len(visited_urls) >= self.limit: break - return loaded_content \ No newline at end of file + return loaded_content diff --git a/application/parser/remote/sitemap_loader.py b/application/parser/remote/sitemap_loader.py index 8066f4f6..6d54ea9b 100644 --- a/application/parser/remote/sitemap_loader.py +++ b/application/parser/remote/sitemap_loader.py @@ -1,3 +1,4 @@ +import logging import requests import re # Import regular expression library import xml.etree.ElementTree as ET @@ -32,7 +33,7 @@ class SitemapLoader(BaseRemote): documents.extend(loader.load()) processed_urls += 1 # Increment the counter after processing each URL except Exception as e: - print(f"Error processing URL {url}: {e}") + logging.error(f"Error processing URL {url}: {e}", exc_info=True) continue return documents diff --git a/application/parser/remote/web_loader.py b/application/parser/remote/web_loader.py index cc1cdcb8..77cb145b 100644 --- a/application/parser/remote/web_loader.py +++ b/application/parser/remote/web_loader.py @@ -1,3 +1,4 @@ +import logging from application.parser.remote.base import BaseRemote from application.parser.schema.base import Document from langchain_community.document_loaders import WebBaseLoader @@ -39,6 +40,6 @@ class WebLoader(BaseRemote): ) ) except Exception as e: - print(f"Error processing URL {url}: {e}") + logging.error(f"Error processing URL {url}: {e}", exc_info=True) continue - return documents \ No newline at end of file + return documents diff --git a/application/retriever/classic_rag.py b/application/retriever/classic_rag.py index 08771337..b8ac69e4 100644 --- a/application/retriever/classic_rag.py +++ b/application/retriever/classic_rag.py @@ -1,3 +1,4 @@ +import logging from application.core.settings import settings from application.llm.llm_creator import LLMCreator from application.retriever.base import BaseRetriever @@ -72,7 +73,7 @@ class ClassicRAG(BaseRetriever): print(f"Rephrased query: {rephrased_query}") return rephrased_query if rephrased_query else self.original_question except Exception as e: - print(f"Error rephrasing query: {e}") + logging.error(f"Error rephrasing query: {e}", exc_info=True) return self.original_question def _get_data(self): diff --git a/application/storage/base.py b/application/storage/base.py new file mode 100644 index 00000000..273e7761 --- /dev/null +++ b/application/storage/base.py @@ -0,0 +1,94 @@ +"""Base storage class for file system abstraction.""" +from abc import ABC, abstractmethod +from typing import BinaryIO, List, Callable + + +class BaseStorage(ABC): + """Abstract base class for storage implementations.""" + + @abstractmethod + def save_file(self, file_data: BinaryIO, path: str) -> dict: + """ + Save a file to storage. + + Args: + file_data: File-like object containing the data + path: Path where the file should be stored + + Returns: + dict: A dictionary containing metadata about the saved file, including: + - 'path': The path where the file was saved + - 'storage_type': The type of storage (e.g., 'local', 's3') + - Other storage-specific metadata (e.g., 'uri', 'bucket_name', etc.) + """ + pass + + @abstractmethod + def get_file(self, path: str) -> BinaryIO: + """ + Retrieve a file from storage. + + Args: + path: Path to the file + + Returns: + BinaryIO: File-like object containing the file data + """ + pass + + @abstractmethod + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. + + This method handles the details of retrieving the file and providing + it to the processor function in an appropriate way based on the storage type. + + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + pass + + @abstractmethod + def delete_file(self, path: str) -> bool: + """ + Delete a file from storage. + + Args: + path: Path to the file + + Returns: + bool: True if deletion was successful + """ + pass + + @abstractmethod + def file_exists(self, path: str) -> bool: + """ + Check if a file exists. + + Args: + path: Path to the file + + Returns: + bool: True if the file exists + """ + pass + + @abstractmethod + def list_files(self, directory: str) -> List[str]: + """ + List all files in a directory. + + Args: + directory: Directory path to list + + Returns: + List[str]: List of file paths + """ + pass diff --git a/application/storage/local.py b/application/storage/local.py new file mode 100644 index 00000000..fb21f08d --- /dev/null +++ b/application/storage/local.py @@ -0,0 +1,103 @@ +"""Local file system implementation.""" +import os +import shutil +from typing import BinaryIO, List, Callable + +from application.storage.base import BaseStorage + + +class LocalStorage(BaseStorage): + """Local file system storage implementation.""" + + def __init__(self, base_dir: str = None): + """ + Initialize local storage. + + Args: + base_dir: Base directory for all operations. If None, uses current directory. + """ + self.base_dir = base_dir or os.path.dirname( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ) + + def _get_full_path(self, path: str) -> str: + """Get absolute path by combining base_dir and path.""" + if os.path.isabs(path): + return path + return os.path.join(self.base_dir, path) + + def save_file(self, file_data: BinaryIO, path: str) -> dict: + """Save a file to local storage.""" + full_path = self._get_full_path(path) + + os.makedirs(os.path.dirname(full_path), exist_ok=True) + + if hasattr(file_data, 'save'): + file_data.save(full_path) + else: + with open(full_path, 'wb') as f: + shutil.copyfileobj(file_data, f) + + return { + 'storage_type': 'local' + } + + def get_file(self, path: str) -> BinaryIO: + """Get a file from local storage.""" + full_path = self._get_full_path(path) + + if not os.path.exists(full_path): + raise FileNotFoundError(f"File not found: {full_path}") + + return open(full_path, 'rb') + + def delete_file(self, path: str) -> bool: + """Delete a file from local storage.""" + full_path = self._get_full_path(path) + + if not os.path.exists(full_path): + return False + + os.remove(full_path) + return True + + def file_exists(self, path: str) -> bool: + """Check if a file exists in local storage.""" + full_path = self._get_full_path(path) + return os.path.exists(full_path) + + def list_files(self, directory: str) -> List[str]: + """List all files in a directory in local storage.""" + full_path = self._get_full_path(directory) + + if not os.path.exists(full_path): + return [] + + result = [] + for root, _, files in os.walk(full_path): + for file in files: + rel_path = os.path.relpath(os.path.join(root, file), self.base_dir) + result.append(rel_path) + + return result + + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. + + For local storage, we can directly pass the full path to the processor. + + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + full_path = self._get_full_path(path) + + if not os.path.exists(full_path): + raise FileNotFoundError(f"File not found: {full_path}") + + return processor_func(local_path=full_path, **kwargs) diff --git a/application/storage/s3.py b/application/storage/s3.py new file mode 100644 index 00000000..abc57c6d --- /dev/null +++ b/application/storage/s3.py @@ -0,0 +1,120 @@ +"""S3 storage implementation.""" +import io +from typing import BinaryIO, List, Callable +import os + +import boto3 +from botocore.exceptions import ClientError + +from application.storage.base import BaseStorage +from application.core.settings import settings + + +class S3Storage(BaseStorage): + """AWS S3 storage implementation.""" + + def __init__(self, bucket_name=None): + """ + Initialize S3 storage. + + Args: + bucket_name: S3 bucket name (optional, defaults to settings) + """ + self.bucket_name = bucket_name or getattr(settings, "S3_BUCKET_NAME", "docsgpt-test-bucket") + + # Get credentials from settings + aws_access_key_id = getattr(settings, "SAGEMAKER_ACCESS_KEY", None) + aws_secret_access_key = getattr(settings, "SAGEMAKER_SECRET_KEY", None) + region_name = getattr(settings, "SAGEMAKER_REGION", None) + + self.s3 = boto3.client( + 's3', + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name + ) + + def save_file(self, file_data: BinaryIO, path: str) -> dict: + """Save a file to S3 storage.""" + self.s3.upload_fileobj(file_data, self.bucket_name, path) + + region = getattr(settings, "SAGEMAKER_REGION", None) + + return { + 'storage_type': 's3', + 'bucket_name': self.bucket_name, + 'uri': f's3://{self.bucket_name}/{path}', + 'region': region + } + + def get_file(self, path: str) -> BinaryIO: + """Get a file from S3 storage.""" + if not self.file_exists(path): + raise FileNotFoundError(f"File not found: {path}") + + file_obj = io.BytesIO() + self.s3.download_fileobj(self.bucket_name, path, file_obj) + file_obj.seek(0) + return file_obj + + def delete_file(self, path: str) -> bool: + """Delete a file from S3 storage.""" + try: + self.s3.delete_object(Bucket=self.bucket_name, Key=path) + return True + except ClientError: + return False + + def file_exists(self, path: str) -> bool: + """Check if a file exists in S3 storage.""" + try: + self.s3.head_object(Bucket=self.bucket_name, Key=path) + return True + except ClientError: + return False + + def list_files(self, directory: str) -> List[str]: + """List all files in a directory in S3 storage.""" + # Ensure directory ends with a slash if it's not empty + if directory and not directory.endswith('/'): + directory += '/' + + result = [] + paginator = self.s3.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=self.bucket_name, Prefix=directory) + + for page in pages: + if 'Contents' in page: + for obj in page['Contents']: + result.append(obj['Key']) + + return result + + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. + + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + import tempfile + import logging + + if not self.file_exists(path): + raise FileNotFoundError(f"File not found in S3: {path}") + + with tempfile.NamedTemporaryFile(suffix=os.path.splitext(path)[1], delete=True) as temp_file: + try: + # Download the file from S3 to the temporary file + self.s3.download_fileobj(self.bucket_name, path, temp_file) + temp_file.flush() + + return processor_func(local_path=temp_file.name, **kwargs) + except Exception as e: + logging.error(f"Error processing S3 file {path}: {e}", exc_info=True) + raise diff --git a/application/storage/storage_creator.py b/application/storage/storage_creator.py new file mode 100644 index 00000000..3eca2f47 --- /dev/null +++ b/application/storage/storage_creator.py @@ -0,0 +1,32 @@ +"""Storage factory for creating different storage implementations.""" +from typing import Dict, Type + +from application.storage.base import BaseStorage +from application.storage.local import LocalStorage +from application.storage.s3 import S3Storage +from application.core.settings import settings + + +class StorageCreator: + storages: Dict[str, Type[BaseStorage]] = { + "local": LocalStorage, + "s3": S3Storage, + } + + _instance = None + + @classmethod + def get_storage(cls) -> BaseStorage: + if cls._instance is None: + storage_type = getattr(settings, "STORAGE_TYPE", "local") + cls._instance = cls.create_storage(storage_type) + + return cls._instance + + @classmethod + def create_storage(cls, type_name: str, *args, **kwargs) -> BaseStorage: + storage_class = cls.storages.get(type_name.lower()) + if not storage_class: + raise ValueError(f"No storage implementation found for type {type_name}") + + return storage_class(*args, **kwargs) diff --git a/application/usage.py b/application/usage.py index 85328c1f..46620fff 100644 --- a/application/usage.py +++ b/application/usage.py @@ -2,10 +2,11 @@ import sys from datetime import datetime from application.core.mongo_db import MongoDB +from application.core.settings import settings from application.utils import num_tokens_from_object_or_list, num_tokens_from_string mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] usage_collection = db["token_usage"] diff --git a/application/vectorstore/faiss.py b/application/vectorstore/faiss.py index 87ffcccb..ce455bd8 100644 --- a/application/vectorstore/faiss.py +++ b/application/vectorstore/faiss.py @@ -1,17 +1,19 @@ import os +import tempfile from langchain_community.vectorstores import FAISS from application.core.settings import settings from application.parser.schema.base import Document from application.vectorstore.base import BaseVectorStore +from application.storage.storage_creator import StorageCreator def get_vectorstore(path: str) -> str: if path: - vectorstore = os.path.join("application", "indexes", path) + vectorstore = f"indexes/{path}" else: - vectorstore = os.path.join("application") + vectorstore = "indexes" return vectorstore @@ -21,16 +23,36 @@ class FaissStore(BaseVectorStore): self.source_id = source_id self.path = get_vectorstore(source_id) self.embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key) + self.storage = StorageCreator.get_storage() try: if docs_init: self.docsearch = FAISS.from_documents(docs_init, self.embeddings) else: - self.docsearch = FAISS.load_local( - self.path, self.embeddings, allow_dangerous_deserialization=True - ) - except Exception: - raise + with tempfile.TemporaryDirectory() as temp_dir: + faiss_path = f"{self.path}/index.faiss" + pkl_path = f"{self.path}/index.pkl" + + if not self.storage.file_exists(faiss_path) or not self.storage.file_exists(pkl_path): + raise FileNotFoundError(f"Index files not found in storage at {self.path}") + + faiss_file = self.storage.get_file(faiss_path) + pkl_file = self.storage.get_file(pkl_path) + + local_faiss_path = os.path.join(temp_dir, "index.faiss") + local_pkl_path = os.path.join(temp_dir, "index.pkl") + + with open(local_faiss_path, 'wb') as f: + f.write(faiss_file.read()) + + with open(local_pkl_path, 'wb') as f: + f.write(pkl_file.read()) + + self.docsearch = FAISS.load_local( + temp_dir, self.embeddings, allow_dangerous_deserialization=True + ) + except Exception as e: + raise Exception(f"Error loading FAISS index: {str(e)}") self.assert_embedding_dimensions(self.embeddings) diff --git a/application/vectorstore/mongodb.py b/application/vectorstore/mongodb.py index 94b757e0..aadd4652 100644 --- a/application/vectorstore/mongodb.py +++ b/application/vectorstore/mongodb.py @@ -1,3 +1,4 @@ +import logging from application.core.settings import settings from application.vectorstore.base import BaseVectorStore from application.vectorstore.document_class import Document @@ -146,7 +147,7 @@ class MongoDBVectorStore(BaseVectorStore): return chunks except Exception as e: - print(f"Error getting chunks: {e}") + logging.error(f"Error getting chunks: {e}", exc_info=True) return [] def add_chunk(self, text, metadata=None): @@ -172,5 +173,5 @@ class MongoDBVectorStore(BaseVectorStore): result = self._collection.delete_one({"_id": object_id}) return result.deleted_count > 0 except Exception as e: - print(f"Error deleting chunk: {e}") + logging.error(f"Error deleting chunk: {e}", exc_info=True) return False diff --git a/application/worker.py b/application/worker.py index bbd422ac..619993c9 100755 --- a/application/worker.py +++ b/application/worker.py @@ -1,25 +1,38 @@ +import datetime +import io +import json import logging +import mimetypes import os import shutil import string +import tempfile import zipfile + from collections import Counter from urllib.parse import urljoin import requests +from bson.dbref import DBRef from bson.objectid import ObjectId +from application.agents.agent_creator import AgentCreator +from application.api.answer.routes import get_prompt + from application.core.mongo_db import MongoDB from application.core.settings import settings -from application.parser.file.bulk import SimpleDirectoryReader +from application.parser.chunking import Chunker from application.parser.embedding_pipeline import embed_and_store_documents +from application.parser.file.bulk import SimpleDirectoryReader from application.parser.remote.remote_creator import RemoteCreator from application.parser.schema.base import Document -from application.parser.chunking import Chunker -from application.utils import count_tokens_docs +from application.retriever.retriever_creator import RetrieverCreator + +from application.storage.storage_creator import StorageCreator +from application.utils import count_tokens_docs, num_tokens_from_string mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] sources_collection = db["sources"] # Constants @@ -27,18 +40,22 @@ MIN_TOKENS = 150 MAX_TOKENS = 1250 RECURSION_DEPTH = 2 + # Define a function to extract metadata from a given filename. def metadata_from_filename(title): return {"title": title} + # Define a function to generate a random string of a given length. def generate_random_string(length): return "".join([string.ascii_letters[i % 52] for i in range(length)]) + current_dir = os.path.dirname( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) + def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5): """ Recursively extract zip files with a limit on recursion depth. @@ -58,7 +75,7 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5): zip_ref.extractall(extract_to) os.remove(zip_path) # Remove the zip file after extracting except Exception as e: - logging.error(f"Error extracting zip file {zip_path}: {e}") + logging.error(f"Error extracting zip file {zip_path}: {e}", exc_info=True) return # Check for nested zip files and extract them @@ -69,6 +86,7 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5): file_path = os.path.join(root, file) extract_zip_recursive(file_path, root, current_depth + 1, max_depth) + def download_file(url, params, dest_path): try: response = requests.get(url, params=params) @@ -79,6 +97,7 @@ def download_file(url, params, dest_path): logging.error(f"Error downloading file: {e}") raise + def upload_index(full_path, file_data): try: if settings.VECTOR_STORE == "faiss": @@ -87,7 +106,9 @@ def upload_index(full_path, file_data): "file_pkl": open(full_path + "/index.pkl", "rb"), } response = requests.post( - urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data + urljoin(settings.API_URL, "/api/upload_index"), + files=files, + data=file_data, ) else: response = requests.post( @@ -102,6 +123,76 @@ def upload_index(full_path, file_data): for file in files.values(): file.close() + +def run_agent_logic(agent_config, input_data): + try: + source = agent_config.get("source") + retriever = agent_config.get("retriever", "classic") + if isinstance(source, DBRef): + source_doc = db.dereference(source) + source = str(source_doc["_id"]) + retriever = source_doc.get("retriever", agent_config.get("retriever")) + else: + source = {} + source = {"active_docs": source} + chunks = int(agent_config.get("chunks", 2)) + prompt_id = agent_config.get("prompt_id", "default") + user_api_key = agent_config["key"] + agent_type = agent_config.get("agent_type", "classic") + decoded_token = {"sub": agent_config.get("user")} + prompt = get_prompt(prompt_id) + agent = AgentCreator.create_agent( + agent_type, + endpoint="webhook", + llm_name=settings.LLM_NAME, + gpt_model=settings.MODEL_NAME, + api_key=settings.API_KEY, + user_api_key=user_api_key, + prompt=prompt, + chat_history=[], + decoded_token=decoded_token, + attachments=[], + ) + retriever = RetrieverCreator.create_retriever( + retriever, + source=source, + chat_history=[], + prompt=prompt, + chunks=chunks, + token_limit=settings.DEFAULT_MAX_HISTORY, + gpt_model=settings.MODEL_NAME, + user_api_key=user_api_key, + decoded_token=decoded_token, + ) + answer = agent.gen(query=input_data, retriever=retriever) + response_full = "" + thought = "" + source_log_docs = [] + tool_calls = [] + + for line in answer: + if "answer" in line: + response_full += str(line["answer"]) + elif "sources" in line: + source_log_docs.extend(line["sources"]) + elif "tool_calls" in line: + tool_calls.extend(line["tool_calls"]) + elif "thought" in line: + thought += line["thought"] + + result = { + "answer": response_full, + "sources": source_log_docs, + "tool_calls": tool_calls, + "thought": thought, + } + logging.info(f"Agent response: {result}") + return result + except Exception as e: + logging.error(f"Error in run_agent_logic: {e}", exc_info=True) + raise + + # Define the main function for ingesting and processing documents. def ingest_worker( self, directory, formats, name_job, filename, user, retriever="classic" @@ -126,62 +217,87 @@ def ingest_worker( limit = None exclude = True sample = False + + storage = StorageCreator.get_storage() + full_path = os.path.join(directory, user, name_job) + source_file_path = os.path.join(full_path, filename) logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job}) - file_data = {"name": name_job, "file": filename, "user": user} - if not os.path.exists(full_path): - os.makedirs(full_path) - download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename)) + # Create temporary working directory + with tempfile.TemporaryDirectory() as temp_dir: + try: + os.makedirs(temp_dir, exist_ok=True) - # check if file is .zip and extract it - if filename.endswith(".zip"): - extract_zip_recursive( - os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH - ) + # Download file from storage to temp directory + temp_file_path = os.path.join(temp_dir, filename) + file_data = storage.get_file(source_file_path) - self.update_state(state="PROGRESS", meta={"current": 1}) + with open(temp_file_path, "wb") as f: + f.write(file_data.read()) - raw_docs = SimpleDirectoryReader( - input_dir=full_path, - input_files=input_files, - recursive=recursive, - required_exts=formats, - num_files_limit=limit, - exclude_hidden=exclude, - file_metadata=metadata_from_filename, - ).load_data() + self.update_state(state="PROGRESS", meta={"current": 1}) - chunker = Chunker( - chunking_strategy="classic_chunk", - max_tokens=MAX_TOKENS, - min_tokens=MIN_TOKENS, - duplicate_headers=False - ) - raw_docs = chunker.chunk(documents=raw_docs) + # Handle zip files + if filename.endswith(".zip"): + logging.info(f"Extracting zip file: {filename}") + extract_zip_recursive( + temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH + ) - docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] - id = ObjectId() + if sample: + logging.info(f"Sample mode enabled. Using {limit} documents.") - embed_and_store_documents(docs, full_path, id, self) - tokens = count_tokens_docs(docs) - self.update_state(state="PROGRESS", meta={"current": 100}) + reader = SimpleDirectoryReader( + input_dir=temp_dir, + input_files=input_files, + recursive=recursive, + required_exts=formats, + exclude_hidden=exclude, + file_metadata=metadata_from_filename, + ) + raw_docs = reader.load_data() - if sample: - for i in range(min(5, len(raw_docs))): - logging.info(f"Sample document {i}: {raw_docs[i]}") + chunker = Chunker( + chunking_strategy="classic_chunk", + max_tokens=MAX_TOKENS, + min_tokens=MIN_TOKENS, + duplicate_headers=False, + ) + raw_docs = chunker.chunk(documents=raw_docs) - file_data.update({ - "tokens": tokens, - "retriever": retriever, - "id": str(id), - "type": "local", - }) - upload_index(full_path, file_data) + docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] - # delete local - shutil.rmtree(full_path) + id = ObjectId() + + vector_store_path = os.path.join(temp_dir, "vector_store") + os.makedirs(vector_store_path, exist_ok=True) + + embed_and_store_documents(docs, vector_store_path, id, self) + + tokens = count_tokens_docs(docs) + + self.update_state(state="PROGRESS", meta={"current": 100}) + + if sample: + for i in range(min(5, len(raw_docs))): + logging.info(f"Sample document {i}: {raw_docs[i]}") + file_data = { + "name": name_job, + "file": filename, + "user": user, + "tokens": tokens, + "retriever": retriever, + "id": str(id), + "type": "local", + } + + upload_index(vector_store_path, file_data) + + except Exception as e: + logging.error(f"Error in ingest_worker: {e}", exc_info=True) + raise return { "directory": directory, @@ -192,6 +308,7 @@ def ingest_worker( "limited": False, } + def remote_worker( self, source_data, @@ -203,7 +320,7 @@ def remote_worker( sync_frequency="never", operation_mode="upload", doc_id=None, -): +): full_path = os.path.join(directory, user, name_job) if not os.path.exists(full_path): os.makedirs(full_path) @@ -218,7 +335,7 @@ def remote_worker( chunking_strategy="classic_chunk", max_tokens=MAX_TOKENS, min_tokens=MIN_TOKENS, - duplicate_headers=False + duplicate_headers=False, ) docs = chunker.chunk(documents=raw_docs) docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] @@ -260,6 +377,7 @@ def remote_worker( logging.info("remote_worker task completed successfully") return {"urls": source_data, "name_job": name_job, "user": user, "limited": False} + def sync( self, source_data, @@ -285,10 +403,11 @@ def sync( doc_id, ) except Exception as e: - logging.error(f"Error during sync: {e}") + logging.error(f"Error during sync: {e}", exc_info=True) return {"status": "error", "error": str(e)} return {"status": "success"} + def sync_worker(self, frequency): sync_counts = Counter() sources = sources_collection.find() @@ -313,84 +432,135 @@ def sync_worker(self, frequency): for key in ["total_sync_count", "sync_success", "sync_failure"] } -def attachment_worker(self, directory, file_info, user): + +def attachment_worker(self, file_info, user): """ Process and store a single attachment without vectorization. - - Args: - self: Reference to the instance of the task. - directory (str): Base directory for storing files. - file_info (dict): Dictionary with folder and filename info. - user (str): User identifier. - - Returns: - dict: Information about processed attachment. """ - import datetime - import os - import mimetypes - from application.utils import num_tokens_from_string - + mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] attachments_collection = db["attachments"] - + filename = file_info["filename"] attachment_id = file_info["attachment_id"] - - logging.info(f"Processing attachment: {attachment_id}/{filename}", extra={"user": user}) - - self.update_state(state="PROGRESS", meta={"current": 10}) - - file_path = os.path.join(directory, filename) - - if not os.path.exists(file_path): - logging.warning(f"File not found: {file_path}", extra={"user": user}) - raise FileNotFoundError(f"File not found: {file_path}") - + relative_path = file_info["path"] + file_content = file_info["file_content"] + try: - reader = SimpleDirectoryReader( - input_files=[file_path] + self.update_state(state="PROGRESS", meta={"current": 10}) + storage_type = getattr(settings, "STORAGE_TYPE", "local") + storage = StorageCreator.create_storage(storage_type) + self.update_state( + state="PROGRESS", meta={"current": 30, "status": "Processing content"} ) - documents = reader.load_data() - - self.update_state(state="PROGRESS", meta={"current": 50}) - - if documents: + + with tempfile.NamedTemporaryFile( + suffix=os.path.splitext(filename)[1] + ) as temp_file: + temp_file.write(file_content) + temp_file.flush() + reader = SimpleDirectoryReader( + input_files=[temp_file.name], exclude_hidden=True, errors="ignore" + ) + documents = reader.load_data() + + if not documents: + logging.warning(f"No content extracted from file: {filename}") + raise ValueError(f"Failed to extract content from file: {filename}") + content = documents[0].text token_count = num_tokens_from_string(content) - - file_path_relative = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{attachment_id}/{filename}" - - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - + + self.update_state( + state="PROGRESS", meta={"current": 60, "status": "Saving file"} + ) + file_obj = io.BytesIO(file_content) + + metadata = storage.save_file(file_obj, relative_path) + + mime_type = mimetypes.guess_type(filename)[0] or "application/octet-stream" + + self.update_state( + state="PROGRESS", meta={"current": 80, "status": "Storing in database"} + ) + doc_id = ObjectId(attachment_id) - attachments_collection.insert_one({ - "_id": doc_id, - "user": user, - "path": file_path_relative, - "content": content, - "token_count": token_count, - "mime_type": mime_type, - "date": datetime.datetime.now(), - }) - - logging.info(f"Stored attachment with ID: {attachment_id}", - extra={"user": user}) - - self.update_state(state="PROGRESS", meta={"current": 100}) - + attachments_collection.insert_one( + { + "_id": doc_id, + "user": user, + "path": relative_path, + "content": content, + "token_count": token_count, + "mime_type": mime_type, + "date": datetime.datetime.now(), + "metadata": metadata, + } + ) + + logging.info( + f"Stored attachment with ID: {attachment_id}", extra={"user": user} + ) + + self.update_state( + state="PROGRESS", meta={"current": 100, "status": "Complete"} + ) + return { "filename": filename, - "path": file_path_relative, + "path": relative_path, "token_count": token_count, "attachment_id": attachment_id, - "mime_type": mime_type + "mime_type": mime_type, + "metadata": metadata, } - else: - logging.warning("No content was extracted from the file", - extra={"user": user}) - raise ValueError("No content was extracted from the file") + except Exception as e: - logging.error(f"Error processing file {filename}: {e}", extra={"user": user}, exc_info=True) + logging.error( + f"Error processing file {filename}: {e}", + extra={"user": user}, + exc_info=True, + ) raise + + +def agent_webhook_worker(self, agent_id, payload): + """ + Process the webhook payload for an agent. + + Args: + self: Reference to the instance of the task. + agent_id (str): Unique identifier for the agent. + payload (dict): The payload data from the webhook. + + Returns: + dict: Information about the processed webhook. + """ + mongo = MongoDB.get_client() + db = mongo["docsgpt"] + agents_collection = db["agents"] + + self.update_state(state="PROGRESS", meta={"current": 1}) + try: + agent_oid = ObjectId(agent_id) + agent_config = agents_collection.find_one({"_id": agent_oid}) + if not agent_config: + raise ValueError(f"Agent with ID {agent_id} not found.") + input_data = json.dumps(payload) + except Exception as e: + logging.error(f"Error processing agent webhook: {e}", exc_info=True) + return {"status": "error", "error": str(e)} + + self.update_state(state="PROGRESS", meta={"current": 50}) + try: + result = run_agent_logic(agent_config, input_data) + except Exception as e: + logging.error(f"Error running agent logic: {e}", exc_info=True) + return {"status": "error", "error": str(e)} + finally: + self.update_state(state="PROGRESS", meta={"current": 100}) + logging.info( + f"Webhook processed for agent {agent_id}", extra={"agent_id": agent_id} + ) + return {"status": "success", "result": result} diff --git a/frontend/package-lock.json b/frontend/package-lock.json index b314a488..58e9c924 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -10,6 +10,7 @@ "dependencies": { "@reduxjs/toolkit": "^2.5.1", "chart.js": "^4.4.4", + "clsx": "^2.1.1", "i18next": "^24.2.0", "i18next-browser-languagedetector": "^8.0.2", "mermaid": "^11.6.0", @@ -2013,7 +2014,7 @@ "version": "18.3.0", "resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.3.0.tgz", "integrity": "sha512-EhwApuTmMBmXuFOikhQLIBUn6uFg81SwLMOAUgodJF14SOBOCMdU04gDoYi0WOJJHD144TL32z4yDqCW3dnkQg==", - "devOptional": true, + "dev": true, "dependencies": { "@types/react": "*" } @@ -3162,6 +3163,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/clsx": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/clsx/-/clsx-2.1.1.tgz", + "integrity": "sha512-eYm0QWBtUrBWZWG0d386OGAw16Z995PiOVo2B7bjWSbHedGl5e0ZWaq65kOGgUSNesEIDkB9ISbTg/JK9dhCZA==", + "license": "MIT", + "engines": { + "node": ">=6" + } + }, "node_modules/color-name": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", @@ -10609,7 +10619,7 @@ "version": "5.7.2", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.7.2.tgz", "integrity": "sha512-i5t66RHxDvVN40HfDd1PsEThGNnlMCMT3jMUuoh9/0TaqWevNontacunWyN02LA9/fIbEWlcHZcgTKb9QoaLfg==", - "devOptional": true, + "dev": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" diff --git a/frontend/package.json b/frontend/package.json index d43add99..1adb8554 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -21,6 +21,7 @@ "dependencies": { "@reduxjs/toolkit": "^2.5.1", "chart.js": "^4.4.4", + "clsx": "^2.1.1", "i18next": "^24.2.0", "i18next-browser-languagedetector": "^8.0.2", "mermaid": "^11.6.0", diff --git a/frontend/src/Hero.tsx b/frontend/src/Hero.tsx index 0161eac2..583f8a1b 100644 --- a/frontend/src/Hero.tsx +++ b/frontend/src/Hero.tsx @@ -38,9 +38,12 @@ export default function Hero({ )} @@ -361,7 +360,7 @@ export default function NewAgent({ mode }: { mode: 'new' | 'edit' | 'draft' }) { sourceDocs?.map((doc: Doc) => ({ id: doc.id || doc.retriever || doc.name, label: doc.name, - icon: SourceIcon, + icon: , })) || [] } selectedIds={selectedSourceIds} @@ -408,7 +407,7 @@ export default function NewAgent({ mode }: { mode: 'new' | 'edit' | 'draft' }) { agent.prompt_id ? prompts.filter( (prompt) => prompt.id === agent.prompt_id, - )[0].name || null + )[0]?.name || null : null } onSelect={(option: { label: string; value: string }) => @@ -532,7 +531,7 @@ function AgentPreviewArea() { const selectedAgent = useSelector(selectSelectedAgent); return (
- {selectedAgent?.id ? ( + {selectedAgent?.status === 'published' ? (
@@ -540,7 +539,7 @@ function AgentPreviewArea() {
{' '}

- Published agents can be previewd here + Published agents can be previewed here

)} diff --git a/frontend/src/agents/index.tsx b/frontend/src/agents/index.tsx index 49123cd6..c2edb34a 100644 --- a/frontend/src/agents/index.tsx +++ b/frontend/src/agents/index.tsx @@ -12,7 +12,13 @@ import ThreeDots from '../assets/three-dots.svg'; import ContextMenu, { MenuOption } from '../components/ContextMenu'; import ConfirmationModal from '../modals/ConfirmationModal'; import { ActiveState } from '../models/misc'; -import { selectToken, setSelectedAgent } from '../preferences/preferenceSlice'; +import { + selectToken, + setSelectedAgent, + setAgents, + selectAgents, + selectSelectedAgent, +} from '../preferences/preferenceSlice'; import AgentLogs from './AgentLogs'; import NewAgent from './NewAgent'; import { Agent } from './types'; @@ -31,9 +37,12 @@ export default function Agents() { function AgentsList() { const navigate = useNavigate(); + const dispatch = useDispatch(); const token = useSelector(selectToken); + const agents = useSelector(selectAgents); + const selectedAgent = useSelector(selectSelectedAgent); - const [userAgents, setUserAgents] = useState([]); + const [userAgents, setUserAgents] = useState(agents || []); const [loading, setLoading] = useState(true); const getAgents = async () => { @@ -43,6 +52,7 @@ function AgentsList() { if (!response.ok) throw new Error('Failed to fetch agents'); const data = await response.json(); setUserAgents(data); + dispatch(setAgents(data)); setLoading(false); } catch (error) { console.error('Error:', error); @@ -52,6 +62,7 @@ function AgentsList() { useEffect(() => { getAgents(); + if (selectedAgent) dispatch(setSelectedAgent(null)); }, [token]); return (
@@ -62,6 +73,7 @@ function AgentsList() { Discover and create custom versions of DocsGPT that combine instructions, extra knowledge, and any combination of skills.

+ {/* Premade agents section */} {/*

Premade by DocsGPT @@ -126,6 +138,7 @@ function AgentsList() { )) @@ -148,9 +161,11 @@ function AgentsList() { function AgentCard({ agent, + agents, setUserAgents, }: { agent: Agent; + agents: Agent[]; setUserAgents: React.Dispatch>; }) { const navigate = useNavigate(); @@ -200,8 +215,10 @@ function AgentCard({ ]; const handleClick = () => { - dispatch(setSelectedAgent(agent)); - navigate(`/`); + if (agent.status === 'published') { + dispatch(setSelectedAgent(agent)); + navigate(`/`); + } }; const handleDelete = async (agentId: string) => { @@ -211,11 +228,15 @@ function AgentCard({ setUserAgents((prevAgents) => prevAgents.filter((prevAgent) => prevAgent.id !== data.id), ); + dispatch(setAgents(agents.filter((prevAgent) => prevAgent.id !== data.id))); }; return (
handleClick()} + className={`relative flex h-44 w-48 flex-col justify-between rounded-[1.2rem] bg-[#F6F6F6] px-6 py-5 hover:bg-[#ECECEC] dark:bg-[#383838] hover:dark:bg-[#383838]/80 ${agent.status === 'published' && 'cursor-pointer'}`} + onClick={(e) => { + e.stopPropagation(); + handleClick(); + }} >
`/api/update_agent/${agent_id}`, DELETE_AGENT: (id: string) => `/api/delete_agent?id=${id}`, + AGENT_WEBHOOK: (id: string) => `/api/agent_webhook?id=${id}`, PROMPTS: '/api/get_prompts', CREATE_PROMPT: '/api/create_prompt', DELETE_PROMPT: '/api/delete_prompt', diff --git a/frontend/src/api/services/userService.ts b/frontend/src/api/services/userService.ts index bbe20b10..4a0f45d8 100644 --- a/frontend/src/api/services/userService.ts +++ b/frontend/src/api/services/userService.ts @@ -31,6 +31,8 @@ const userService = { apiClient.put(endpoints.USER.UPDATE_AGENT(agent_id), data, token), deleteAgent: (id: string, token: string | null): Promise => apiClient.delete(endpoints.USER.DELETE_AGENT(id), token), + getAgentWebhook: (id: string, token: string | null): Promise => + apiClient.get(endpoints.USER.AGENT_WEBHOOK(id), token), getPrompts: (token: string | null): Promise => apiClient.get(endpoints.USER.PROMPTS, token), createPrompt: (data: any, token: string | null): Promise => diff --git a/frontend/src/assets/monitoring-purple.svg b/frontend/src/assets/monitoring-purple.svg new file mode 100644 index 00000000..ef849521 --- /dev/null +++ b/frontend/src/assets/monitoring-purple.svg @@ -0,0 +1,3 @@ + + + diff --git a/frontend/src/assets/monitoring-white.svg b/frontend/src/assets/monitoring-white.svg new file mode 100644 index 00000000..b015eeee --- /dev/null +++ b/frontend/src/assets/monitoring-white.svg @@ -0,0 +1,3 @@ + + + diff --git a/frontend/src/components/CopyButton.tsx b/frontend/src/components/CopyButton.tsx index c430603f..0afbbe82 100644 --- a/frontend/src/components/CopyButton.tsx +++ b/frontend/src/components/CopyButton.tsx @@ -1,58 +1,136 @@ +import clsx from 'clsx'; import copy from 'copy-to-clipboard'; -import { useState } from 'react'; +import { useCallback, useEffect, useRef, useState } from 'react'; import { useTranslation } from 'react-i18next'; import CheckMark from '../assets/checkmark.svg?react'; -import Copy from '../assets/copy.svg?react'; +import CopyIcon from '../assets/copy.svg?react'; + +type CopyButtonProps = { + textToCopy: string; + bgColorLight?: string; + bgColorDark?: string; + hoverBgColorLight?: string; + hoverBgColorDark?: string; + iconSize?: string; + padding?: string; + showText?: boolean; + copiedDuration?: number; + className?: string; + iconWrapperClassName?: string; + textClassName?: string; +}; + +const DEFAULT_ICON_SIZE = 'w-4 h-4'; +const DEFAULT_PADDING = 'p-2'; +const DEFAULT_COPIED_DURATION = 2000; +const DEFAULT_BG_LIGHT = '#FFFFFF'; +const DEFAULT_BG_DARK = 'transparent'; +const DEFAULT_HOVER_BG_LIGHT = '#EEEEEE'; +const DEFAULT_HOVER_BG_DARK = '#4A4A4A'; export default function CopyButton({ - text, - colorLight, - colorDark, + textToCopy, + bgColorLight = DEFAULT_BG_LIGHT, + bgColorDark = DEFAULT_BG_DARK, + hoverBgColorLight = DEFAULT_HOVER_BG_LIGHT, + hoverBgColorDark = DEFAULT_HOVER_BG_DARK, + iconSize = DEFAULT_ICON_SIZE, + padding = DEFAULT_PADDING, showText = false, -}: { - text: string; - colorLight?: string; - colorDark?: string; - showText?: boolean; -}) { + copiedDuration = DEFAULT_COPIED_DURATION, + className, + iconWrapperClassName, + textClassName, +}: CopyButtonProps) { const { t } = useTranslation(); - const [copied, setCopied] = useState(false); - const [isCopyHovered, setIsCopyHovered] = useState(false); + const [isCopied, setIsCopied] = useState(false); + const timeoutIdRef = useRef(null); - const handleCopyClick = (text: string) => { - copy(text); - setCopied(true); - setTimeout(() => { - setCopied(false); - }, 3000); - }; + const iconWrapperClasses = clsx( + 'flex items-center justify-center rounded-full transition-colors duration-150 ease-in-out', + padding, + `bg-[${bgColorLight}] dark:bg-[${bgColorDark}]`, + `hover:bg-[${hoverBgColorLight}] dark:hover:bg-[${hoverBgColorDark}]`, + { + 'bg-green-100 dark:bg-green-900 hover:bg-green-100 dark:hover:bg-green-900': + isCopied, + }, + iconWrapperClassName, + ); + const rootButtonClasses = clsx( + 'flex items-center gap-2 group', + 'focus:outline-none focus-visible:ring-2 focus-visible:ring-offset-2 focus-visible:ring-blue-500 rounded-full', + className, + ); + + const textSpanClasses = clsx( + 'text-xs text-gray-600 dark:text-gray-400 transition-opacity duration-150 ease-in-out', + { 'opacity-75': isCopied }, + textClassName, + ); + + const IconComponent = isCopied ? CheckMark : CopyIcon; + const iconClasses = clsx(iconSize, { + 'stroke-green-600 dark:stroke-green-400': isCopied, + 'fill-none text-gray-700 dark:text-gray-300': !isCopied, + }); + + const buttonTitle = isCopied + ? t('conversation.copied') + : t('conversation.copy'); + const displayedText = isCopied + ? t('conversation.copied') + : t('conversation.copy'); + + const handleCopy = useCallback(() => { + if (isCopied) return; + + try { + const success = copy(textToCopy); + if (success) { + setIsCopied(true); + + if (timeoutIdRef.current) { + clearTimeout(timeoutIdRef.current); + } + + timeoutIdRef.current = setTimeout(() => { + setIsCopied(false); + timeoutIdRef.current = null; + }, copiedDuration); + } else { + console.warn('Copy command failed.'); + } + } catch (error) { + console.error('Failed to copy text:', error); + } + }, [textToCopy, copiedDuration, isCopied]); + + useEffect(() => { + return () => { + if (timeoutIdRef.current) { + clearTimeout(timeoutIdRef.current); + } + }; + }, []); return ( ); } diff --git a/frontend/src/components/MermaidRenderer.tsx b/frontend/src/components/MermaidRenderer.tsx index 11285bc5..06d5628b 100644 --- a/frontend/src/components/MermaidRenderer.tsx +++ b/frontend/src/components/MermaidRenderer.tsx @@ -261,7 +261,7 @@ const MermaidRenderer: React.FC = ({ mermaid
- + {showDiagramOptions && (
diff --git a/frontend/src/components/MessageInput.tsx b/frontend/src/components/MessageInput.tsx index e7ef7f9d..60cd4b81 100644 --- a/frontend/src/components/MessageInput.tsx +++ b/frontend/src/components/MessageInput.tsx @@ -36,15 +36,7 @@ type MessageInputProps = { loading: boolean; showSourceButton?: boolean; showToolButton?: boolean; -}; - -type UploadState = { - taskId: string; - fileName: string; - progress: number; - attachment_id?: string; - token_count?: number; - status: 'uploading' | 'processing' | 'completed' | 'failed'; + autoFocus?: boolean; }; export default function MessageInput({ @@ -54,6 +46,7 @@ export default function MessageInput({ loading, showSourceButton = true, showToolButton = true, + autoFocus = true, }: MessageInputProps) { const { t } = useTranslation(); const [isDarkTheme] = useDarkTheme(); @@ -235,7 +228,7 @@ export default function MessageInput({ }; useEffect(() => { - inputRef.current?.focus(); + if (autoFocus) inputRef.current?.focus(); handleInput(); }, []); diff --git a/frontend/src/conversation/ConversationBubble.tsx b/frontend/src/conversation/ConversationBubble.tsx index 5e3b83c1..457308ec 100644 --- a/frontend/src/conversation/ConversationBubble.tsx +++ b/frontend/src/conversation/ConversationBubble.tsx @@ -5,10 +5,7 @@ import { useTranslation } from 'react-i18next'; import ReactMarkdown from 'react-markdown'; import { useSelector } from 'react-redux'; import { Prism as SyntaxHighlighter } from 'react-syntax-highlighter'; -import { - oneLight, - vscDarkPlus, -} from 'react-syntax-highlighter/dist/cjs/styles/prism'; +import { oneLight, vscDarkPlus } from 'react-syntax-highlighter/dist/cjs/styles/prism'; import rehypeKatex from 'rehype-katex'; import remarkGfm from 'remark-gfm'; import remarkMath from 'remark-math'; @@ -29,10 +26,7 @@ import CopyButton from '../components/CopyButton'; import Sidebar from '../components/Sidebar'; import SpeakButton from '../components/TextToSpeechButton'; import { useDarkTheme, useOutsideAlerter } from '../hooks'; -import { - selectChunks, - selectSelectedDocs, -} from '../preferences/preferenceSlice'; +import { selectChunks, selectSelectedDocs } from '../preferences/preferenceSlice'; import classes from './ConversationBubble.module.css'; import { FEEDBACK, MESSAGE_TYPE } from './conversationModels'; import { ToolCallsType } from './types'; @@ -414,7 +408,7 @@ const ConversationBubble = forwardRef< {language}
- +
{' '}

@@ -739,7 +733,7 @@ function ToolCalls({ toolCalls }: { toolCalls: ToolCallsType[] }) { Response {' '}

@@ -816,7 +810,7 @@ function Thought({ {language}

(null); + const [apiKey, setApiKey] = useState(null); + const [webhookUrl, setWebhookUrl] = useState(null); + const [loadingStates, setLoadingStates] = useState({ + publicLink: false, + apiKey: false, + webhook: false, + }); + + const setLoading = ( + key: 'publicLink' | 'apiKey' | 'webhook', + state: boolean, + ) => { + setLoadingStates((prev) => ({ ...prev, [key]: state })); + }; + + const handleGenerateWebhook = async () => { + setLoading('webhook', true); + const response = await userService.getAgentWebhook(agent.id ?? '', token); + if (!response.ok) { + setLoading('webhook', false); + return; + } + const data = await response.json(); + setWebhookUrl(data.webhook_url); + setLoading('webhook', false); + }; + if (modalState !== 'ACTIVE') return null; return ( { - // if (mode === 'new') navigate('/agents'); setModalState('INACTIVE'); }} > @@ -54,12 +88,34 @@ export default function AgentDetailsModal({ )}
-

- Webhooks -

- +
+

+ Webhook URL +

+ {webhookUrl && ( +
+ +
+ )} +
+ {webhookUrl ? ( +
+

+ {webhookUrl} +

+
+ ) : ( + + )}

diff --git a/frontend/src/modals/ConfirmationModal.tsx b/frontend/src/modals/ConfirmationModal.tsx index 25f8c2da..28151736 100644 --- a/frontend/src/modals/ConfirmationModal.tsx +++ b/frontend/src/modals/ConfirmationModal.tsx @@ -40,19 +40,23 @@ export default function ConfirmationModal({ >
-

+

{message}