From 89d5e7bee5bcb4c84b53ab60e6f63b20b8b33611 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Wed, 7 May 2025 19:15:36 +0530 Subject: [PATCH] (feat:attachment) store file in endpoint layer --- application/api/user/routes.py | 10 ++-- application/worker.py | 95 ++++++++++++++-------------------- 2 files changed, 46 insertions(+), 59 deletions(-) diff --git a/application/api/user/routes.py b/application/api/user/routes.py index e10082d3..606f6cd3 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -28,6 +28,8 @@ from application.extensions import api from application.tts.google_tts import GoogleTTS from application.utils import check_required_fields, validate_function_name from application.vectorstore.vector_creator import VectorCreator +from application.storage.storage_creator import StorageCreator +storage = StorageCreator.get_storage() mongo = MongoDB.get_client() db = mongo[settings.MONGO_DB_NAME] @@ -2978,14 +2980,14 @@ class StoreAttachment(Resource): attachment_id = ObjectId() original_filename = secure_filename(file.filename) relative_path = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{str(attachment_id)}/{original_filename}" - - file_content = file.read() - + + metadata = storage.save_file(file, relative_path) + file_info = { "filename": original_filename, "attachment_id": str(attachment_id), "path": relative_path, - "file_content": file_content, + "metadata": metadata } task = store_attachment.delay(file_info, user) diff --git a/application/worker.py b/application/worker.py index 619993c9..2f9e97f3 100755 --- a/application/worker.py +++ b/application/worker.py @@ -445,76 +445,61 @@ def attachment_worker(self, file_info, user): filename = file_info["filename"] attachment_id = file_info["attachment_id"] relative_path = file_info["path"] - file_content = file_info["file_content"] + metadata = file_info.get("metadata", {}) try: self.update_state(state="PROGRESS", meta={"current": 10}) - storage_type = getattr(settings, "STORAGE_TYPE", "local") - storage = StorageCreator.create_storage(storage_type) + storage = StorageCreator.get_storage() + self.update_state( state="PROGRESS", meta={"current": 30, "status": "Processing content"} ) - with tempfile.NamedTemporaryFile( - suffix=os.path.splitext(filename)[1] - ) as temp_file: - temp_file.write(file_content) - temp_file.flush() - reader = SimpleDirectoryReader( - input_files=[temp_file.name], exclude_hidden=True, errors="ignore" - ) - documents = reader.load_data() + content = storage.process_file( + relative_path, + lambda local_path, **kwargs: SimpleDirectoryReader( + input_files=[local_path], exclude_hidden=True, errors="ignore" + ).load_data()[0].text + ) + + token_count = num_tokens_from_string(content) - if not documents: - logging.warning(f"No content extracted from file: {filename}") - raise ValueError(f"Failed to extract content from file: {filename}") + self.update_state( + state="PROGRESS", meta={"current": 80, "status": "Storing in database"} + ) - content = documents[0].text - token_count = num_tokens_from_string(content) + mime_type = mimetypes.guess_type(filename)[0] or "application/octet-stream" - self.update_state( - state="PROGRESS", meta={"current": 60, "status": "Saving file"} - ) - file_obj = io.BytesIO(file_content) - - metadata = storage.save_file(file_obj, relative_path) - - mime_type = mimetypes.guess_type(filename)[0] or "application/octet-stream" - - self.update_state( - state="PROGRESS", meta={"current": 80, "status": "Storing in database"} - ) - - doc_id = ObjectId(attachment_id) - attachments_collection.insert_one( - { - "_id": doc_id, - "user": user, - "path": relative_path, - "content": content, - "token_count": token_count, - "mime_type": mime_type, - "date": datetime.datetime.now(), - "metadata": metadata, - } - ) - - logging.info( - f"Stored attachment with ID: {attachment_id}", extra={"user": user} - ) - - self.update_state( - state="PROGRESS", meta={"current": 100, "status": "Complete"} - ) - - return { - "filename": filename, + doc_id = ObjectId(attachment_id) + attachments_collection.insert_one( + { + "_id": doc_id, + "user": user, "path": relative_path, + "content": content, "token_count": token_count, - "attachment_id": attachment_id, "mime_type": mime_type, + "date": datetime.datetime.now(), "metadata": metadata, } + ) + + logging.info( + f"Stored attachment with ID: {attachment_id}", extra={"user": user} + ) + + self.update_state( + state="PROGRESS", meta={"current": 100, "status": "Complete"} + ) + + return { + "filename": filename, + "path": relative_path, + "token_count": token_count, + "attachment_id": attachment_id, + "mime_type": mime_type, + "metadata": metadata, + } except Exception as e: logging.error(