# Reconstructed from a git patch whose newlines had been collapsed:
#   commit  502dc9ec52517908dd202fdddea01be86721e5a7
#   author  ManishMadan2882
#   date    Wed, 26 Mar 2025 18:01:31 +0530
#   subject (feat:attachments) store and ingest files shared
#
# The patch adds one definition to each of three modules; they appear below
# in patch order. Names that are not defined here (`user_ns`, `api`,
# `fields`, `Resource`, `request`, `current_app`, `current_dir`, `settings`,
# `celery`, `MongoDB`, `SimpleDirectoryReader`, `logging`, ...) are expected
# to be provided by the respective target module.


# --- application/api/user/routes.py ----------------------------------------
# The patch also extends this module's tasks import to:
#   from application.api.user.tasks import ingest, ingest_remote, store_attachment


@user_ns.route("/api/store_attachment")
class StoreAttachment(Resource):
    """Save uploaded files under the user's attachments folder and queue
    them for background processing."""

    @api.expect(
        api.model(
            "AttachmentModel",
            {
                "file": fields.Raw(required=True, description="File to upload"),
            },
        )
    )
    @api.doc(description="Stores an attachment without vectorization or training")
    def post(self):
        """Persist every uploaded ``file`` part and start ``store_attachment``.

        Returns:
            401 when the request has no decoded token; 400 when no usable
            file was uploaded or when saving/dispatching raises; otherwise
            200 with ``success``, the Celery ``task_id`` and a message.
        """
        decoded_token = request.decoded_token
        if not decoded_token:
            return make_response(jsonify({"success": False}), 401)

        files = request.files.getlist("file")

        if not files or all(file.filename == "" for file in files):
            return make_response(
                jsonify({"status": "error", "message": "Missing files"}),
                400,
            )

        user = secure_filename(decoded_token.get("sub"))
        upload_root = os.path.join(current_dir, settings.UPLOAD_FOLDER)
        saved_files = []

        try:
            for file in files:
                original_filename = secure_filename(file.filename)
                if not original_filename:
                    # An empty sanitized name would resolve to the
                    # attachments directory itself and end up stored as a
                    # stray "_<timestamp>" file; skip such parts.
                    continue
                folder_name = original_filename

                # Create directory structure: user/attachments/filename/
                base_dir = os.path.join(upload_root, user, "attachments", folder_name)
                os.makedirs(base_dir, exist_ok=True)

                file_path = os.path.join(base_dir, original_filename)

                # Handle filename conflicts by appending a timestamp
                if os.path.exists(file_path):
                    stem, extension = os.path.splitext(original_filename)
                    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
                    original_filename = f"{stem}_{timestamp}{extension}"
                    file_path = os.path.join(base_dir, original_filename)

                file.save(file_path)
                saved_files.append({"folder": folder_name, "filename": original_filename})
                current_app.logger.info(f"Saved file: {file_path}")

            if not saved_files:
                # Every part was unusable; do not queue an empty job.
                return make_response(
                    jsonify({"status": "error", "message": "Missing files"}),
                    400,
                )

            # Start async task to process files
            task = store_attachment.delay(
                os.path.abspath(upload_root),
                saved_files,
                user,
            )

            return make_response(
                jsonify({
                    "success": True,
                    "task_id": task.id,
                    "message": "Files uploaded successfully. Processing started.",
                }),
                200,
            )

        except Exception as err:
            current_app.logger.error(f"Error storing attachment: {err}", exc_info=True)
            return make_response(jsonify({"success": False, "error": str(err)}), 400)


# --- application/api/user/tasks.py -----------------------------------------
# The patch also extends this module's worker import to:
#   from application.worker import ingest_worker, remote_worker, sync_worker, attachment_worker


@celery.task(bind=True)
def store_attachment(self, directory, saved_files, user):
    """Celery task that delegates to ``attachment_worker`` and returns its result."""
    return attachment_worker(self, directory, saved_files, user)


# --- application/worker.py -------------------------------------------------


def attachment_worker(self, directory, saved_files, user):
    """
    Process and store attachments without vectorization.

    Each file is read with ``SimpleDirectoryReader``, its text and token
    count are collected, and all successfully read files are stored as a
    single document in the ``attachments`` collection.

    Args:
        self: Reference to the instance of the task.
        directory (str): Base directory for storing files.
        saved_files (list): List of dictionaries with folder and filename info.
        user (str): User identifier.

    Returns:
        dict: ``attachment_id``, ``files``, ``total_tokens`` and
        ``file_contents`` on success, or ``{"error": ...}`` when no file
        could be processed.
    """
    import datetime
    import os
    from application.utils import num_tokens_from_string

    mongo = MongoDB.get_client()
    db = mongo["docsgpt"]
    attachments_collection = db["attachments"]

    file_entries = []  # stored in MongoDB: path / content / token_count
    processed = []  # per-file metadata for the task result only
    total_tokens = 0
    total_files = len(saved_files)

    job_name = saved_files[0]["folder"] if saved_files else "attachment_job"
    log_extra = {"user": user, "job": job_name}
    logging.info(f"Processing attachments: {job_name}", extra=log_extra)

    for idx, file_info in enumerate(saved_files):
        progress = int(((idx + 1) / total_files) * 100)
        self.update_state(state="PROGRESS", meta={"current": progress})

        folder_name = file_info["folder"]
        filename = file_info["filename"]

        base_dir = os.path.join(directory, user, "attachments", folder_name)
        file_path = os.path.join(base_dir, filename)

        logging.info(f"Processing file: {file_path}", extra=log_extra)

        if not os.path.exists(file_path):
            logging.warning(f"File not found: {file_path}", extra=log_extra)
            continue

        try:
            reader = SimpleDirectoryReader(input_files=[file_path])
            documents = reader.load_data()

            if documents:
                # NOTE(review): only the first loaded document is kept; if the
                # reader splits a file into several documents the rest is
                # dropped - confirm this is intended.
                content = documents[0].text
                token_count = num_tokens_from_string(content)
                total_tokens += token_count

                relative_path = f"{user}/attachments/{folder_name}/{filename}"
                file_entries.append({
                    "path": relative_path,
                    "content": content,
                    "token_count": token_count,
                })
                # Kept separate from file_entries so the stored document
                # schema is unchanged while the result can still report
                # filename and folder.
                processed.append({
                    "filename": filename,
                    "folder": folder_name,
                    "path": relative_path,
                    "token_count": token_count,
                })
                logging.info(
                    f"Successfully processed {filename} with {token_count} tokens",
                    extra=log_extra,
                )
        except Exception as e:
            # Best effort: one unreadable file must not abort the others.
            logging.error(
                f"Error processing file {filename}: {e}",
                extra=log_extra,
                exc_info=True,
            )

    if not file_entries:
        logging.warning("No files were successfully processed", extra=log_extra)
        return {"error": "No files were successfully processed"}

    attachment_id = attachments_collection.insert_one({
        "user": user,
        "files": file_entries,
        "total_tokens": total_tokens,
        "date": datetime.datetime.now(),
    }).inserted_id

    logging.info(f"Stored attachment with ID: {attachment_id}", extra=log_extra)

    return {
        "attachment_id": str(attachment_id),
        "files": [
            {"filename": item["filename"], "folder": item["folder"], "path": item["path"]}
            for item in processed
        ],
        "total_tokens": total_tokens,
        "file_contents": [
            {"filename": item["filename"], "token_count": item["token_count"]}
            for item in processed
        ],
    }