From 64c42f0ddf75c0e0b9cca77050278b02672927c5 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Tue, 22 Apr 2025 05:18:07 +0530 Subject: [PATCH] (feat:storage) file, indexes uploads --- application/api/internal/routes.py | 20 ++-- application/api/user/routes.py | 84 ++++++++--------- application/worker.py | 144 ++++++++++++++++++----------- 3 files changed, 142 insertions(+), 106 deletions(-) diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index c8e32d11..e95b6327 100755 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -6,7 +6,7 @@ from bson.objectid import ObjectId from application.core.mongo_db import MongoDB from application.core.settings import settings - +from application.storage.storage_creator import StorageCreator mongo = MongoDB.get_client() db = mongo["docsgpt"] conversations_collection = db["conversations"] @@ -45,7 +45,8 @@ def upload_index_files(): remote_data = request.form["remote_data"] if "remote_data" in request.form else None sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None - save_dir = os.path.join(current_dir, "indexes", str(id)) + storage = StorageCreator.create_storage(settings.STORAGE_TYPE) + if settings.VECTOR_STORE == "faiss": if "file_faiss" not in request.files: print("No file part") @@ -59,12 +60,13 @@ def upload_index_files(): file_pkl = request.files["file_pkl"] if file_pkl.filename == "": return {"status": "no file name"} - # saves index files - - if not os.path.exists(save_dir): - os.makedirs(save_dir) - file_faiss.save(os.path.join(save_dir, "index.faiss")) - file_pkl.save(os.path.join(save_dir, "index.pkl")) + + # Save index files + storage_path_faiss = f"indexes/{str(id)}/index.faiss" + storage_path_pkl = f"indexes/{str(id)}/index.pkl" + + storage.save_file(file_faiss, storage_path_faiss) + storage.save_file(file_pkl, storage_path_pkl) existing_entry = sources_collection.find_one({"_id": 
ObjectId(id)}) if existing_entry: @@ -82,6 +84,7 @@ def upload_index_files(): "retriever": retriever, "remote_data": remote_data, "sync_frequency": sync_frequency, + "storage_type": settings.STORAGE_TYPE, } }, ) @@ -99,6 +102,7 @@ def upload_index_files(): "retriever": retriever, "remote_data": remote_data, "sync_frequency": sync_frequency, + "storage_type": settings.STORAGE_TYPE, } ) return {"status": "ok"} diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 9e97e2ab..b7d79128 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -4,6 +4,7 @@ import math import os import shutil import uuid +import tempfile from bson.binary import Binary, UuidRepresentation from bson.dbref import DBRef @@ -21,6 +22,7 @@ from application.extensions import api from application.tts.google_tts import GoogleTTS from application.utils import check_required_fields, validate_function_name from application.vectorstore.vector_creator import VectorCreator +from application.storage.storage_creator import StorageCreator mongo = MongoDB.get_client() db = mongo["docsgpt"] @@ -413,54 +415,50 @@ class UploadFile(Resource): user = secure_filename(decoded_token.get("sub")) job_name = secure_filename(request.form["name"]) + storage = StorageCreator.get_storage() + try: - save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name) - os.makedirs(save_dir, exist_ok=True) - if len(files) > 1: - temp_dir = os.path.join(save_dir, "temp") - os.makedirs(temp_dir, exist_ok=True) - - for file in files: - filename = secure_filename(file.filename) - file.save(os.path.join(temp_dir, filename)) - print(f"Saved file: {filename}") - zip_path = shutil.make_archive( - base_name=os.path.join(save_dir, job_name), - format="zip", - root_dir=temp_dir, - ) - final_filename = os.path.basename(zip_path) - shutil.rmtree(temp_dir) - task = ingest.delay( - settings.UPLOAD_FOLDER, - [ - ".rst", - ".md", - ".pdf", - ".txt", - ".docx", - ".csv", - 
".epub", - ".html", - ".mdx", - ".json", - ".xlsx", - ".pptx", - ".png", - ".jpg", - ".jpeg", - ], - job_name, - final_filename, - user, - ) + temp_dir = tempfile.mkdtemp() + try: + for file in files: + filename = secure_filename(file.filename) + file.save(os.path.join(temp_dir, filename)) + + zip_path = os.path.join(temp_dir, f"{job_name}.zip") + shutil.make_archive( + base_name=os.path.join(temp_dir, job_name), + format="zip", + root_dir=temp_dir, + base_dir="." + ) + + final_filename = f"{job_name}.zip" + relative_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}/{final_filename}" + + with open(zip_path, 'rb') as zip_file: + storage.save_file(zip_file, relative_path) + + task = ingest.delay( + relative_path, + [ + ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", + ".epub", ".html", ".mdx", ".json", ".xlsx", + ".pptx", ".png", ".jpg", ".jpeg", + ], + job_name, + final_filename, + user, + ) + finally: + shutil.rmtree(temp_dir) else: file = files[0] final_filename = secure_filename(file.filename) - file_path = os.path.join(save_dir, final_filename) - file.save(file_path) - + relative_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}/{final_filename}" + + storage.save_file(file, relative_path) + task = ingest.delay( settings.UPLOAD_FOLDER, [ diff --git a/application/worker.py b/application/worker.py index b5caa23e..f8076260 100755 --- a/application/worker.py +++ b/application/worker.py @@ -133,71 +133,105 @@ def ingest_worker( limit = None exclude = True sample = False - full_path = os.path.join(directory, user, name_job) - - logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job}) - file_data = {"name": name_job, "file": filename, "user": user} + storage = StorageCreator.create_storage(settings.STORAGE_TYPE) + temp_dir = tempfile.mkdtemp() + full_path = os.path.join(temp_dir, name_job) + if not os.path.exists(full_path): os.makedirs(full_path) - download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, 
filename)) - # check if file is .zip and extract it - if filename.endswith(".zip"): - extract_zip_recursive( - os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH + logging.info(f"Ingest file: {directory}/{user}/{name_job}/{filename}", extra={"user": user, "job": name_job}) + file_data = {"name": name_job, "file": filename, "user": user} + + try: + file_path = f"{directory}/{user}/{name_job}/{filename}" + + try: + file_obj = storage.get_file(file_path) + + local_file_path = os.path.join(full_path, filename) + with open(local_file_path, 'wb') as f: + shutil.copyfileobj(file_obj, f) + + # check if file is .zip and extract it + if filename.endswith(".zip"): + extract_zip_recursive( + os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH + ) + except FileNotFoundError as e: + logging.error(f"File not found in storage: {file_path}") + raise FileNotFoundError(f"File not found: {file_path}") from e + + self.update_state(state="PROGRESS", meta={"current": 1}) + + raw_docs = SimpleDirectoryReader( + input_dir=full_path, + input_files=input_files, + recursive=recursive, + required_exts=formats, + num_files_limit=limit, + exclude_hidden=exclude, + file_metadata=metadata_from_filename, + ).load_data() + + chunker = Chunker( + chunking_strategy="classic_chunk", + max_tokens=MAX_TOKENS, + min_tokens=MIN_TOKENS, + duplicate_headers=False ) + raw_docs = chunker.chunk(documents=raw_docs) - self.update_state(state="PROGRESS", meta={"current": 1}) + docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] + id = ObjectId() - raw_docs = SimpleDirectoryReader( - input_dir=full_path, - input_files=input_files, - recursive=recursive, - required_exts=formats, - num_files_limit=limit, - exclude_hidden=exclude, - file_metadata=metadata_from_filename, - ).load_data() + vector_dir = os.path.join(temp_dir, "vector_store") + os.makedirs(vector_dir, exist_ok=True) + + embed_and_store_documents(docs, vector_dir, str(id), self) + tokens = 
count_tokens_docs(docs) + self.update_state(state="PROGRESS", meta={"current": 100}) - chunker = Chunker( - chunking_strategy="classic_chunk", - max_tokens=MAX_TOKENS, - min_tokens=MIN_TOKENS, - duplicate_headers=False - ) - raw_docs = chunker.chunk(documents=raw_docs) + if sample: + for i in range(min(5, len(raw_docs))): + logging.info(f"Sample document {i}: {raw_docs[i]}") - docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] - id = ObjectId() + file_data.update({ + "tokens": tokens, + "retriever": retriever, + "id": str(id), + "type": "local", + }) + + mongo = MongoDB.get_client() + db = mongo["docsgpt"] + sources_collection = db["sources"] + + sources_collection.insert_one({ + "_id": id, + "name": name_job, + "user": user, + "date": datetime.datetime.now(), + "tokens": tokens, + "retriever": retriever, + "type": "local", + "storage_type": settings.STORAGE_TYPE, + "original_file_path": file_path + }) - embed_and_store_documents(docs, full_path, id, self) - tokens = count_tokens_docs(docs) - self.update_state(state="PROGRESS", meta={"current": 100}) - - if sample: - for i in range(min(5, len(raw_docs))): - logging.info(f"Sample document {i}: {raw_docs[i]}") - - file_data.update({ - "tokens": tokens, - "retriever": retriever, - "id": str(id), - "type": "local", - }) - upload_index(full_path, file_data) - - # delete local - shutil.rmtree(full_path) - - return { - "directory": directory, - "formats": formats, - "name_job": name_job, - "filename": filename, - "user": user, - "limited": False, - } + return { + "directory": directory, + "formats": formats, + "name_job": name_job, + "filename": filename, + "user": user, + "limited": False, + } + + finally: + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) def remote_worker( self,