diff --git a/application/Dockerfile b/application/Dockerfile index 308b721b..e33721a2 100644 --- a/application/Dockerfile +++ b/application/Dockerfile @@ -84,4 +84,4 @@ EXPOSE 7091 USER appuser # Start Gunicorn -CMD ["gunicorn", "-w", "2", "--timeout", "120", "--bind", "0.0.0.0:7091", "application.wsgi:app"] \ No newline at end of file +CMD ["gunicorn", "-w", "1", "--timeout", "120", "--bind", "0.0.0.0:7091", "--preload", "application.wsgi:app"] diff --git a/application/agents/base.py b/application/agents/base.py index b3797fc6..d44244cf 100644 --- a/application/agents/base.py +++ b/application/agents/base.py @@ -10,6 +10,7 @@ from application.core.mongo_db import MongoDB from application.llm.llm_creator import LLMCreator from application.logging import build_stack_data, log_activity, LogContext from application.retriever.base import BaseRetriever +from application.core.settings import settings from bson.objectid import ObjectId @@ -61,7 +62,7 @@ class BaseAgent(ABC): def _get_tools(self, api_key: str = None) -> Dict[str, Dict]: mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] agents_collection = db["agents"] tools_collection = db["user_tools"] @@ -82,7 +83,7 @@ class BaseAgent(ABC): def _get_user_tools(self, user="local"): mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] user_tools_collection = db["user_tools"] user_tools = user_tools_collection.find({"user": user, "status": True}) user_tools = list(user_tools) diff --git a/application/agents/llm_handler.py b/application/agents/llm_handler.py index 7fe794f8..bf39f625 100644 --- a/application/agents/llm_handler.py +++ b/application/agents/llm_handler.py @@ -15,95 +15,86 @@ class LLMHandler(ABC): @abstractmethod def handle_response(self, agent, resp, tools_dict, messages, attachments=None, **kwargs): pass - + def prepare_messages_with_attachments(self, agent, messages, attachments=None): """ Prepare messages with attachment content if 
available. - + Args: agent: The current agent instance. messages (list): List of message dictionaries. attachments (list): List of attachment dictionaries with content. - + Returns: list: Messages with attachment context added to the system prompt. """ if not attachments: return messages - + logger.info(f"Preparing messages with {len(attachments)} attachments") - + supported_types = agent.llm.get_supported_attachment_types() - + supported_attachments = [] unsupported_attachments = [] - + for attachment in attachments: mime_type = attachment.get('mime_type') - if not mime_type: - import mimetypes - file_path = attachment.get('path') - if file_path: - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - else: - unsupported_attachments.append(attachment) - continue - if mime_type in supported_types: supported_attachments.append(attachment) else: unsupported_attachments.append(attachment) - + # Process supported attachments with the LLM's custom method prepared_messages = messages if supported_attachments: logger.info(f"Processing {len(supported_attachments)} supported attachments with {agent.llm.__class__.__name__}'s method") prepared_messages = agent.llm.prepare_messages_with_attachments(messages, supported_attachments) - + # Process unsupported attachments with the default method if unsupported_attachments: logger.info(f"Processing {len(unsupported_attachments)} unsupported attachments with default method") prepared_messages = self._append_attachment_content_to_system(prepared_messages, unsupported_attachments) - + return prepared_messages - + def _append_attachment_content_to_system(self, messages, attachments): """ Default method to append attachment content to the system prompt. - + Args: messages (list): List of message dictionaries. attachments (list): List of attachment dictionaries with content. - + Returns: list: Messages with attachment context added to the system prompt. 
""" prepared_messages = messages.copy() - + attachment_texts = [] for attachment in attachments: logger.info(f"Adding attachment {attachment.get('id')} to context") if 'content' in attachment: attachment_texts.append(f"Attached file content:\n\n{attachment['content']}") - + if attachment_texts: combined_attachment_text = "\n\n".join(attachment_texts) - + system_found = False for i in range(len(prepared_messages)): if prepared_messages[i].get("role") == "system": prepared_messages[i]["content"] += f"\n\n{combined_attachment_text}" system_found = True break - + if not system_found: prepared_messages.insert(0, {"role": "system", "content": combined_attachment_text}) - + return prepared_messages class OpenAILLMHandler(LLMHandler): def handle_response(self, agent, resp, tools_dict, messages, attachments=None, stream: bool = True): - + messages = self.prepare_messages_with_attachments(agent, messages, attachments) logger.info(f"Messages with attachments: {messages}") if not stream: @@ -167,7 +158,7 @@ class OpenAILLMHandler(LLMHandler): if isinstance(chunk, str) and len(chunk) > 0: yield chunk continue - elif hasattr(chunk, "delta"): + elif hasattr(chunk, "delta"): chunk_delta = chunk.delta if ( @@ -258,7 +249,7 @@ class OpenAILLMHandler(LLMHandler): return resp elif isinstance(chunk, str) and len(chunk) == 0: continue - + logger.info(f"Regenerating with messages: {messages}") resp = agent.llm.gen_stream( model=agent.gpt_model, messages=messages, tools=agent.tools @@ -269,9 +260,9 @@ class OpenAILLMHandler(LLMHandler): class GoogleLLMHandler(LLMHandler): def handle_response(self, agent, resp, tools_dict, messages, attachments=None, stream: bool = True): from google.genai import types - + messages = self.prepare_messages_with_attachments(agent, messages, attachments) - + while True: if not stream: response = agent.llm.gen( diff --git a/application/api/answer/routes.py b/application/api/answer/routes.py index 8f44385b..2a8476d8 100644 --- a/application/api/answer/routes.py 
+++ b/application/api/answer/routes.py @@ -23,7 +23,7 @@ from application.utils import check_required_fields, limit_chat_history logger = logging.getLogger(__name__) mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] conversations_collection = db["conversations"] sources_collection = db["sources"] prompts_collection = db["prompts"] diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index c8e32d11..80759593 100755 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -3,12 +3,15 @@ import datetime from flask import Blueprint, request, send_from_directory from werkzeug.utils import secure_filename from bson.objectid import ObjectId - +import logging from application.core.mongo_db import MongoDB from application.core.settings import settings +from application.storage.storage_creator import StorageCreator + +logger = logging.getLogger(__name__) mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] conversations_collection = db["conversations"] sources_collection = db["sources"] @@ -45,26 +48,26 @@ def upload_index_files(): remote_data = request.form["remote_data"] if "remote_data" in request.form else None sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None - save_dir = os.path.join(current_dir, "indexes", str(id)) + storage = StorageCreator.get_storage() + index_base_path = f"indexes/{id}" + if settings.VECTOR_STORE == "faiss": if "file_faiss" not in request.files: - print("No file part") + logger.error("No file_faiss part") return {"status": "no file"} file_faiss = request.files["file_faiss"] if file_faiss.filename == "": return {"status": "no file name"} if "file_pkl" not in request.files: - print("No file part") + logger.error("No file_pkl part") return {"status": "no file"} file_pkl = request.files["file_pkl"] if file_pkl.filename == "": return {"status": "no file name"} - 
# saves index files - - if not os.path.exists(save_dir): - os.makedirs(save_dir) - file_faiss.save(os.path.join(save_dir, "index.faiss")) - file_pkl.save(os.path.join(save_dir, "index.pkl")) + + # Save index files to storage + storage.save_file(file_faiss, f"{index_base_path}/index.faiss") + storage.save_file(file_pkl, f"{index_base_path}/index.pkl") existing_entry = sources_collection.find_one({"_id": ObjectId(id)}) if existing_entry: diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 391444fc..528a4c29 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -29,7 +29,7 @@ from application.utils import check_required_fields, validate_function_name from application.vectorstore.vector_creator import VectorCreator mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] conversations_collection = db["conversations"] sources_collection = db["sources"] prompts_collection = db["prompts"] @@ -419,81 +419,85 @@ class UploadFile(Resource): user = secure_filename(decoded_token.get("sub")) job_name = secure_filename(request.form["name"]) + try: - save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name) - os.makedirs(save_dir, exist_ok=True) - + from application.storage.storage_creator import StorageCreator + storage = StorageCreator.get_storage() + + base_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}" + if len(files) > 1: - temp_dir = os.path.join(save_dir, "temp") - os.makedirs(temp_dir, exist_ok=True) - + temp_files = [] for file in files: filename = secure_filename(file.filename) - file.save(os.path.join(temp_dir, filename)) + temp_path = f"{base_path}/temp/{filename}" + storage.save_file(file, temp_path) + temp_files.append(temp_path) print(f"Saved file: {filename}") - zip_path = shutil.make_archive( - base_name=os.path.join(save_dir, job_name), - format="zip", - root_dir=temp_dir, - ) - final_filename = os.path.basename(zip_path) - shutil.rmtree(temp_dir) 
+ + zip_filename = f"{job_name}.zip" + zip_path = f"{base_path}/{zip_filename}" + + def create_zip_archive(temp_paths, **kwargs): + import tempfile + with tempfile.TemporaryDirectory() as temp_dir: + for path in temp_paths: + file_data = storage.get_file(path) + with open(os.path.join(temp_dir, os.path.basename(path)), 'wb') as f: + f.write(file_data.read()) + + # Create zip archive + zip_temp = shutil.make_archive( + base_name=os.path.join(temp_dir, job_name), + format="zip", + root_dir=temp_dir + ) + + return zip_temp + + zip_temp_path = create_zip_archive(temp_files) + with open(zip_temp_path, 'rb') as zip_file: + storage.save_file(zip_file, zip_path) + + # Clean up temp files + for temp_path in temp_files: + storage.delete_file(temp_path) + task = ingest.delay( settings.UPLOAD_FOLDER, [ - ".rst", - ".md", - ".pdf", - ".txt", - ".docx", - ".csv", - ".epub", - ".html", - ".mdx", - ".json", - ".xlsx", - ".pptx", - ".png", - ".jpg", - ".jpeg", + ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub", + ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png", + ".jpg", ".jpeg", ], job_name, - final_filename, + zip_filename, user, ) else: + # For single file file = files[0] - final_filename = secure_filename(file.filename) - file_path = os.path.join(save_dir, final_filename) - file.save(file_path) - + filename = secure_filename(file.filename) + file_path = f"{base_path}/{filename}" + + storage.save_file(file, file_path) + task = ingest.delay( settings.UPLOAD_FOLDER, [ - ".rst", - ".md", - ".pdf", - ".txt", - ".docx", - ".csv", - ".epub", - ".html", - ".mdx", - ".json", - ".xlsx", - ".pptx", - ".png", - ".jpg", - ".jpeg", + ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub", + ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png", + ".jpg", ".jpeg", ], job_name, - final_filename, + filename, user, ) except Exception as err: current_app.logger.error(f"Error uploading file: {err}") return make_response(jsonify({"success": False}), 400) + return 
make_response(jsonify({"success": True, "task_id": task.id}), 200) @@ -2868,10 +2872,9 @@ class StoreAttachment(Resource): decoded_token = request.decoded_token if not decoded_token: return make_response(jsonify({"success": False}), 401) - - # Get single file instead of list + file = request.files.get("file") - + if not file or file.filename == "": return make_response( jsonify({"status": "error", "message": "Missing file"}), @@ -2879,43 +2882,35 @@ class StoreAttachment(Resource): ) user = secure_filename(decoded_token.get("sub")) - + try: attachment_id = ObjectId() original_filename = secure_filename(file.filename) - - save_dir = os.path.join( - current_dir, - settings.UPLOAD_FOLDER, - user, - "attachments", - str(attachment_id), - ) - os.makedirs(save_dir, exist_ok=True) - - file_path = os.path.join(save_dir, original_filename) - - file.save(file_path) + relative_path = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{str(attachment_id)}/{original_filename}" + + file_content = file.read() + file_info = { "filename": original_filename, "attachment_id": str(attachment_id), + "path": relative_path, + "file_content": file_content } - current_app.logger.info(f"Saved file: {file_path}") - - # Start async task to process single file - task = store_attachment.delay(save_dir, file_info, user) - - return make_response( - jsonify( - { - "success": True, - "task_id": task.id, - "message": "File uploaded successfully. Processing started.", - } - ), - 200, + + task = store_attachment.delay( + file_info, + user + ) + + return make_response( + jsonify({ + "success": True, + "task_id": task.id, + "message": "File uploaded successfully. Processing started." 
+ }), + 200 ) - except Exception as err: current_app.logger.error(f"Error storing attachment: {err}") return make_response(jsonify({"success": False, "error": str(err)}), 400) + diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py index f53d856b..fffa9ba9 100644 --- a/application/api/user/tasks.py +++ b/application/api/user/tasks.py @@ -29,8 +29,8 @@ def schedule_syncs(self, frequency): @celery.task(bind=True) -def store_attachment(self, directory, saved_files, user): - resp = attachment_worker(self, directory, saved_files, user) +def store_attachment(self, file_info, user): + resp = attachment_worker(self, file_info, user) return resp diff --git a/application/core/settings.py b/application/core/settings.py index 74bffe53..3be34242 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -19,6 +19,7 @@ class Settings(BaseSettings): CELERY_BROKER_URL: str = "redis://localhost:6379/0" CELERY_RESULT_BACKEND: str = "redis://localhost:6379/1" MONGO_URI: str = "mongodb://localhost:27017/docsgpt" + MONGO_DB_NAME: str = "docsgpt" MODEL_PATH: str = os.path.join(current_dir, "models/docsgpt-7b-f16.gguf") DEFAULT_MAX_HISTORY: int = 150 MODEL_TOKEN_LIMITS: dict = { @@ -98,6 +99,8 @@ class Settings(BaseSettings): BRAVE_SEARCH_API_KEY: Optional[str] = None FLASK_DEBUG_MODE: bool = False + STORAGE_TYPE: str = "local" # local or s3 + JWT_SECRET_KEY: str = "" diff --git a/application/llm/google_ai.py b/application/llm/google_ai.py index c049eaa2..a56616d2 100644 --- a/application/llm/google_ai.py +++ b/application/llm/google_ai.py @@ -1,11 +1,11 @@ from google import genai from google.genai import types -import os import logging -import mimetypes import json from application.llm.base import BaseLLM +from application.storage.storage_creator import StorageCreator +from application.core.settings import settings class GoogleLLM(BaseLLM): @@ -14,11 +14,12 @@ class GoogleLLM(BaseLLM): self.api_key = api_key self.user_api_key = user_api_key 
self.client = genai.Client(api_key=self.api_key) + self.storage = StorageCreator.get_storage() def get_supported_attachment_types(self): """ Return a list of MIME types supported by Google Gemini for file uploads. - + Returns: list: List of supported MIME types """ @@ -30,35 +31,35 @@ class GoogleLLM(BaseLLM): 'image/webp', 'image/gif' ] - + def prepare_messages_with_attachments(self, messages, attachments=None): """ Process attachments using Google AI's file API for more efficient handling. - + Args: messages (list): List of message dictionaries. attachments (list): List of attachment dictionaries with content and metadata. - + Returns: list: Messages formatted with file references for Google AI API. """ if not attachments: return messages - + prepared_messages = messages.copy() - + # Find the user message to attach files to the last one user_message_index = None for i in range(len(prepared_messages) - 1, -1, -1): if prepared_messages[i].get("role") == "user": user_message_index = i break - + if user_message_index is None: user_message = {"role": "user", "content": []} prepared_messages.append(user_message) user_message_index = len(prepared_messages) - 1 - + if isinstance(prepared_messages[user_message_index].get("content"), str): text_content = prepared_messages[user_message_index]["content"] prepared_messages[user_message_index]["content"] = [ @@ -66,15 +67,11 @@ class GoogleLLM(BaseLLM): ] elif not isinstance(prepared_messages[user_message_index].get("content"), list): prepared_messages[user_message_index]["content"] = [] - + files = [] for attachment in attachments: mime_type = attachment.get('mime_type') - if not mime_type: - file_path = attachment.get('path') - if file_path: - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - + if mime_type in self.get_supported_attachment_types(): try: file_uri = self._upload_file_to_google(attachment) @@ -84,63 +81,54 @@ class GoogleLLM(BaseLLM): logging.error(f"GoogleLLM: Error uploading file: 
{e}") if 'content' in attachment: prepared_messages[user_message_index]["content"].append({ - "type": "text", + "type": "text", "text": f"[File could not be processed: {attachment.get('path', 'unknown')}]" }) - + if files: logging.info(f"GoogleLLM: Adding {len(files)} files to message") prepared_messages[user_message_index]["content"].append({ "files": files }) - + return prepared_messages def _upload_file_to_google(self, attachment): """ Upload a file to Google AI and return the file URI. - + Args: attachment (dict): Attachment dictionary with path and metadata. - + Returns: str: Google AI file URI for the uploaded file. """ if 'google_file_uri' in attachment: return attachment['google_file_uri'] - + file_path = attachment.get('path') if not file_path: raise ValueError("No file path provided in attachment") - - if not os.path.isabs(file_path): - current_dir = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ) - file_path = os.path.join(current_dir, "application", file_path) - - if not os.path.exists(file_path): + + if not self.storage.file_exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") - - mime_type = attachment.get('mime_type') - if not mime_type: - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - + try: - response = self.client.files.upload(file=file_path) - - file_uri = response.uri - + file_uri = self.storage.process_file( + file_path, + lambda local_path, **kwargs: self.client.files.upload(file=local_path).uri + ) + from application.core.mongo_db import MongoDB mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] attachments_collection = db["attachments"] if '_id' in attachment: attachments_collection.update_one( {"_id": attachment['_id']}, {"$set": {"google_file_uri": file_uri}} ) - + return file_uri except Exception as e: logging.error(f"Error uploading file to Google AI: {e}") @@ -289,7 +277,7 @@ class GoogleLLM(BaseLLM): if tools: 
cleaned_tools = self._clean_tools_format(tools) config.tools = cleaned_tools - + # Check if we have both tools and file attachments has_attachments = False for message in messages: @@ -299,16 +287,16 @@ class GoogleLLM(BaseLLM): break if has_attachments: break - + logging.info(f"GoogleLLM: Starting stream generation. Model: {model}, Messages: {json.dumps(messages, default=str)}, Has attachments: {has_attachments}") - + response = client.models.generate_content_stream( model=model, contents=messages, config=config, ) - - + + for chunk in response: if hasattr(chunk, "candidates") and chunk.candidates: for candidate in chunk.candidates: diff --git a/application/llm/openai.py b/application/llm/openai.py index 75bd37e0..248fd7e2 100644 --- a/application/llm/openai.py +++ b/application/llm/openai.py @@ -1,11 +1,10 @@ import json import base64 -import os -import mimetypes import logging from application.core.settings import settings from application.llm.base import BaseLLM +from application.storage.storage_creator import StorageCreator class OpenAILLM(BaseLLM): @@ -20,6 +19,7 @@ class OpenAILLM(BaseLLM): self.client = OpenAI(api_key=api_key) self.api_key = api_key self.user_api_key = user_api_key + self.storage = StorageCreator.get_storage() def _clean_messages_openai(self, messages): cleaned_messages = [] @@ -77,6 +77,8 @@ class OpenAILLM(BaseLLM): content_parts.append(item) elif "type" in item and item["type"] == "file" and "file" in item: content_parts.append(item) + elif "type" in item and item["type"] == "image_url" and "image_url" in item: + content_parts.append(item) cleaned_messages.append({"role": role, "content": content_parts}) else: raise ValueError( @@ -149,7 +151,7 @@ class OpenAILLM(BaseLLM): def get_supported_attachment_types(self): """ Return a list of MIME types supported by OpenAI for file uploads. 
- + Returns: list: List of supported MIME types """ @@ -161,35 +163,35 @@ class OpenAILLM(BaseLLM): 'image/webp', 'image/gif' ] - + def prepare_messages_with_attachments(self, messages, attachments=None): """ Process attachments using OpenAI's file API for more efficient handling. - + Args: messages (list): List of message dictionaries. attachments (list): List of attachment dictionaries with content and metadata. - + Returns: list: Messages formatted with file references for OpenAI API. """ if not attachments: return messages - + prepared_messages = messages.copy() - + # Find the user message to attach file_id to the last one user_message_index = None for i in range(len(prepared_messages) - 1, -1, -1): if prepared_messages[i].get("role") == "user": user_message_index = i break - + if user_message_index is None: user_message = {"role": "user", "content": []} prepared_messages.append(user_message) user_message_index = len(prepared_messages) - 1 - + if isinstance(prepared_messages[user_message_index].get("content"), str): text_content = prepared_messages[user_message_index]["content"] prepared_messages[user_message_index]["content"] = [ @@ -197,14 +199,10 @@ class OpenAILLM(BaseLLM): ] elif not isinstance(prepared_messages[user_message_index].get("content"), list): prepared_messages[user_message_index]["content"] = [] - + for attachment in attachments: mime_type = attachment.get('mime_type') - if not mime_type: - file_path = attachment.get('path') - if file_path: - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' - + if mime_type and mime_type.startswith('image/'): try: base64_image = self._get_base64_image(attachment) @@ -218,14 +216,13 @@ class OpenAILLM(BaseLLM): logging.error(f"Error processing image attachment: {e}") if 'content' in attachment: prepared_messages[user_message_index]["content"].append({ - "type": "text", + "type": "text", "text": f"[Image could not be processed: {attachment.get('path', 'unknown')}]" }) # Handle PDFs 
using the file API elif mime_type == 'application/pdf': try: file_id = self._upload_file_to_openai(attachment) - prepared_messages[user_message_index]["content"].append({ "type": "file", "file": {"file_id": file_id} @@ -234,90 +231,74 @@ class OpenAILLM(BaseLLM): logging.error(f"Error uploading PDF to OpenAI: {e}") if 'content' in attachment: prepared_messages[user_message_index]["content"].append({ - "type": "text", + "type": "text", "text": f"File content:\n\n{attachment['content']}" }) - + return prepared_messages def _get_base64_image(self, attachment): """ Convert an image file to base64 encoding. - + Args: attachment (dict): Attachment dictionary with path and metadata. - + Returns: str: Base64-encoded image data. """ file_path = attachment.get('path') if not file_path: raise ValueError("No file path provided in attachment") - - if not os.path.isabs(file_path): - current_dir = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ) - file_path = os.path.join(current_dir, "application", file_path) - - if not os.path.exists(file_path): - raise FileNotFoundError(f"File not found: {file_path}") - - with open(file_path, "rb") as image_file: - return base64.b64encode(image_file.read()).decode('utf-8') - def _upload_file_to_openai(self, attachment): ##pdfs + try: + with self.storage.get_file(file_path) as image_file: + return base64.b64encode(image_file.read()).decode('utf-8') + except FileNotFoundError: + raise FileNotFoundError(f"File not found: {file_path}") + + def _upload_file_to_openai(self, attachment): """ Upload a file to OpenAI and return the file_id. - + Args: attachment (dict): Attachment dictionary with path and metadata. Expected keys: - path: Path to the file - id: Optional MongoDB ID for caching - + Returns: str: OpenAI file_id for the uploaded file. 
""" - import os import logging - + if 'openai_file_id' in attachment: return attachment['openai_file_id'] - + file_path = attachment.get('path') - if not file_path: - raise ValueError("No file path provided in attachment") - - if not os.path.isabs(file_path): - current_dir = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ) - file_path = os.path.join(current_dir,"application", file_path) - - if not os.path.exists(file_path): + + if not self.storage.file_exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") - try: - with open(file_path, 'rb') as file: - response = self.client.files.create( - file=file, + file_id = self.storage.process_file( + file_path, + lambda local_path, **kwargs: self.client.files.create( + file=open(local_path, 'rb'), purpose="assistants" - ) - - file_id = response.id - + ).id + ) + from application.core.mongo_db import MongoDB mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] attachments_collection = db["attachments"] if '_id' in attachment: attachments_collection.update_one( {"_id": attachment['_id']}, {"$set": {"openai_file_id": file_id}} ) - + return file_id except Exception as e: logging.error(f"Error uploading file to OpenAI: {e}") @@ -327,7 +308,7 @@ class OpenAILLM(BaseLLM): class AzureOpenAILLM(OpenAILLM): def __init__( - self, api_key, user_api_key, *args, **kwargs + self, api_key, user_api_key, *args, **kwargs ): super().__init__(api_key) diff --git a/application/logging.py b/application/logging.py index eaf43d7c..b447ffa8 100644 --- a/application/logging.py +++ b/application/logging.py @@ -7,6 +7,7 @@ import uuid from typing import Any, Callable, Dict, Generator, List from application.core.mongo_db import MongoDB +from application.core.settings import settings logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" @@ -133,7 +134,7 @@ def _log_to_mongodb( ) -> None: try: mongo = MongoDB.get_client() - db 
= mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] user_logs_collection = db["stack_logs"] log_entry = { diff --git a/application/requirements.txt b/application/requirements.txt index 760078ae..56edab9f 100644 --- a/application/requirements.txt +++ b/application/requirements.txt @@ -15,7 +15,7 @@ Flask==3.1.0 faiss-cpu==1.9.0.post1 flask-restx==1.3.0 google-genai==1.3.0 -google-generativeai==0.8.3 +google-generativeai==0.8.5 gTTS==2.5.4 gunicorn==23.0.0 html2text==2024.2.26 diff --git a/application/storage/base.py b/application/storage/base.py new file mode 100644 index 00000000..273e7761 --- /dev/null +++ b/application/storage/base.py @@ -0,0 +1,94 @@ +"""Base storage class for file system abstraction.""" +from abc import ABC, abstractmethod +from typing import BinaryIO, List, Callable + + +class BaseStorage(ABC): + """Abstract base class for storage implementations.""" + + @abstractmethod + def save_file(self, file_data: BinaryIO, path: str) -> dict: + """ + Save a file to storage. + + Args: + file_data: File-like object containing the data + path: Path where the file should be stored + + Returns: + dict: A dictionary containing metadata about the saved file, including: + - 'path': The path where the file was saved + - 'storage_type': The type of storage (e.g., 'local', 's3') + - Other storage-specific metadata (e.g., 'uri', 'bucket_name', etc.) + """ + pass + + @abstractmethod + def get_file(self, path: str) -> BinaryIO: + """ + Retrieve a file from storage. + + Args: + path: Path to the file + + Returns: + BinaryIO: File-like object containing the file data + """ + pass + + @abstractmethod + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. + + This method handles the details of retrieving the file and providing + it to the processor function in an appropriate way based on the storage type. 
+ + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + pass + + @abstractmethod + def delete_file(self, path: str) -> bool: + """ + Delete a file from storage. + + Args: + path: Path to the file + + Returns: + bool: True if deletion was successful + """ + pass + + @abstractmethod + def file_exists(self, path: str) -> bool: + """ + Check if a file exists. + + Args: + path: Path to the file + + Returns: + bool: True if the file exists + """ + pass + + @abstractmethod + def list_files(self, directory: str) -> List[str]: + """ + List all files in a directory. + + Args: + directory: Directory path to list + + Returns: + List[str]: List of file paths + """ + pass diff --git a/application/storage/local.py b/application/storage/local.py new file mode 100644 index 00000000..fb21f08d --- /dev/null +++ b/application/storage/local.py @@ -0,0 +1,103 @@ +"""Local file system implementation.""" +import os +import shutil +from typing import BinaryIO, List, Callable + +from application.storage.base import BaseStorage + + +class LocalStorage(BaseStorage): + """Local file system storage implementation.""" + + def __init__(self, base_dir: str = None): + """ + Initialize local storage. + + Args: + base_dir: Base directory for all operations. If None, uses current directory. 
+ """ + self.base_dir = base_dir or os.path.dirname( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + ) + + def _get_full_path(self, path: str) -> str: + """Get absolute path by combining base_dir and path.""" + if os.path.isabs(path): + return path + return os.path.join(self.base_dir, path) + + def save_file(self, file_data: BinaryIO, path: str) -> dict: + """Save a file to local storage.""" + full_path = self._get_full_path(path) + + os.makedirs(os.path.dirname(full_path), exist_ok=True) + + if hasattr(file_data, 'save'): + file_data.save(full_path) + else: + with open(full_path, 'wb') as f: + shutil.copyfileobj(file_data, f) + + return { + 'storage_type': 'local' + } + + def get_file(self, path: str) -> BinaryIO: + """Get a file from local storage.""" + full_path = self._get_full_path(path) + + if not os.path.exists(full_path): + raise FileNotFoundError(f"File not found: {full_path}") + + return open(full_path, 'rb') + + def delete_file(self, path: str) -> bool: + """Delete a file from local storage.""" + full_path = self._get_full_path(path) + + if not os.path.exists(full_path): + return False + + os.remove(full_path) + return True + + def file_exists(self, path: str) -> bool: + """Check if a file exists in local storage.""" + full_path = self._get_full_path(path) + return os.path.exists(full_path) + + def list_files(self, directory: str) -> List[str]: + """List all files in a directory in local storage.""" + full_path = self._get_full_path(directory) + + if not os.path.exists(full_path): + return [] + + result = [] + for root, _, files in os.walk(full_path): + for file in files: + rel_path = os.path.relpath(os.path.join(root, file), self.base_dir) + result.append(rel_path) + + return result + + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. + + For local storage, we can directly pass the full path to the processor. 
+ + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + full_path = self._get_full_path(path) + + if not os.path.exists(full_path): + raise FileNotFoundError(f"File not found: {full_path}") + + return processor_func(local_path=full_path, **kwargs) diff --git a/application/storage/s3.py b/application/storage/s3.py new file mode 100644 index 00000000..abc57c6d --- /dev/null +++ b/application/storage/s3.py @@ -0,0 +1,120 @@ +"""S3 storage implementation.""" +import io +from typing import BinaryIO, List, Callable +import os + +import boto3 +from botocore.exceptions import ClientError + +from application.storage.base import BaseStorage +from application.core.settings import settings + + +class S3Storage(BaseStorage): + """AWS S3 storage implementation.""" + + def __init__(self, bucket_name=None): + """ + Initialize S3 storage. + + Args: + bucket_name: S3 bucket name (optional, defaults to settings) + """ + self.bucket_name = bucket_name or getattr(settings, "S3_BUCKET_NAME", "docsgpt-test-bucket") + + # Get credentials from settings + aws_access_key_id = getattr(settings, "SAGEMAKER_ACCESS_KEY", None) + aws_secret_access_key = getattr(settings, "SAGEMAKER_SECRET_KEY", None) + region_name = getattr(settings, "SAGEMAKER_REGION", None) + + self.s3 = boto3.client( + 's3', + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + region_name=region_name + ) + + def save_file(self, file_data: BinaryIO, path: str) -> dict: + """Save a file to S3 storage.""" + self.s3.upload_fileobj(file_data, self.bucket_name, path) + + region = getattr(settings, "SAGEMAKER_REGION", None) + + return { + 'storage_type': 's3', + 'bucket_name': self.bucket_name, + 'uri': f's3://{self.bucket_name}/{path}', + 'region': region + } + + def get_file(self, path: str) -> BinaryIO: + """Get a file from S3 storage.""" + 
if not self.file_exists(path): + raise FileNotFoundError(f"File not found: {path}") + + file_obj = io.BytesIO() + self.s3.download_fileobj(self.bucket_name, path, file_obj) + file_obj.seek(0) + return file_obj + + def delete_file(self, path: str) -> bool: + """Delete a file from S3 storage.""" + try: + self.s3.delete_object(Bucket=self.bucket_name, Key=path) + return True + except ClientError: + return False + + def file_exists(self, path: str) -> bool: + """Check if a file exists in S3 storage.""" + try: + self.s3.head_object(Bucket=self.bucket_name, Key=path) + return True + except ClientError: + return False + + def list_files(self, directory: str) -> List[str]: + """List all files in a directory in S3 storage.""" + # Ensure directory ends with a slash if it's not empty + if directory and not directory.endswith('/'): + directory += '/' + + result = [] + paginator = self.s3.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=self.bucket_name, Prefix=directory) + + for page in pages: + if 'Contents' in page: + for obj in page['Contents']: + result.append(obj['Key']) + + return result + + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. 
+ + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + import tempfile + import logging + + if not self.file_exists(path): + raise FileNotFoundError(f"File not found in S3: {path}") + + with tempfile.NamedTemporaryFile(suffix=os.path.splitext(path)[1], delete=True) as temp_file: + try: + # Download the file from S3 to the temporary file + self.s3.download_fileobj(self.bucket_name, path, temp_file) + temp_file.flush() + + return processor_func(local_path=temp_file.name, **kwargs) + except Exception as e: + logging.error(f"Error processing S3 file {path}: {e}", exc_info=True) + raise diff --git a/application/storage/storage_creator.py b/application/storage/storage_creator.py new file mode 100644 index 00000000..3eca2f47 --- /dev/null +++ b/application/storage/storage_creator.py @@ -0,0 +1,32 @@ +"""Storage factory for creating different storage implementations.""" +from typing import Dict, Type + +from application.storage.base import BaseStorage +from application.storage.local import LocalStorage +from application.storage.s3 import S3Storage +from application.core.settings import settings + + +class StorageCreator: + storages: Dict[str, Type[BaseStorage]] = { + "local": LocalStorage, + "s3": S3Storage, + } + + _instance = None + + @classmethod + def get_storage(cls) -> BaseStorage: + if cls._instance is None: + storage_type = getattr(settings, "STORAGE_TYPE", "local") + cls._instance = cls.create_storage(storage_type) + + return cls._instance + + @classmethod + def create_storage(cls, type_name: str, *args, **kwargs) -> BaseStorage: + storage_class = cls.storages.get(type_name.lower()) + if not storage_class: + raise ValueError(f"No storage implementation found for type {type_name}") + + return storage_class(*args, **kwargs) diff --git a/application/usage.py b/application/usage.py index 85328c1f..46620fff 100644 
--- a/application/usage.py +++ b/application/usage.py @@ -2,10 +2,11 @@ import sys from datetime import datetime from application.core.mongo_db import MongoDB +from application.core.settings import settings from application.utils import num_tokens_from_object_or_list, num_tokens_from_string mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] usage_collection = db["token_usage"] diff --git a/application/vectorstore/faiss.py b/application/vectorstore/faiss.py index 87ffcccb..ce455bd8 100644 --- a/application/vectorstore/faiss.py +++ b/application/vectorstore/faiss.py @@ -1,17 +1,19 @@ import os +import tempfile from langchain_community.vectorstores import FAISS from application.core.settings import settings from application.parser.schema.base import Document from application.vectorstore.base import BaseVectorStore +from application.storage.storage_creator import StorageCreator def get_vectorstore(path: str) -> str: if path: - vectorstore = os.path.join("application", "indexes", path) + vectorstore = f"indexes/{path}" else: - vectorstore = os.path.join("application") + vectorstore = "indexes" return vectorstore @@ -21,16 +23,36 @@ class FaissStore(BaseVectorStore): self.source_id = source_id self.path = get_vectorstore(source_id) self.embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key) + self.storage = StorageCreator.get_storage() try: if docs_init: self.docsearch = FAISS.from_documents(docs_init, self.embeddings) else: - self.docsearch = FAISS.load_local( - self.path, self.embeddings, allow_dangerous_deserialization=True - ) - except Exception: - raise + with tempfile.TemporaryDirectory() as temp_dir: + faiss_path = f"{self.path}/index.faiss" + pkl_path = f"{self.path}/index.pkl" + + if not self.storage.file_exists(faiss_path) or not self.storage.file_exists(pkl_path): + raise FileNotFoundError(f"Index files not found in storage at {self.path}") + + faiss_file = self.storage.get_file(faiss_path) + pkl_file = 
self.storage.get_file(pkl_path) + + local_faiss_path = os.path.join(temp_dir, "index.faiss") + local_pkl_path = os.path.join(temp_dir, "index.pkl") + + with open(local_faiss_path, 'wb') as f: + f.write(faiss_file.read()) + + with open(local_pkl_path, 'wb') as f: + f.write(pkl_file.read()) + + self.docsearch = FAISS.load_local( + temp_dir, self.embeddings, allow_dangerous_deserialization=True + ) + except Exception as e: + raise Exception(f"Error loading FAISS index: {str(e)}") self.assert_embedding_dimensions(self.embeddings) diff --git a/application/worker.py b/application/worker.py index 537206b7..e7ac85a9 100755 --- a/application/worker.py +++ b/application/worker.py @@ -1,9 +1,14 @@ +import datetime +import io import json import logging +import mimetypes import os import shutil import string +import tempfile import zipfile + from collections import Counter from urllib.parse import urljoin @@ -22,10 +27,12 @@ from application.parser.file.bulk import SimpleDirectoryReader from application.parser.remote.remote_creator import RemoteCreator from application.parser.schema.base import Document from application.retriever.retriever_creator import RetrieverCreator -from application.utils import count_tokens_docs + +from application.storage.storage_creator import StorageCreator +from application.utils import count_tokens_docs, num_tokens_from_string mongo = MongoDB.get_client() -db = mongo["docsgpt"] +db = mongo[settings.MONGO_DB_NAME] sources_collection = db["sources"] # Constants @@ -210,68 +217,87 @@ def ingest_worker( limit = None exclude = True sample = False + + storage = StorageCreator.get_storage() + full_path = os.path.join(directory, user, name_job) + source_file_path = os.path.join(full_path, filename) logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job}) - file_data = {"name": name_job, "file": filename, "user": user} - if not os.path.exists(full_path): - os.makedirs(full_path) - download_file( - urljoin(settings.API_URL, 
"/api/download"), - file_data, - os.path.join(full_path, filename), - ) + # Create temporary working directory + with tempfile.TemporaryDirectory() as temp_dir: + try: + os.makedirs(temp_dir, exist_ok=True) - # check if file is .zip and extract it - if filename.endswith(".zip"): - extract_zip_recursive( - os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH - ) + # Download file from storage to temp directory + temp_file_path = os.path.join(temp_dir, filename) + file_data = storage.get_file(source_file_path) - self.update_state(state="PROGRESS", meta={"current": 1}) + with open(temp_file_path, "wb") as f: + f.write(file_data.read()) - raw_docs = SimpleDirectoryReader( - input_dir=full_path, - input_files=input_files, - recursive=recursive, - required_exts=formats, - num_files_limit=limit, - exclude_hidden=exclude, - file_metadata=metadata_from_filename, - ).load_data() + self.update_state(state="PROGRESS", meta={"current": 1}) - chunker = Chunker( - chunking_strategy="classic_chunk", - max_tokens=MAX_TOKENS, - min_tokens=MIN_TOKENS, - duplicate_headers=False, - ) - raw_docs = chunker.chunk(documents=raw_docs) + # Handle zip files + if filename.endswith(".zip"): + logging.info(f"Extracting zip file: {filename}") + extract_zip_recursive( + temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH + ) - docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] - id = ObjectId() + if sample: + logging.info(f"Sample mode enabled. 
Using {limit} documents.") - embed_and_store_documents(docs, full_path, id, self) - tokens = count_tokens_docs(docs) - self.update_state(state="PROGRESS", meta={"current": 100}) + reader = SimpleDirectoryReader( + input_dir=temp_dir, + input_files=input_files, + recursive=recursive, + required_exts=formats, + exclude_hidden=exclude, + file_metadata=metadata_from_filename, + ) + raw_docs = reader.load_data() - if sample: - for i in range(min(5, len(raw_docs))): - logging.info(f"Sample document {i}: {raw_docs[i]}") + chunker = Chunker( + chunking_strategy="classic_chunk", + max_tokens=MAX_TOKENS, + min_tokens=MIN_TOKENS, + duplicate_headers=False, + ) + raw_docs = chunker.chunk(documents=raw_docs) - file_data.update( - { - "tokens": tokens, - "retriever": retriever, - "id": str(id), - "type": "local", - } - ) - upload_index(full_path, file_data) + docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] - # delete local - shutil.rmtree(full_path) + id = ObjectId() + + vector_store_path = os.path.join(temp_dir, "vector_store") + os.makedirs(vector_store_path, exist_ok=True) + + embed_and_store_documents(docs, vector_store_path, id, self) + + tokens = count_tokens_docs(docs) + + self.update_state(state="PROGRESS", meta={"current": 100}) + + if sample: + for i in range(min(5, len(raw_docs))): + logging.info(f"Sample document {i}: {raw_docs[i]}") + file_data = { + "name": name_job, + "file": filename, + "user": user, + "tokens": tokens, + "retriever": retriever, + "id": str(id), + "type": "local", + } + + upload_index(vector_store_path, file_data) + + except Exception as e: + logging.error(f"Error in ingest_worker: {e}", exc_info=True) + raise return { "directory": directory, @@ -407,68 +433,69 @@ def sync_worker(self, frequency): } -def attachment_worker(self, directory, file_info, user): +def attachment_worker(self, file_info, user): """ Process and store a single attachment without vectorization. - - Args: - self: Reference to the instance of the task. 
- directory (str): Base directory for storing files. - file_info (dict): Dictionary with folder and filename info. - user (str): User identifier. - - Returns: - dict: Information about processed attachment. """ - import datetime - import mimetypes - import os - - from application.utils import num_tokens_from_string mongo = MongoDB.get_client() - db = mongo["docsgpt"] + db = mongo[settings.MONGO_DB_NAME] attachments_collection = db["attachments"] filename = file_info["filename"] attachment_id = file_info["attachment_id"] - - logging.info( - f"Processing attachment: {attachment_id}/{filename}", extra={"user": user} - ) - - self.update_state(state="PROGRESS", meta={"current": 10}) - - file_path = os.path.join(directory, filename) - - if not os.path.exists(file_path): - logging.warning(f"File not found: {file_path}", extra={"user": user}) - raise FileNotFoundError(f"File not found: {file_path}") + relative_path = file_info["path"] + file_content = file_info["file_content"] try: - reader = SimpleDirectoryReader(input_files=[file_path]) - documents = reader.load_data() + self.update_state(state="PROGRESS", meta={"current": 10}) + storage_type = getattr(settings, "STORAGE_TYPE", "local") + storage = StorageCreator.create_storage(storage_type) + self.update_state( + state="PROGRESS", meta={"current": 30, "status": "Processing content"} + ) - self.update_state(state="PROGRESS", meta={"current": 50}) + with tempfile.NamedTemporaryFile( + suffix=os.path.splitext(filename)[1] + ) as temp_file: + temp_file.write(file_content) + temp_file.flush() + reader = SimpleDirectoryReader( + input_files=[temp_file.name], exclude_hidden=True, errors="ignore" + ) + documents = reader.load_data() + + if not documents: + logging.warning(f"No content extracted from file: {filename}") + raise ValueError(f"Failed to extract content from file: {filename}") - if documents: content = documents[0].text token_count = num_tokens_from_string(content) - file_path_relative = 
f"{settings.UPLOAD_FOLDER}/{user}/attachments/{attachment_id}/{filename}" + self.update_state( + state="PROGRESS", meta={"current": 60, "status": "Saving file"} + ) + file_obj = io.BytesIO(file_content) - mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream" + metadata = storage.save_file(file_obj, relative_path) + + mime_type = mimetypes.guess_type(filename)[0] or "application/octet-stream" + + self.update_state( + state="PROGRESS", meta={"current": 80, "status": "Storing in database"} + ) doc_id = ObjectId(attachment_id) attachments_collection.insert_one( { "_id": doc_id, "user": user, - "path": file_path_relative, + "path": relative_path, "content": content, "token_count": token_count, "mime_type": mime_type, "date": datetime.datetime.now(), + "metadata": metadata, } ) @@ -476,20 +503,19 @@ def attachment_worker(self, directory, file_info, user): f"Stored attachment with ID: {attachment_id}", extra={"user": user} ) - self.update_state(state="PROGRESS", meta={"current": 100}) + self.update_state( + state="PROGRESS", meta={"current": 100, "status": "Complete"} + ) return { "filename": filename, - "path": file_path_relative, + "path": relative_path, "token_count": token_count, "attachment_id": attachment_id, "mime_type": mime_type, + "metadata": metadata, } - else: - logging.warning( - "No content was extracted from the file", extra={"user": user} - ) - raise ValueError("No content was extracted from the file") + except Exception as e: logging.error( f"Error processing file {filename}: {e}",