Merge pull request #1742 from ManishMadan2882/main

File System Abstraction
This commit is contained in:
Alex
2025-04-24 01:32:27 +03:00
committed by GitHub
13 changed files with 701 additions and 340 deletions

View File

@@ -15,95 +15,86 @@ class LLMHandler(ABC):
@abstractmethod
def handle_response(self, agent, resp, tools_dict, messages, attachments=None, **kwargs):
pass
def prepare_messages_with_attachments(self, agent, messages, attachments=None):
"""
Prepare messages with attachment content if available.
Args:
agent: The current agent instance.
messages (list): List of message dictionaries.
attachments (list): List of attachment dictionaries with content.
Returns:
list: Messages with attachment context added to the system prompt.
"""
if not attachments:
return messages
logger.info(f"Preparing messages with {len(attachments)} attachments")
supported_types = agent.llm.get_supported_attachment_types()
supported_attachments = []
unsupported_attachments = []
for attachment in attachments:
mime_type = attachment.get('mime_type')
if not mime_type:
import mimetypes
file_path = attachment.get('path')
if file_path:
mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
else:
unsupported_attachments.append(attachment)
continue
if mime_type in supported_types:
supported_attachments.append(attachment)
else:
unsupported_attachments.append(attachment)
# Process supported attachments with the LLM's custom method
prepared_messages = messages
if supported_attachments:
logger.info(f"Processing {len(supported_attachments)} supported attachments with {agent.llm.__class__.__name__}'s method")
prepared_messages = agent.llm.prepare_messages_with_attachments(messages, supported_attachments)
# Process unsupported attachments with the default method
if unsupported_attachments:
logger.info(f"Processing {len(unsupported_attachments)} unsupported attachments with default method")
prepared_messages = self._append_attachment_content_to_system(prepared_messages, unsupported_attachments)
return prepared_messages
def _append_attachment_content_to_system(self, messages, attachments):
"""
Default method to append attachment content to the system prompt.
Args:
messages (list): List of message dictionaries.
attachments (list): List of attachment dictionaries with content.
Returns:
list: Messages with attachment context added to the system prompt.
"""
prepared_messages = messages.copy()
attachment_texts = []
for attachment in attachments:
logger.info(f"Adding attachment {attachment.get('id')} to context")
if 'content' in attachment:
attachment_texts.append(f"Attached file content:\n\n{attachment['content']}")
if attachment_texts:
combined_attachment_text = "\n\n".join(attachment_texts)
system_found = False
for i in range(len(prepared_messages)):
if prepared_messages[i].get("role") == "system":
prepared_messages[i]["content"] += f"\n\n{combined_attachment_text}"
system_found = True
break
if not system_found:
prepared_messages.insert(0, {"role": "system", "content": combined_attachment_text})
return prepared_messages
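For illustration, a minimal sketch of how this routing behaves, using stub objects (the classes and values below are hypothetical, not part of the PR; it assumes this handler module is importable):

# Hypothetical stubs to exercise the routing above.
class StubLLM:
    def get_supported_attachment_types(self):
        return ["application/pdf"]
    def prepare_messages_with_attachments(self, messages, attachments):
        # A real LLM subclass would attach file references here.
        return messages

class StubAgent:
    llm = StubLLM()

class DemoHandler(LLMHandler):
    def handle_response(self, agent, resp, tools_dict, messages, attachments=None, **kwargs):
        pass

messages = [{"role": "user", "content": "Summarize the attachments."}]
attachments = [
    {"id": "a1", "path": "user/report.pdf", "mime_type": "application/pdf"},
    {"id": "a2", "path": "user/notes.txt", "mime_type": "text/plain",
     "content": "meeting notes"},
]

# The PDF is routed to StubLLM.prepare_messages_with_attachments; the text
# file is unsupported, so its content lands in a new system message.
prepared = DemoHandler().prepare_messages_with_attachments(StubAgent(), messages, attachments)
assert prepared[0]["role"] == "system"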
class OpenAILLMHandler(LLMHandler):
def handle_response(self, agent, resp, tools_dict, messages, attachments=None, stream: bool = True):
messages = self.prepare_messages_with_attachments(agent, messages, attachments)
logger.info(f"Messages with attachments: {messages}")
if not stream:
@@ -167,7 +158,7 @@ class OpenAILLMHandler(LLMHandler):
if isinstance(chunk, str) and len(chunk) > 0:
yield chunk
continue
elif hasattr(chunk, "delta"):
elif hasattr(chunk, "delta"):
chunk_delta = chunk.delta
if (
@@ -258,7 +249,7 @@ class OpenAILLMHandler(LLMHandler):
return resp
elif isinstance(chunk, str) and len(chunk) == 0:
continue
logger.info(f"Regenerating with messages: {messages}")
resp = agent.llm.gen_stream(
model=agent.gpt_model, messages=messages, tools=agent.tools
@@ -269,9 +260,9 @@ class OpenAILLMHandler(LLMHandler):
class GoogleLLMHandler(LLMHandler):
def handle_response(self, agent, resp, tools_dict, messages, attachments=None, stream: bool = True):
from google.genai import types
messages = self.prepare_messages_with_attachments(agent, messages, attachments)
while True:
if not stream:
response = agent.llm.gen(

View File

@@ -3,10 +3,13 @@ import datetime
from flask import Blueprint, request, send_from_directory
from werkzeug.utils import secure_filename
from bson.objectid import ObjectId
import logging
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.storage.storage_creator import StorageCreator
logger = logging.getLogger(__name__)
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
@@ -45,26 +48,26 @@ def upload_index_files():
remote_data = request.form["remote_data"] if "remote_data" in request.form else None
sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None
save_dir = os.path.join(current_dir, "indexes", str(id))
storage = StorageCreator.get_storage()
index_base_path = f"indexes/{id}"
if settings.VECTOR_STORE == "faiss":
if "file_faiss" not in request.files:
print("No file part")
logger.error("No file_faiss part")
return {"status": "no file"}
file_faiss = request.files["file_faiss"]
if file_faiss.filename == "":
return {"status": "no file name"}
if "file_pkl" not in request.files:
print("No file part")
logger.error("No file_pkl part")
return {"status": "no file"}
file_pkl = request.files["file_pkl"]
if file_pkl.filename == "":
return {"status": "no file name"}
# saves index files
if not os.path.exists(save_dir):
os.makedirs(save_dir)
file_faiss.save(os.path.join(save_dir, "index.faiss"))
file_pkl.save(os.path.join(save_dir, "index.pkl"))
# Save index files to storage
storage.save_file(file_faiss, f"{index_base_path}/index.faiss")
storage.save_file(file_pkl, f"{index_base_path}/index.pkl")
existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
if existing_entry:

View File

@@ -413,81 +413,85 @@ class UploadFile(Resource):
user = secure_filename(decoded_token.get("sub"))
job_name = secure_filename(request.form["name"])
try:
save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name)
os.makedirs(save_dir, exist_ok=True)
from application.storage.storage_creator import StorageCreator
storage = StorageCreator.get_storage()
base_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}"
if len(files) > 1:
temp_dir = os.path.join(save_dir, "temp")
os.makedirs(temp_dir, exist_ok=True)
temp_files = []
for file in files:
filename = secure_filename(file.filename)
file.save(os.path.join(temp_dir, filename))
temp_path = f"{base_path}/temp/{filename}"
storage.save_file(file, temp_path)
temp_files.append(temp_path)
print(f"Saved file: {filename}")
zip_path = shutil.make_archive(
base_name=os.path.join(save_dir, job_name),
format="zip",
root_dir=temp_dir,
)
final_filename = os.path.basename(zip_path)
shutil.rmtree(temp_dir)
zip_filename = f"{job_name}.zip"
zip_path = f"{base_path}/{zip_filename}"
def create_zip_archive(temp_paths, **kwargs):
import tempfile
with tempfile.TemporaryDirectory() as temp_dir:
for path in temp_paths:
file_data = storage.get_file(path)
with open(os.path.join(temp_dir, os.path.basename(path)), 'wb') as f:
f.write(file_data.read())
# Create zip archive
zip_temp = shutil.make_archive(
base_name=os.path.join(temp_dir, job_name),
format="zip",
root_dir=temp_dir
)
return zip_temp
zip_temp_path = create_zip_archive(temp_files)
with open(zip_temp_path, 'rb') as zip_file:
storage.save_file(zip_file, zip_path)
# Clean up temp files
for temp_path in temp_files:
storage.delete_file(temp_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
".rst",
".md",
".pdf",
".txt",
".docx",
".csv",
".epub",
".html",
".mdx",
".json",
".xlsx",
".pptx",
".png",
".jpg",
".jpeg",
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
".jpg", ".jpeg",
],
job_name,
final_filename,
zip_filename,
user,
)
else:
# For single file
file = files[0]
final_filename = secure_filename(file.filename)
file_path = os.path.join(save_dir, final_filename)
file.save(file_path)
filename = secure_filename(file.filename)
file_path = f"{base_path}/{filename}"
storage.save_file(file, file_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
".rst",
".md",
".pdf",
".txt",
".docx",
".csv",
".epub",
".html",
".mdx",
".json",
".xlsx",
".pptx",
".png",
".jpg",
".jpeg",
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
".jpg", ".jpeg",
],
job_name,
final_filename,
filename,
user,
)
except Exception as err:
current_app.logger.error(f"Error uploading file: {err}")
return make_response(jsonify({"success": False}), 400)
return make_response(jsonify({"success": True, "task_id": task.id}), 200)
@@ -2780,10 +2784,9 @@ class StoreAttachment(Resource):
decoded_token = request.decoded_token
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
# Get single file instead of list
file = request.files.get("file")
if not file or file.filename == "":
return make_response(
jsonify({"status": "error", "message": "Missing file"}),
@@ -2791,43 +2794,35 @@ class StoreAttachment(Resource):
)
user = secure_filename(decoded_token.get("sub"))
try:
attachment_id = ObjectId()
original_filename = secure_filename(file.filename)
save_dir = os.path.join(
current_dir,
settings.UPLOAD_FOLDER,
user,
"attachments",
str(attachment_id),
)
os.makedirs(save_dir, exist_ok=True)
file_path = os.path.join(save_dir, original_filename)
file.save(file_path)
relative_path = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{str(attachment_id)}/{original_filename}"
file_content = file.read()
file_info = {
"filename": original_filename,
"attachment_id": str(attachment_id),
"path": relative_path,
"file_content": file_content
}
current_app.logger.info(f"Saved file: {file_path}")
# Start async task to process single file
task = store_attachment.delay(save_dir, file_info, user)
return make_response(
jsonify(
{
"success": True,
"task_id": task.id,
"message": "File uploaded successfully. Processing started.",
}
),
200,
task = store_attachment.delay(
file_info,
user
)
return make_response(
jsonify({
"success": True,
"task_id": task.id,
"message": "File uploaded successfully. Processing started."
}),
200
)
except Exception as err:
current_app.logger.error(f"Error storing attachment: {err}")
return make_response(jsonify({"success": False, "error": str(err)}), 400)

View File

@@ -23,8 +23,8 @@ def schedule_syncs(self, frequency):
@celery.task(bind=True)
def store_attachment(self, directory, saved_files, user):
resp = attachment_worker(self, directory, saved_files, user)
def store_attachment(self, file_info, user):
resp = attachment_worker(self, file_info, user)
return resp

View File

@@ -98,6 +98,8 @@ class Settings(BaseSettings):
BRAVE_SEARCH_API_KEY: Optional[str] = None
FLASK_DEBUG_MODE: bool = False
STORAGE_TYPE: str = "local" # local or s3
JWT_SECRET_KEY: str = ""

View File

@@ -1,11 +1,10 @@
from google import genai
from google.genai import types
import os
import logging
import mimetypes
import json
from application.llm.base import BaseLLM
from application.storage.storage_creator import StorageCreator
class GoogleLLM(BaseLLM):
@@ -14,11 +13,12 @@ class GoogleLLM(BaseLLM):
self.api_key = api_key
self.user_api_key = user_api_key
self.client = genai.Client(api_key=self.api_key)
self.storage = StorageCreator.get_storage()
def get_supported_attachment_types(self):
"""
Return a list of MIME types supported by Google Gemini for file uploads.
Returns:
list: List of supported MIME types
"""
@@ -30,35 +30,35 @@ class GoogleLLM(BaseLLM):
'image/webp',
'image/gif'
]
def prepare_messages_with_attachments(self, messages, attachments=None):
"""
Process attachments using Google AI's file API for more efficient handling.
Args:
messages (list): List of message dictionaries.
attachments (list): List of attachment dictionaries with content and metadata.
Returns:
list: Messages formatted with file references for Google AI API.
"""
if not attachments:
return messages
prepared_messages = messages.copy()
# Find the last user message to attach files to
user_message_index = None
for i in range(len(prepared_messages) - 1, -1, -1):
if prepared_messages[i].get("role") == "user":
user_message_index = i
break
if user_message_index is None:
user_message = {"role": "user", "content": []}
prepared_messages.append(user_message)
user_message_index = len(prepared_messages) - 1
if isinstance(prepared_messages[user_message_index].get("content"), str):
text_content = prepared_messages[user_message_index]["content"]
prepared_messages[user_message_index]["content"] = [
@@ -66,15 +66,11 @@ class GoogleLLM(BaseLLM):
]
elif not isinstance(prepared_messages[user_message_index].get("content"), list):
prepared_messages[user_message_index]["content"] = []
files = []
for attachment in attachments:
mime_type = attachment.get('mime_type')
if not mime_type:
file_path = attachment.get('path')
if file_path:
mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
if mime_type in self.get_supported_attachment_types():
try:
file_uri = self._upload_file_to_google(attachment)
@@ -84,53 +80,44 @@ class GoogleLLM(BaseLLM):
logging.error(f"GoogleLLM: Error uploading file: {e}")
if 'content' in attachment:
prepared_messages[user_message_index]["content"].append({
"type": "text",
"type": "text",
"text": f"[File could not be processed: {attachment.get('path', 'unknown')}]"
})
if files:
logging.info(f"GoogleLLM: Adding {len(files)} files to message")
prepared_messages[user_message_index]["content"].append({
"files": files
})
return prepared_messages
def _upload_file_to_google(self, attachment):
"""
Upload a file to Google AI and return the file URI.
Args:
attachment (dict): Attachment dictionary with path and metadata.
Returns:
str: Google AI file URI for the uploaded file.
"""
if 'google_file_uri' in attachment:
return attachment['google_file_uri']
file_path = attachment.get('path')
if not file_path:
raise ValueError("No file path provided in attachment")
if not os.path.isabs(file_path):
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
file_path = os.path.join(current_dir, "application", file_path)
if not os.path.exists(file_path):
if not self.storage.file_exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
mime_type = attachment.get('mime_type')
if not mime_type:
mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
try:
response = self.client.files.upload(file=file_path)
file_uri = response.uri
file_uri = self.storage.process_file(
file_path,
lambda local_path, **kwargs: self.client.files.upload(file=local_path).uri
)
from application.core.mongo_db import MongoDB
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
@@ -140,7 +127,7 @@ class GoogleLLM(BaseLLM):
{"_id": attachment['_id']},
{"$set": {"google_file_uri": file_uri}}
)
return file_uri
except Exception as e:
logging.error(f"Error uploading file to Google AI: {e}")
@@ -289,7 +276,7 @@ class GoogleLLM(BaseLLM):
if tools:
cleaned_tools = self._clean_tools_format(tools)
config.tools = cleaned_tools
# Check if we have both tools and file attachments
has_attachments = False
for message in messages:
@@ -299,16 +286,16 @@ class GoogleLLM(BaseLLM):
break
if has_attachments:
break
logging.info(f"GoogleLLM: Starting stream generation. Model: {model}, Messages: {json.dumps(messages, default=str)}, Has attachments: {has_attachments}")
response = client.models.generate_content_stream(
model=model,
contents=messages,
config=config,
)
for chunk in response:
if hasattr(chunk, "candidates") and chunk.candidates:
for candidate in chunk.candidates:

View File

@@ -1,11 +1,10 @@
import json
import base64
import os
import mimetypes
import logging
from application.core.settings import settings
from application.llm.base import BaseLLM
from application.storage.storage_creator import StorageCreator
class OpenAILLM(BaseLLM):
@@ -20,6 +19,7 @@ class OpenAILLM(BaseLLM):
self.client = OpenAI(api_key=api_key)
self.api_key = api_key
self.user_api_key = user_api_key
self.storage = StorageCreator.get_storage()
def _clean_messages_openai(self, messages):
cleaned_messages = []
@@ -77,6 +77,8 @@ class OpenAILLM(BaseLLM):
content_parts.append(item)
elif "type" in item and item["type"] == "file" and "file" in item:
content_parts.append(item)
elif "type" in item and item["type"] == "image_url" and "image_url" in item:
content_parts.append(item)
cleaned_messages.append({"role": role, "content": content_parts})
else:
raise ValueError(
@@ -149,7 +151,7 @@ class OpenAILLM(BaseLLM):
def get_supported_attachment_types(self):
"""
Return a list of MIME types supported by OpenAI for file uploads.
Returns:
list: List of supported MIME types
"""
@@ -161,35 +163,35 @@ class OpenAILLM(BaseLLM):
'image/webp',
'image/gif'
]
def prepare_messages_with_attachments(self, messages, attachments=None):
"""
Process attachments using OpenAI's file API for more efficient handling.
Args:
messages (list): List of message dictionaries.
attachments (list): List of attachment dictionaries with content and metadata.
Returns:
list: Messages formatted with file references for OpenAI API.
"""
if not attachments:
return messages
prepared_messages = messages.copy()
# Find the last user message to attach the file_id to
user_message_index = None
for i in range(len(prepared_messages) - 1, -1, -1):
if prepared_messages[i].get("role") == "user":
user_message_index = i
break
if user_message_index is None:
user_message = {"role": "user", "content": []}
prepared_messages.append(user_message)
user_message_index = len(prepared_messages) - 1
if isinstance(prepared_messages[user_message_index].get("content"), str):
text_content = prepared_messages[user_message_index]["content"]
prepared_messages[user_message_index]["content"] = [
@@ -197,14 +199,10 @@ class OpenAILLM(BaseLLM):
]
elif not isinstance(prepared_messages[user_message_index].get("content"), list):
prepared_messages[user_message_index]["content"] = []
for attachment in attachments:
mime_type = attachment.get('mime_type')
if not mime_type:
file_path = attachment.get('path')
if file_path:
mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
if mime_type and mime_type.startswith('image/'):
try:
base64_image = self._get_base64_image(attachment)
@@ -218,14 +216,13 @@ class OpenAILLM(BaseLLM):
logging.error(f"Error processing image attachment: {e}")
if 'content' in attachment:
prepared_messages[user_message_index]["content"].append({
"type": "text",
"type": "text",
"text": f"[Image could not be processed: {attachment.get('path', 'unknown')}]"
})
# Handle PDFs using the file API
elif mime_type == 'application/pdf':
try:
file_id = self._upload_file_to_openai(attachment)
prepared_messages[user_message_index]["content"].append({
"type": "file",
"file": {"file_id": file_id}
@@ -234,80 +231,64 @@ class OpenAILLM(BaseLLM):
logging.error(f"Error uploading PDF to OpenAI: {e}")
if 'content' in attachment:
prepared_messages[user_message_index]["content"].append({
"type": "text",
"type": "text",
"text": f"File content:\n\n{attachment['content']}"
})
return prepared_messages
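The hunk above elides the exact dict appended for images; in OpenAI's standard vision format the resulting user message would look roughly like this (illustrative, base64 truncated):

# Rough shape of a user message after image and PDF attachments are added.
{
    "role": "user",
    "content": [
        {"type": "text", "text": "Summarize the attachments."},
        {"type": "image_url",
         "image_url": {"url": "data:image/png;base64,iVBORw0KGgo..."}},
        {"type": "file", "file": {"file_id": "file-abc123"}},  # a PDF
    ],
}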
def _get_base64_image(self, attachment):
"""
Convert an image file to base64 encoding.
Args:
attachment (dict): Attachment dictionary with path and metadata.
Returns:
str: Base64-encoded image data.
"""
file_path = attachment.get('path')
if not file_path:
raise ValueError("No file path provided in attachment")
if not os.path.isabs(file_path):
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
file_path = os.path.join(current_dir, "application", file_path)
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
with open(file_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def _upload_file_to_openai(self, attachment): ##pdfs
try:
with self.storage.get_file(file_path) as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
except FileNotFoundError:
raise FileNotFoundError(f"File not found: {file_path}")
def _upload_file_to_openai(self, attachment):
"""
Upload a file to OpenAI and return the file_id.
Args:
attachment (dict): Attachment dictionary with path and metadata.
Expected keys:
- path: Path to the file
- id: Optional MongoDB ID for caching
Returns:
str: OpenAI file_id for the uploaded file.
"""
import os
import logging
if 'openai_file_id' in attachment:
return attachment['openai_file_id']
file_path = attachment.get('path')
if not file_path:
raise ValueError("No file path provided in attachment")
if not os.path.isabs(file_path):
current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
file_path = os.path.join(current_dir,"application", file_path)
if not os.path.exists(file_path):
if not self.storage.file_exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
try:
with open(file_path, 'rb') as file:
response = self.client.files.create(
file=file,
file_id = self.storage.process_file(
file_path,
lambda local_path, **kwargs: self.client.files.create(
file=open(local_path, 'rb'),
purpose="assistants"
)
file_id = response.id
).id
)
from application.core.mongo_db import MongoDB
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
@@ -317,7 +298,7 @@ class OpenAILLM(BaseLLM):
{"_id": attachment['_id']},
{"$set": {"openai_file_id": file_id}}
)
return file_id
except Exception as e:
logging.error(f"Error uploading file to OpenAI: {e}")
@@ -327,7 +308,7 @@ class OpenAILLM(BaseLLM):
class AzureOpenAILLM(OpenAILLM):
def __init__(
self, api_key, user_api_key, *args, **kwargs
):
super().__init__(api_key)

View File

@@ -0,0 +1,94 @@
"""Base storage class for file system abstraction."""
from abc import ABC, abstractmethod
from typing import BinaryIO, List, Callable
class BaseStorage(ABC):
"""Abstract base class for storage implementations."""
@abstractmethod
def save_file(self, file_data: BinaryIO, path: str) -> dict:
"""
Save a file to storage.
Args:
file_data: File-like object containing the data
path: Path where the file should be stored
Returns:
dict: A dictionary containing metadata about the saved file, including:
- 'path': The path where the file was saved
- 'storage_type': The type of storage (e.g., 'local', 's3')
- Other storage-specific metadata (e.g., 'uri', 'bucket_name', etc.)
"""
pass
@abstractmethod
def get_file(self, path: str) -> BinaryIO:
"""
Retrieve a file from storage.
Args:
path: Path to the file
Returns:
BinaryIO: File-like object containing the file data
"""
pass
@abstractmethod
def process_file(self, path: str, processor_func: Callable, **kwargs):
"""
Process a file using the provided processor function.
This method handles the details of retrieving the file and providing
it to the processor function in an appropriate way based on the storage type.
Args:
path: Path to the file
processor_func: Function that processes the file
**kwargs: Additional arguments to pass to the processor function
Returns:
The result of the processor function
"""
pass
@abstractmethod
def delete_file(self, path: str) -> bool:
"""
Delete a file from storage.
Args:
path: Path to the file
Returns:
bool: True if deletion was successful
"""
pass
@abstractmethod
def file_exists(self, path: str) -> bool:
"""
Check if a file exists.
Args:
path: Path to the file
Returns:
bool: True if the file exists
"""
pass
@abstractmethod
def list_files(self, directory: str) -> List[str]:
"""
List all files in a directory.
Args:
directory: Directory path to list
Returns:
List[str]: List of file paths
"""
pass
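As a sketch of this contract (purely illustrative, not part of the PR), a minimal in-memory implementation could look like this, assuming the module above is importable as application.storage.base:

# Hypothetical in-memory backend, e.g. for tests.
import io
import os
import tempfile
from typing import BinaryIO, List, Callable
from application.storage.base import BaseStorage

class MemoryStorage(BaseStorage):
    def __init__(self):
        self._files = {}

    def save_file(self, file_data: BinaryIO, path: str) -> dict:
        self._files[path] = file_data.read()
        return {"path": path, "storage_type": "memory"}

    def get_file(self, path: str) -> BinaryIO:
        if path not in self._files:
            raise FileNotFoundError(f"File not found: {path}")
        return io.BytesIO(self._files[path])

    def process_file(self, path: str, processor_func: Callable, **kwargs):
        # Materialize to a temporary file so processors that need a real
        # filesystem path still work.
        if path not in self._files:
            raise FileNotFoundError(f"File not found: {path}")
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp.write(self._files[path])
        try:
            return processor_func(local_path=tmp.name, **kwargs)
        finally:
            os.unlink(tmp.name)

    def delete_file(self, path: str) -> bool:
        return self._files.pop(path, None) is not None

    def file_exists(self, path: str) -> bool:
        return path in self._files

    def list_files(self, directory: str) -> List[str]:
        prefix = directory.rstrip("/") + "/" if directory else ""
        return [p for p in self._files if p.startswith(prefix)]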

View File

@@ -0,0 +1,103 @@
"""Local file system implementation."""
import os
import shutil
from typing import BinaryIO, List, Callable
from application.storage.base import BaseStorage
class LocalStorage(BaseStorage):
"""Local file system storage implementation."""
def __init__(self, base_dir: str = None):
"""
Initialize local storage.
Args:
base_dir: Base directory for all operations. If None, uses current directory.
"""
self.base_dir = base_dir or os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
def _get_full_path(self, path: str) -> str:
"""Get absolute path by combining base_dir and path."""
if os.path.isabs(path):
return path
return os.path.join(self.base_dir, path)
def save_file(self, file_data: BinaryIO, path: str) -> dict:
"""Save a file to local storage."""
full_path = self._get_full_path(path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
if hasattr(file_data, 'save'):
file_data.save(full_path)
else:
with open(full_path, 'wb') as f:
shutil.copyfileobj(file_data, f)
return {
'storage_type': 'local'
}
def get_file(self, path: str) -> BinaryIO:
"""Get a file from local storage."""
full_path = self._get_full_path(path)
if not os.path.exists(full_path):
raise FileNotFoundError(f"File not found: {full_path}")
return open(full_path, 'rb')
def delete_file(self, path: str) -> bool:
"""Delete a file from local storage."""
full_path = self._get_full_path(path)
if not os.path.exists(full_path):
return False
os.remove(full_path)
return True
def file_exists(self, path: str) -> bool:
"""Check if a file exists in local storage."""
full_path = self._get_full_path(path)
return os.path.exists(full_path)
def list_files(self, directory: str) -> List[str]:
"""List all files in a directory in local storage."""
full_path = self._get_full_path(directory)
if not os.path.exists(full_path):
return []
result = []
for root, _, files in os.walk(full_path):
for file in files:
rel_path = os.path.relpath(os.path.join(root, file), self.base_dir)
result.append(rel_path)
return result
def process_file(self, path: str, processor_func: Callable, **kwargs):
"""
Process a file using the provided processor function.
For local storage, we can directly pass the full path to the processor.
Args:
path: Path to the file
processor_func: Function that processes the file
**kwargs: Additional arguments to pass to the processor function
Returns:
The result of the processor function
"""
full_path = self._get_full_path(path)
if not os.path.exists(full_path):
raise FileNotFoundError(f"File not found: {full_path}")
return processor_func(local_path=full_path, **kwargs)
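A short usage sketch (paths illustrative). Because this backend can hand processors a real filesystem path, process_file involves no copying:

# Illustrative use of LocalStorage; paths are made up.
import io
import os
from application.storage.local import LocalStorage

storage = LocalStorage(base_dir="/tmp/docsgpt")
storage.save_file(io.BytesIO(b"hello"), "indexes/demo/readme.txt")
assert storage.file_exists("indexes/demo/readme.txt")

# The lambda receives the file's actual path via the local_path kwarg.
size = storage.process_file(
    "indexes/demo/readme.txt",
    lambda local_path, **kwargs: os.path.getsize(local_path),
)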

application/storage/s3.py Normal file
View File

@@ -0,0 +1,120 @@
"""S3 storage implementation."""
import io
from typing import BinaryIO, List, Callable
import os
import boto3
from botocore.exceptions import ClientError
from application.storage.base import BaseStorage
from application.core.settings import settings
class S3Storage(BaseStorage):
"""AWS S3 storage implementation."""
def __init__(self, bucket_name=None):
"""
Initialize S3 storage.
Args:
bucket_name: S3 bucket name (optional, defaults to settings)
"""
self.bucket_name = bucket_name or getattr(settings, "S3_BUCKET_NAME", "docsgpt-test-bucket")
# Get credentials from settings
aws_access_key_id = getattr(settings, "SAGEMAKER_ACCESS_KEY", None)
aws_secret_access_key = getattr(settings, "SAGEMAKER_SECRET_KEY", None)
region_name = getattr(settings, "SAGEMAKER_REGION", None)
self.s3 = boto3.client(
's3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)
def save_file(self, file_data: BinaryIO, path: str) -> dict:
"""Save a file to S3 storage."""
self.s3.upload_fileobj(file_data, self.bucket_name, path)
region = getattr(settings, "SAGEMAKER_REGION", None)
return {
'storage_type': 's3',
'bucket_name': self.bucket_name,
'uri': f's3://{self.bucket_name}/{path}',
'region': region
}
def get_file(self, path: str) -> BinaryIO:
"""Get a file from S3 storage."""
if not self.file_exists(path):
raise FileNotFoundError(f"File not found: {path}")
file_obj = io.BytesIO()
self.s3.download_fileobj(self.bucket_name, path, file_obj)
file_obj.seek(0)
return file_obj
def delete_file(self, path: str) -> bool:
"""Delete a file from S3 storage."""
try:
self.s3.delete_object(Bucket=self.bucket_name, Key=path)
return True
except ClientError:
return False
def file_exists(self, path: str) -> bool:
"""Check if a file exists in S3 storage."""
try:
self.s3.head_object(Bucket=self.bucket_name, Key=path)
return True
except ClientError:
return False
def list_files(self, directory: str) -> List[str]:
"""List all files in a directory in S3 storage."""
# Ensure directory ends with a slash if it's not empty
if directory and not directory.endswith('/'):
directory += '/'
result = []
paginator = self.s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=self.bucket_name, Prefix=directory)
for page in pages:
if 'Contents' in page:
for obj in page['Contents']:
result.append(obj['Key'])
return result
def process_file(self, path: str, processor_func: Callable, **kwargs):
"""
Process a file using the provided processor function.
Args:
path: Path to the file
processor_func: Function that processes the file
**kwargs: Additional arguments to pass to the processor function
Returns:
The result of the processor function
"""
import tempfile
import logging
if not self.file_exists(path):
raise FileNotFoundError(f"File not found in S3: {path}")
with tempfile.NamedTemporaryFile(suffix=os.path.splitext(path)[1], delete=True) as temp_file:
try:
# Download the file from S3 to the temporary file
self.s3.download_fileobj(self.bucket_name, path, temp_file)
temp_file.flush()
return processor_func(local_path=temp_file.name, **kwargs)
except Exception as e:
logging.error(f"Error processing S3 file {path}: {e}", exc_info=True)
raise
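Unlike the local backend, process_file here always round-trips through a temporary file. A usage sketch (bucket and key illustrative; credentials come from the SAGEMAKER_* settings as in __init__ above, and the calls hit real AWS):

# Illustrative S3 usage; bucket and key are made up.
import io
from application.storage.s3 import S3Storage

storage = S3Storage(bucket_name="my-docsgpt-bucket")
meta = storage.save_file(io.BytesIO(b"hello"), "indexes/demo/readme.txt")
# meta includes 'uri': 's3://my-docsgpt-bucket/indexes/demo/readme.txt'

# The processor receives the path of a temporary local copy of the object.
n = storage.process_file(
    "indexes/demo/readme.txt",
    lambda local_path, **kwargs: len(open(local_path, "rb").read()),
)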

View File

@@ -0,0 +1,32 @@
"""Storage factory for creating different storage implementations."""
from typing import Dict, Type
from application.storage.base import BaseStorage
from application.storage.local import LocalStorage
from application.storage.s3 import S3Storage
from application.core.settings import settings
class StorageCreator:
storages: Dict[str, Type[BaseStorage]] = {
"local": LocalStorage,
"s3": S3Storage,
}
_instance = None
@classmethod
def get_storage(cls) -> BaseStorage:
if cls._instance is None:
storage_type = getattr(settings, "STORAGE_TYPE", "local")
cls._instance = cls.create_storage(storage_type)
return cls._instance
@classmethod
def create_storage(cls, type_name: str, *args, **kwargs) -> BaseStorage:
storage_class = cls.storages.get(type_name.lower())
if not storage_class:
raise ValueError(f"No storage implementation found for type {type_name}")
return storage_class(*args, **kwargs)
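Callers throughout the PR obtain a backend through this factory rather than constructing one directly; a short sketch:

# get_storage() caches a single instance chosen by settings.STORAGE_TYPE.
from application.storage.storage_creator import StorageCreator

storage = StorageCreator.get_storage()
assert storage is StorageCreator.get_storage()  # same cached instance

# A specific backend can still be built explicitly:
s3 = StorageCreator.create_storage("s3", bucket_name="my-docsgpt-bucket")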

View File

@@ -1,17 +1,19 @@
import os
import tempfile
from langchain_community.vectorstores import FAISS
from application.core.settings import settings
from application.parser.schema.base import Document
from application.vectorstore.base import BaseVectorStore
from application.storage.storage_creator import StorageCreator
def get_vectorstore(path: str) -> str:
if path:
vectorstore = os.path.join("application", "indexes", path)
vectorstore = f"indexes/{path}"
else:
vectorstore = os.path.join("application")
vectorstore = "indexes"
return vectorstore
@@ -21,16 +23,36 @@ class FaissStore(BaseVectorStore):
self.source_id = source_id
self.path = get_vectorstore(source_id)
self.embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
self.storage = StorageCreator.get_storage()
try:
if docs_init:
self.docsearch = FAISS.from_documents(docs_init, self.embeddings)
else:
self.docsearch = FAISS.load_local(
self.path, self.embeddings, allow_dangerous_deserialization=True
)
except Exception:
raise
with tempfile.TemporaryDirectory() as temp_dir:
faiss_path = f"{self.path}/index.faiss"
pkl_path = f"{self.path}/index.pkl"
if not self.storage.file_exists(faiss_path) or not self.storage.file_exists(pkl_path):
raise FileNotFoundError(f"Index files not found in storage at {self.path}")
faiss_file = self.storage.get_file(faiss_path)
pkl_file = self.storage.get_file(pkl_path)
local_faiss_path = os.path.join(temp_dir, "index.faiss")
local_pkl_path = os.path.join(temp_dir, "index.pkl")
with open(local_faiss_path, 'wb') as f:
f.write(faiss_file.read())
with open(local_pkl_path, 'wb') as f:
f.write(pkl_file.read())
self.docsearch = FAISS.load_local(
temp_dir, self.embeddings, allow_dangerous_deserialization=True
)
except Exception as e:
raise Exception(f"Error loading FAISS index: {str(e)}")
self.assert_embedding_dimensions(self.embeddings)

View File

@@ -3,15 +3,22 @@ import os
import shutil
import string
import zipfile
import io
import datetime
import mimetypes
import requests
import tempfile
from collections import Counter
from urllib.parse import urljoin
import requests
from application.storage.storage_creator import StorageCreator
from application.utils import num_tokens_from_string
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from bson.objectid import ObjectId
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.embedding_pipeline import embed_and_store_documents
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.schema.base import Document
@@ -126,62 +133,91 @@ def ingest_worker(
limit = None
exclude = True
sample = False
storage = StorageCreator.get_storage()
full_path = os.path.join(directory, user, name_job)
source_file_path = os.path.join(full_path, filename)
logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
file_data = {"name": name_job, "file": filename, "user": user}
# Create temporary working directory
with tempfile.TemporaryDirectory() as temp_dir:
try:
os.makedirs(temp_dir, exist_ok=True)
# Download file from storage to temp directory
temp_file_path = os.path.join(temp_dir, filename)
file_data = storage.get_file(source_file_path)
with open(temp_file_path, 'wb') as f:
f.write(file_data.read())
self.update_state(state="PROGRESS", meta={"current": 1})
if not os.path.exists(full_path):
os.makedirs(full_path)
download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
# Handle zip files
if filename.endswith('.zip'):
logging.info(f"Extracting zip file: {filename}")
extract_zip_recursive(
temp_file_path,
temp_dir,
current_depth=0,
max_depth=RECURSION_DEPTH
)
# check if file is .zip and extract it
if filename.endswith(".zip"):
extract_zip_recursive(
os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
)
if sample:
logging.info(f"Sample mode enabled. Using {limit} documents.")
self.update_state(state="PROGRESS", meta={"current": 1})
reader = SimpleDirectoryReader(
input_dir=temp_dir,
input_files=input_files,
recursive=recursive,
required_exts=formats,
exclude_hidden=exclude,
file_metadata=metadata_from_filename,
)
raw_docs = reader.load_data()
raw_docs = SimpleDirectoryReader(
input_dir=full_path,
input_files=input_files,
recursive=recursive,
required_exts=formats,
num_files_limit=limit,
exclude_hidden=exclude,
file_metadata=metadata_from_filename,
).load_data()
chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False
)
raw_docs = chunker.chunk(documents=raw_docs)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
id = ObjectId()
vector_store_path = os.path.join(temp_dir, 'vector_store')
os.makedirs(vector_store_path, exist_ok=True)
embed_and_store_documents(docs, vector_store_path, id, self)
tokens = count_tokens_docs(docs)
self.update_state(state="PROGRESS", meta={"current": 100})
chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False
)
raw_docs = chunker.chunk(documents=raw_docs)
if sample:
for i in range(min(5, len(raw_docs))):
logging.info(f"Sample document {i}: {raw_docs[i]}")
file_data = {
"name": name_job,
"file": filename,
"user": user,
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": "local",
}
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
id = ObjectId()
embed_and_store_documents(docs, full_path, id, self)
tokens = count_tokens_docs(docs)
self.update_state(state="PROGRESS", meta={"current": 100})
upload_index(vector_store_path, file_data)
if sample:
for i in range(min(5, len(raw_docs))):
logging.info(f"Sample document {i}: {raw_docs[i]}")
file_data.update({
"tokens": tokens,
"retriever": retriever,
"id": str(id),
"type": "local",
})
upload_index(full_path, file_data)
# delete local
shutil.rmtree(full_path)
except Exception as e:
logging.error(f"Error in ingest_worker: {e}", exc_info=True)
raise
return {
"directory": directory,
@@ -203,7 +239,7 @@ def remote_worker(
sync_frequency="never",
operation_mode="upload",
doc_id=None,
):
full_path = os.path.join(directory, user, name_job)
if not os.path.exists(full_path):
os.makedirs(full_path)
@@ -313,84 +349,79 @@ def sync_worker(self, frequency):
for key in ["total_sync_count", "sync_success", "sync_failure"]
}
def attachment_worker(self, directory, file_info, user):
def attachment_worker(self, file_info, user):
"""
Process and store a single attachment without vectorization.
Args:
self: Reference to the instance of the task.
directory (str): Base directory for storing files.
file_info (dict): Dictionary with the attachment's filename, id, path, and raw file content.
user (str): User identifier.
Returns:
dict: Information about processed attachment.
"""
import datetime
import os
import mimetypes
from application.utils import num_tokens_from_string
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
attachments_collection = db["attachments"]
filename = file_info["filename"]
attachment_id = file_info["attachment_id"]
logging.info(f"Processing attachment: {attachment_id}/{filename}", extra={"user": user})
self.update_state(state="PROGRESS", meta={"current": 10})
file_path = os.path.join(directory, filename)
if not os.path.exists(file_path):
logging.warning(f"File not found: {file_path}", extra={"user": user})
raise FileNotFoundError(f"File not found: {file_path}")
relative_path = file_info["path"]
file_content = file_info["file_content"]
try:
reader = SimpleDirectoryReader(
input_files=[file_path]
)
documents = reader.load_data()
self.update_state(state="PROGRESS", meta={"current": 50})
if documents:
self.update_state(state="PROGRESS", meta={"current": 10})
storage_type = getattr(settings, "STORAGE_TYPE", "local")
storage = StorageCreator.create_storage(storage_type)
self.update_state(state="PROGRESS", meta={"current": 30, "status": "Processing content"})
with tempfile.NamedTemporaryFile(suffix=os.path.splitext(filename)[1]) as temp_file:
temp_file.write(file_content)
temp_file.flush()
reader = SimpleDirectoryReader(
input_files=[temp_file.name],
exclude_hidden=True,
errors="ignore"
)
documents = reader.load_data()
if not documents:
logging.warning(f"No content extracted from file: {filename}")
raise ValueError(f"Failed to extract content from file: {filename}")
content = documents[0].text
token_count = num_tokens_from_string(content)
file_path_relative = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{attachment_id}/{filename}"
mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
self.update_state(state="PROGRESS", meta={"current": 60, "status": "Saving file"})
file_obj = io.BytesIO(file_content)
metadata = storage.save_file(file_obj, relative_path)
mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
self.update_state(state="PROGRESS", meta={"current": 80, "status": "Storing in database"})
doc_id = ObjectId(attachment_id)
attachments_collection.insert_one({
"_id": doc_id,
"user": user,
"path": file_path_relative,
"path": relative_path,
"content": content,
"token_count": token_count,
"mime_type": mime_type,
"date": datetime.datetime.now(),
"metadata": metadata
})
logging.info(f"Stored attachment with ID: {attachment_id}",
logging.info(f"Stored attachment with ID: {attachment_id}",
extra={"user": user})
self.update_state(state="PROGRESS", meta={"current": 100})
self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"})
return {
"filename": filename,
"path": file_path_relative,
"path": relative_path,
"token_count": token_count,
"attachment_id": attachment_id,
"mime_type": mime_type
"mime_type": mime_type,
"metadata": metadata
}
else:
logging.warning("No content was extracted from the file",
extra={"user": user})
raise ValueError("No content was extracted from the file")
except Exception as e:
logging.error(f"Error processing file {filename}: {e}", extra={"user": user}, exc_info=True)
raise