mirror of https://github.com/arc53/DocsGPT.git
(feat:attachments) store and ingest files shared
@@ -14,7 +14,7 @@ from werkzeug.utils import secure_filename

 from application.agents.tools.tool_manager import ToolManager

-from application.api.user.tasks import ingest, ingest_remote
+from application.api.user.tasks import ingest, ingest_remote, store_attachment
 from application.core.mongo_db import MongoDB
 from application.core.settings import settings
 from application.extensions import api
@@ -2476,3 +2476,74 @@ class UpdateChunk(Resource):
         except Exception as e:
             current_app.logger.error(f"Error updating chunk: {e}")
             return make_response(jsonify({"success": False}), 500)
+
+
+@user_ns.route("/api/store_attachment")
+class StoreAttachment(Resource):
+    @api.expect(
+        api.model(
+            "AttachmentModel",
+            {
+                "file": fields.Raw(required=True, description="File to upload"),
+            },
+        )
+    )
+    @api.doc(description="Stores an attachment without vectorization or training")
+    def post(self):
+        decoded_token = request.decoded_token
+        if not decoded_token:
+            return make_response(jsonify({"success": False}), 401)
+
+        files = request.files.getlist("file")
+
+        if not files or all(file.filename == "" for file in files):
+            return make_response(
+                jsonify({"status": "error", "message": "Missing files"}),
+                400,
+            )
+
+        user = secure_filename(decoded_token.get("sub"))
+        saved_files = []
+
+        try:
+            for file in files:
+                original_filename = secure_filename(file.filename)
+                folder_name = original_filename
+
+                # Create directory structure: user/attachments/filename/
+                base_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, "attachments", folder_name)
+                os.makedirs(base_dir, exist_ok=True)
+
+                file_path = os.path.join(base_dir, original_filename)
+
+                # Handle filename conflicts
+                if os.path.exists(file_path):
+                    name_parts = os.path.splitext(original_filename)
+                    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+                    new_filename = f"{name_parts[0]}_{timestamp}{name_parts[1]}"
+                    file_path = os.path.join(base_dir, new_filename)
+                    original_filename = new_filename
+
+                file.save(file_path)
+                saved_files.append({"folder": folder_name, "filename": original_filename})
+                current_app.logger.info(f"Saved file: {file_path}")
+
+            # Start async task to process files
+            task = store_attachment.delay(
+                os.path.abspath(os.path.join(current_dir, settings.UPLOAD_FOLDER)),
+                saved_files,
+                user
+            )
+
+            return make_response(
+                jsonify({
+                    "success": True,
+                    "task_id": task.id,
+                    "message": "Files uploaded successfully. Processing started."
+                }),
+                200
+            )
+
+        except Exception as err:
+            current_app.logger.error(f"Error storing attachment: {err}")
+            return make_response(jsonify({"success": False, "error": str(err)}), 400)
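For reference, a minimal client sketch for exercising the new route. Only the /api/store_attachment path, the "file" multipart field, and the JSON response keys come from the diff above; the base URL, port, and bearer-token header format are illustrative assumptions.

    import requests

    BASE_URL = "http://localhost:7091"  # assumption: local DocsGPT API
    TOKEN = "<jwt>"  # assumption: whatever token populates request.decoded_token

    with open("report.pdf", "rb") as f:
        resp = requests.post(
            f"{BASE_URL}/api/store_attachment",
            headers={"Authorization": f"Bearer {TOKEN}"},
            # The route reads request.files.getlist("file"), so several files
            # may be sent under the same "file" field name.
            files=[("file", ("report.pdf", f, "application/pdf"))],
        )

    print(resp.json())  # e.g. {"success": true, "task_id": "...", "message": "..."}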
@@ -1,7 +1,7 @@
 from datetime import timedelta

 from application.celery_init import celery
-from application.worker import ingest_worker, remote_worker, sync_worker
+from application.worker import ingest_worker, remote_worker, sync_worker, attachment_worker


 @celery.task(bind=True)
@@ -22,6 +22,12 @@ def schedule_syncs(self, frequency):
     return resp


+@celery.task(bind=True)
+def store_attachment(self, directory, saved_files, user):
+    resp = attachment_worker(self, directory, saved_files, user)
+    return resp
+
+
 @celery.on_after_configure.connect
 def setup_periodic_tasks(sender, **kwargs):
     sender.add_periodic_task(
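Since the route hands back task.id, a caller can watch the PROGRESS metadata that attachment_worker publishes via update_state. A sketch using Celery's stock AsyncResult API, assuming a result backend is configured; the wait_for_attachment helper is hypothetical, not part of the codebase:

    import time

    from celery.result import AsyncResult

    from application.celery_init import celery

    def wait_for_attachment(task_id, interval=1.0):
        """Poll a store_attachment task; PROGRESS meta carries {"current": <percent>}."""
        result = AsyncResult(task_id, app=celery)
        while result.state in ("PENDING", "PROGRESS"):
            if result.state == "PROGRESS":
                print(f"progress: {result.info.get('current')}%")
            time.sleep(interval)
        # On SUCCESS this is the dict returned by attachment_worker;
        # on FAILURE it is the raised exception.
        return result.result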
@@ -312,3 +312,93 @@ def sync_worker(self, frequency):
         key: sync_counts[key]
         for key in ["total_sync_count", "sync_success", "sync_failure"]
     }
+
+def attachment_worker(self, directory, saved_files, user):
+    """
+    Process and store attachments without vectorization.
+
+    Args:
+        self: Reference to the instance of the task.
+        directory (str): Base directory for storing files.
+        saved_files (list): List of dictionaries with folder and filename info.
+        user (str): User identifier.
+
+    Returns:
+        dict: Information about processed attachments.
+    """
+    import datetime
+    import os
+    from application.utils import num_tokens_from_string
+
+    mongo = MongoDB.get_client()
+    db = mongo["docsgpt"]
+    attachments_collection = db["attachments"]
+
+    file_entries = []
+    total_tokens = 0
+    total_files = len(saved_files)
+
+    job_name = saved_files[0]["folder"] if saved_files else "attachment_job"
+    logging.info(f"Processing attachments: {job_name}", extra={"user": user, "job": job_name})
+
+    for idx, file_info in enumerate(saved_files):
+        progress = int(((idx + 1) / total_files) * 100)
+        self.update_state(state="PROGRESS", meta={"current": progress})
+
+        folder_name = file_info["folder"]
+        filename = file_info["filename"]
+
+        base_dir = os.path.join(directory, user, "attachments", folder_name)
+        file_path = os.path.join(base_dir, filename)
+
+        logging.info(f"Processing file: {file_path}", extra={"user": user, "job": job_name})
+
+        if not os.path.exists(file_path):
+            logging.warning(f"File not found: {file_path}", extra={"user": user, "job": job_name})
+            continue
+
+        try:
+            reader = SimpleDirectoryReader(input_files=[file_path])
+            documents = reader.load_data()
+
+            if documents:
+                content = documents[0].text
+                token_count = num_tokens_from_string(content)
+                total_tokens += token_count
+
+                # Keep folder and filename in each entry; the summary built in
+                # the return value below reads them back from here.
+                file_entries.append({
+                    "folder": folder_name,
+                    "filename": filename,
+                    "path": f"{user}/attachments/{folder_name}/{filename}",
+                    "content": content,
+                    "token_count": token_count
+                })
+                logging.info(f"Successfully processed {filename} with {token_count} tokens",
+                             extra={"user": user, "job": job_name})
+        except Exception as e:
+            logging.error(f"Error processing file {filename}: {e}",
+                          extra={"user": user, "job": job_name}, exc_info=True)
+
+    if file_entries:
+        attachment_id = attachments_collection.insert_one({
+            "user": user,
+            "files": file_entries,
+            "total_tokens": total_tokens,
+            "date": datetime.datetime.now(),
+        }).inserted_id
+
+        logging.info(f"Stored attachment with ID: {attachment_id}",
+                     extra={"user": user, "job": job_name})
+
+        return {
+            "attachment_id": str(attachment_id),
+            "files": [{"filename": fe["filename"], "folder": fe["folder"], "path": fe["path"]} for fe in file_entries],
+            "total_tokens": total_tokens,
+            "file_contents": [{"filename": fe["filename"], "token_count": fe["token_count"]} for fe in file_entries]
+        }
+    else:
+        logging.warning("No files were successfully processed",
+                        extra={"user": user, "job": job_name})
+        return {"error": "No files were successfully processed"}
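Each successful run lands as one document in the docsgpt.attachments collection, with the shape written above (user, files, total_tokens, date). A sketch of reading one back, assuming direct pymongo access to a local MongoDB for illustration rather than the app's MongoDB.get_client() wrapper:

    from bson import ObjectId
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # assumption: local Mongo
    attachments = client["docsgpt"]["attachments"]

    attachment_id = "..."  # the attachment_id string from the task result
    doc = attachments.find_one({"_id": ObjectId(attachment_id)})
    if doc:
        print(doc["user"], doc["total_tokens"], doc["date"])
        for fe in doc["files"]:
            # Each entry stores the extracted text plus bookkeeping fields.
            print(fe["path"], fe["token_count"])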