diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 91b028d5..98af5343 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -2494,7 +2494,6 @@ class StoreAttachment(Resource): if not decoded_token: return make_response(jsonify({"success": False}), 401) - # Get single file instead of list file = request.files.get("file") if not file or file.filename == "": @@ -2508,29 +2507,18 @@ class StoreAttachment(Resource): try: attachment_id = ObjectId() original_filename = secure_filename(file.filename) + relative_path = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{str(attachment_id)}/{original_filename}" - save_dir = os.path.join( - current_dir, - settings.UPLOAD_FOLDER, - user, - "attachments", - str(attachment_id) - ) - os.makedirs(save_dir, exist_ok=True) + file_content = file.read() - file_path = os.path.join(save_dir, original_filename) - - - file.save(file_path) file_info = { "filename": original_filename, - "attachment_id": str(attachment_id) + "attachment_id": str(attachment_id), + "path": relative_path, + "file_content": file_content } - current_app.logger.info(f"Saved file: {file_path}") - # Start async task to process single file task = store_attachment.delay( - save_dir, file_info, user ) @@ -2543,7 +2531,6 @@ class StoreAttachment(Resource): }), 200 ) - except Exception as err: current_app.logger.error(f"Error storing attachment: {err}") return make_response(jsonify({"success": False, "error": str(err)}), 400) diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py index 24cff3c6..c9d4d39d 100644 --- a/application/api/user/tasks.py +++ b/application/api/user/tasks.py @@ -23,8 +23,8 @@ def schedule_syncs(self, frequency): @celery.task(bind=True) -def store_attachment(self, directory, saved_files, user): - resp = attachment_worker(self, directory, saved_files, user) +def store_attachment(self, file_info, user): + resp = attachment_worker(self, file_info, user) return resp diff --git a/application/worker.py b/application/worker.py index bbd422ac..d561a53f 100755 --- a/application/worker.py +++ b/application/worker.py @@ -3,15 +3,21 @@ import os import shutil import string import zipfile +import io +import datetime +import mimetypes +import requests + from collections import Counter from urllib.parse import urljoin -import requests +from application.storage.storage_creator import StorageCreator +from application.utils import num_tokens_from_string +from application.core.settings import settings +from application.parser.file.bulk import SimpleDirectoryReader from bson.objectid import ObjectId from application.core.mongo_db import MongoDB -from application.core.settings import settings -from application.parser.file.bulk import SimpleDirectoryReader from application.parser.embedding_pipeline import embed_and_store_documents from application.parser.remote.remote_creator import RemoteCreator from application.parser.schema.base import Document @@ -313,23 +319,11 @@ def sync_worker(self, frequency): for key in ["total_sync_count", "sync_success", "sync_failure"] } -def attachment_worker(self, directory, file_info, user): + +def attachment_worker(self, file_info, user): """ Process and store a single attachment without vectorization. - - Args: - self: Reference to the instance of the task. - directory (str): Base directory for storing files. - file_info (dict): Dictionary with folder and filename info. - user (str): User identifier. - - Returns: - dict: Information about processed attachment. """ - import datetime - import os - import mimetypes - from application.utils import num_tokens_from_string mongo = MongoDB.get_client() db = mongo["docsgpt"] @@ -337,60 +331,79 @@ def attachment_worker(self, directory, file_info, user): filename = file_info["filename"] attachment_id = file_info["attachment_id"] - - logging.info(f"Processing attachment: {attachment_id}/{filename}", extra={"user": user}) - - self.update_state(state="PROGRESS", meta={"current": 10}) - - file_path = os.path.join(directory, filename) - - if not os.path.exists(file_path): - logging.warning(f"File not found: {file_path}", extra={"user": user}) - raise FileNotFoundError(f"File not found: {file_path}") + relative_path = file_info["path"] + file_content = file_info["file_content"] try: - reader = SimpleDirectoryReader( - input_files=[file_path] - ) - documents = reader.load_data() + self.update_state(state="PROGRESS", meta={"current": 10}) - self.update_state(state="PROGRESS", meta={"current": 50}) + storage_type = getattr(settings, "STORAGE_TYPE", "local") + storage = StorageCreator.create_storage(storage_type) - if documents: + self.update_state(state="PROGRESS", meta={"current": 30, "status": "Saving file"}) + file_obj = io.BytesIO(file_content) + storage.save_file(file_obj, relative_path) + + def process_document(file_path, **kwargs): + self.update_state(state="PROGRESS", meta={"current": 50, "status": "Processing content"}) + + reader = SimpleDirectoryReader( + input_files=[file_path], + exclude_hidden=True, + errors="ignore" + ) + documents = reader.load_data() + + if not documents: + logging.warning(f"No content extracted from file: {filename}") + raise ValueError(f"Failed to extract content from file: {filename}") + content = documents[0].text token_count = num_tokens_from_string(content) - file_path_relative = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{attachment_id}/{filename}" + mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream' - mime_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream' + metadata = { + "storage_type": storage_type, + } + + if storage_type == "s3": + metadata.update({ + "bucket_name": getattr(storage, "bucket_name", "docsgpt-test-bucket"), + "uri": f"s3://{storage.bucket_name}/{relative_path}", + "region": getattr(settings, "SAGEMAKER_REGION", "us-east-1") + }) + + self.update_state(state="PROGRESS", meta={"current": 80, "status": "Storing in database"}) doc_id = ObjectId(attachment_id) attachments_collection.insert_one({ "_id": doc_id, "user": user, - "path": file_path_relative, + "path": relative_path, "content": content, "token_count": token_count, "mime_type": mime_type, "date": datetime.datetime.now(), + "metadata": metadata }) logging.info(f"Stored attachment with ID: {attachment_id}", extra={"user": user}) - self.update_state(state="PROGRESS", meta={"current": 100}) + self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"}) return { "filename": filename, - "path": file_path_relative, + "path": relative_path, "token_count": token_count, "attachment_id": attachment_id, - "mime_type": mime_type + "mime_type": mime_type, + "metadata": metadata } - else: - logging.warning("No content was extracted from the file", - extra={"user": user}) - raise ValueError("No content was extracted from the file") + + return storage.process_file(relative_path, process_document) + except Exception as e: logging.error(f"Error processing file {filename}: {e}", extra={"user": user}, exc_info=True) raise