(feat:fs_abstract) attachment uploads

This commit is contained in:
ManishMadan2882
2025-04-17 02:35:45 +05:30
parent 377e33c148
commit 0a0e16547e
3 changed files with 63 additions and 63 deletions

View File

@@ -2494,7 +2494,6 @@ class StoreAttachment(Resource):
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
# Get single file instead of list
file = request.files.get("file")
if not file or file.filename == "":
@@ -2508,29 +2507,18 @@ class StoreAttachment(Resource):
try:
attachment_id = ObjectId()
original_filename = secure_filename(file.filename)
relative_path = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{str(attachment_id)}/{original_filename}"
save_dir = os.path.join(
current_dir,
settings.UPLOAD_FOLDER,
user,
"attachments",
str(attachment_id)
)
os.makedirs(save_dir, exist_ok=True)
file_content = file.read()
file_path = os.path.join(save_dir, original_filename)
file.save(file_path)
file_info = {
"filename": original_filename,
"attachment_id": str(attachment_id)
"attachment_id": str(attachment_id),
"path": relative_path,
"file_content": file_content
}
current_app.logger.info(f"Saved file: {file_path}")
# Start async task to process single file
task = store_attachment.delay(
save_dir,
file_info,
user
)
@@ -2543,7 +2531,6 @@ class StoreAttachment(Resource):
}),
200
)
except Exception as err:
current_app.logger.error(f"Error storing attachment: {err}")
return make_response(jsonify({"success": False, "error": str(err)}), 400)

View File

@@ -23,8 +23,8 @@ def schedule_syncs(self, frequency):
@celery.task(bind=True)
def store_attachment(self, directory, saved_files, user):
resp = attachment_worker(self, directory, saved_files, user)
def store_attachment(self, file_info, user):
resp = attachment_worker(self, file_info, user)
return resp

View File

@@ -3,15 +3,21 @@ import os
import shutil
import string
import zipfile
import io
import datetime
import mimetypes
import requests
from collections import Counter
from urllib.parse import urljoin
import requests
from application.storage.storage_creator import StorageCreator
from application.utils import num_tokens_from_string
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from bson.objectid import ObjectId
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.embedding_pipeline import embed_and_store_documents
from application.parser.remote.remote_creator import RemoteCreator
from application.parser.schema.base import Document
@@ -313,23 +319,11 @@ def sync_worker(self, frequency):
for key in ["total_sync_count", "sync_success", "sync_failure"]
}
def attachment_worker(self, directory, file_info, user):
def attachment_worker(self, file_info, user):
"""
Process and store a single attachment without vectorization.
Args:
self: Reference to the instance of the task.
directory (str): Base directory for storing files.
file_info (dict): Dictionary with folder and filename info.
user (str): User identifier.
Returns:
dict: Information about processed attachment.
"""
import datetime
import os
import mimetypes
from application.utils import num_tokens_from_string
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
@@ -337,60 +331,79 @@ def attachment_worker(self, directory, file_info, user):
filename = file_info["filename"]
attachment_id = file_info["attachment_id"]
logging.info(f"Processing attachment: {attachment_id}/{filename}", extra={"user": user})
self.update_state(state="PROGRESS", meta={"current": 10})
file_path = os.path.join(directory, filename)
if not os.path.exists(file_path):
logging.warning(f"File not found: {file_path}", extra={"user": user})
raise FileNotFoundError(f"File not found: {file_path}")
relative_path = file_info["path"]
file_content = file_info["file_content"]
try:
reader = SimpleDirectoryReader(
input_files=[file_path]
)
documents = reader.load_data()
self.update_state(state="PROGRESS", meta={"current": 10})
self.update_state(state="PROGRESS", meta={"current": 50})
storage_type = getattr(settings, "STORAGE_TYPE", "local")
storage = StorageCreator.create_storage(storage_type)
if documents:
self.update_state(state="PROGRESS", meta={"current": 30, "status": "Saving file"})
file_obj = io.BytesIO(file_content)
storage.save_file(file_obj, relative_path)
def process_document(file_path, **kwargs):
self.update_state(state="PROGRESS", meta={"current": 50, "status": "Processing content"})
reader = SimpleDirectoryReader(
input_files=[file_path],
exclude_hidden=True,
errors="ignore"
)
documents = reader.load_data()
if not documents:
logging.warning(f"No content extracted from file: {filename}")
raise ValueError(f"Failed to extract content from file: {filename}")
content = documents[0].text
token_count = num_tokens_from_string(content)
file_path_relative = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{attachment_id}/{filename}"
mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
metadata = {
"storage_type": storage_type,
}
if storage_type == "s3":
metadata.update({
"bucket_name": getattr(storage, "bucket_name", "docsgpt-test-bucket"),
"uri": f"s3://{storage.bucket_name}/{relative_path}",
"region": getattr(settings, "SAGEMAKER_REGION", "us-east-1")
})
self.update_state(state="PROGRESS", meta={"current": 80, "status": "Storing in database"})
doc_id = ObjectId(attachment_id)
attachments_collection.insert_one({
"_id": doc_id,
"user": user,
"path": file_path_relative,
"path": relative_path,
"content": content,
"token_count": token_count,
"mime_type": mime_type,
"date": datetime.datetime.now(),
"metadata": metadata
})
logging.info(f"Stored attachment with ID: {attachment_id}",
extra={"user": user})
self.update_state(state="PROGRESS", meta={"current": 100})
self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"})
return {
"filename": filename,
"path": file_path_relative,
"path": relative_path,
"token_count": token_count,
"attachment_id": attachment_id,
"mime_type": mime_type
"mime_type": mime_type,
"metadata": metadata
}
else:
logging.warning("No content was extracted from the file",
extra={"user": user})
raise ValueError("No content was extracted from the file")
return storage.process_file(relative_path, process_document)
except Exception as e:
logging.error(f"Error processing file {filename}: {e}", extra={"user": user}, exc_info=True)
raise