diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py
index e95b6327..c8e32d11 100755
--- a/application/api/internal/routes.py
+++ b/application/api/internal/routes.py
@@ -6,7 +6,7 @@
 from bson.objectid import ObjectId
 from application.core.mongo_db import MongoDB
 from application.core.settings import settings
-from application.storage.storage_creator import StorageCreator
+
 mongo = MongoDB.get_client()
 db = mongo["docsgpt"]
 conversations_collection = db["conversations"]
@@ -45,8 +45,7 @@ def upload_index_files():
     remote_data = request.form["remote_data"] if "remote_data" in request.form else None
     sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None
-    storage = StorageCreator.create_storage(settings.STORAGE_TYPE)
-
+    save_dir = os.path.join(current_dir, "indexes", str(id))
 
     if settings.VECTOR_STORE == "faiss":
         if "file_faiss" not in request.files:
             print("No file part")
@@ -60,13 +59,12 @@ def upload_index_files():
         file_pkl = request.files["file_pkl"]
         if file_pkl.filename == "":
             return {"status": "no file name"}
-
-        # Save index files
-        storage_path_faiss = f"indexes/{str(id)}/index.faiss"
-        storage_path_pkl = f"indexes/{str(id)}/index.pkl"
-
-        storage.save_file(file_faiss, storage_path_faiss)
-        storage.save_file(file_pkl, storage_path_pkl)
+        # saves index files
+
+        if not os.path.exists(save_dir):
+            os.makedirs(save_dir)
+        file_faiss.save(os.path.join(save_dir, "index.faiss"))
+        file_pkl.save(os.path.join(save_dir, "index.pkl"))
 
     existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
     if existing_entry:
@@ -84,7 +82,6 @@ def upload_index_files():
                     "retriever": retriever,
                     "remote_data": remote_data,
                     "sync_frequency": sync_frequency,
-                    "storage_type": settings.STORAGE_TYPE,
                 }
             },
         )
@@ -102,7 +99,6 @@ def upload_index_files():
                 "retriever": retriever,
                 "remote_data": remote_data,
                 "sync_frequency": sync_frequency,
-                "storage_type": settings.STORAGE_TYPE,
             }
         )
     return {"status": "ok"}
diff --git a/application/api/user/routes.py b/application/api/user/routes.py
index b7d79128..9e97e2ab 100644
--- a/application/api/user/routes.py
+++ b/application/api/user/routes.py
@@ -4,7 +4,6 @@ import math
 import os
 import shutil
 import uuid
-import tempfile
 
 from bson.binary import Binary, UuidRepresentation
 from bson.dbref import DBRef
@@ -22,7 +21,6 @@ from application.extensions import api
 from application.tts.google_tts import GoogleTTS
 from application.utils import check_required_fields, validate_function_name
 from application.vectorstore.vector_creator import VectorCreator
-from application.storage.storage_creator import StorageCreator
 
 mongo = MongoDB.get_client()
 db = mongo["docsgpt"]
@@ -415,50 +413,54 @@ class UploadFile(Resource):
         user = secure_filename(decoded_token.get("sub"))
         job_name = secure_filename(request.form["name"])
 
-        storage = StorageCreator.get_storage()
-
         try:
+            save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name)
+            os.makedirs(save_dir, exist_ok=True)
+
             if len(files) > 1:
-                temp_dir = tempfile.mkdtemp()
-                try:
-                    for file in files:
-                        filename = secure_filename(file.filename)
-                        file.save(os.path.join(temp_dir, filename))
-
-                    zip_path = os.path.join(temp_dir, f"{job_name}.zip")
-                    shutil.make_archive(
-                        base_name=os.path.join(temp_dir, job_name),
-                        format="zip",
-                        root_dir=temp_dir,
-                        base_dir="."
-                    )
-
-                    final_filename = f"{job_name}.zip"
-                    relative_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}/{final_filename}"
-
-                    with open(zip_path, 'rb') as zip_file:
-                        storage.save_file(zip_file, relative_path)
-
-                    task = ingest.delay(
-                        relative_path,
-                        [
-                            ".rst", ".md", ".pdf", ".txt", ".docx", ".csv",
-                            ".epub", ".html", ".mdx", ".json", ".xlsx",
-                            ".pptx", ".png", ".jpg", ".jpeg",
-                        ],
-                        job_name,
-                        final_filename,
-                        user,
-                    )
-                finally:
-                    shutil.rmtree(temp_dir)
+                temp_dir = os.path.join(save_dir, "temp")
+                os.makedirs(temp_dir, exist_ok=True)
+
+                for file in files:
+                    filename = secure_filename(file.filename)
+                    file.save(os.path.join(temp_dir, filename))
+                    print(f"Saved file: {filename}")
+                zip_path = shutil.make_archive(
+                    base_name=os.path.join(save_dir, job_name),
+                    format="zip",
+                    root_dir=temp_dir,
+                )
+                final_filename = os.path.basename(zip_path)
+                shutil.rmtree(temp_dir)
+                task = ingest.delay(
+                    settings.UPLOAD_FOLDER,
+                    [
+                        ".rst",
+                        ".md",
+                        ".pdf",
+                        ".txt",
+                        ".docx",
+                        ".csv",
+                        ".epub",
+                        ".html",
+                        ".mdx",
+                        ".json",
+                        ".xlsx",
+                        ".pptx",
+                        ".png",
+                        ".jpg",
+                        ".jpeg",
+                    ],
+                    job_name,
+                    final_filename,
+                    user,
+                )
             else:
                 file = files[0]
                 final_filename = secure_filename(file.filename)
-                relative_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}/{final_filename}"
-
-                storage.save_file(file, relative_path)
-
+                file_path = os.path.join(save_dir, final_filename)
+                file.save(file_path)
+
                 task = ingest.delay(
                     settings.UPLOAD_FOLDER,
                     [
diff --git a/application/worker.py b/application/worker.py
index f8076260..b5caa23e 100755
--- a/application/worker.py
+++ b/application/worker.py
@@ -133,105 +133,71 @@ def ingest_worker(
     limit = None
     exclude = True
     sample = False
+    full_path = os.path.join(directory, user, name_job)
+
+    logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
+    file_data = {"name": name_job, "file": filename, "user": user}
 
-    storage = StorageCreator.create_storage(settings.STORAGE_TYPE)
-    temp_dir = tempfile.mkdtemp()
-    full_path = os.path.join(temp_dir, name_job)
-
     if not os.path.exists(full_path):
         os.makedirs(full_path)
+    download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
 
-    logging.info(f"Ingest file: {directory}/{user}/{name_job}/{filename}", extra={"user": user, "job": name_job})
-    file_data = {"name": name_job, "file": filename, "user": user}
-
-    try:
-        file_path = f"{directory}/{user}/{name_job}/{filename}"
-
-        try:
-            file_obj = storage.get_file(file_path)
-
-            local_file_path = os.path.join(full_path, filename)
-            with open(local_file_path, 'wb') as f:
-                shutil.copyfileobj(file_obj, f)
-
-            # check if file is .zip and extract it
-            if filename.endswith(".zip"):
-                extract_zip_recursive(
-                    os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
-                )
-        except FileNotFoundError as e:
-            logging.error(f"File not found in storage: {file_path}")
-            raise FileNotFoundError(f"File not found: {file_path}") from e
-
-        self.update_state(state="PROGRESS", meta={"current": 1})
-
-        raw_docs = SimpleDirectoryReader(
-            input_dir=full_path,
-            input_files=input_files,
-            recursive=recursive,
-            required_exts=formats,
-            num_files_limit=limit,
-            exclude_hidden=exclude,
-            file_metadata=metadata_from_filename,
-        ).load_data()
-
-        chunker = Chunker(
-            chunking_strategy="classic_chunk",
-            max_tokens=MAX_TOKENS,
-            min_tokens=MIN_TOKENS,
-            duplicate_headers=False
+    # check if file is .zip and extract it
+    if filename.endswith(".zip"):
+        extract_zip_recursive(
+            os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
         )
-        raw_docs = chunker.chunk(documents=raw_docs)
-        docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
-        id = ObjectId()
+    self.update_state(state="PROGRESS", meta={"current": 1})
 
-        vector_dir = os.path.join(temp_dir, "vector_store")
-        os.makedirs(vector_dir, exist_ok=True)
-
-        embed_and_store_documents(docs, vector_dir, str(id), self)
-        tokens = count_tokens_docs(docs)
-        self.update_state(state="PROGRESS", meta={"current": 100})
+    raw_docs = SimpleDirectoryReader(
+        input_dir=full_path,
+        input_files=input_files,
+        recursive=recursive,
+        required_exts=formats,
+        num_files_limit=limit,
+        exclude_hidden=exclude,
+        file_metadata=metadata_from_filename,
+    ).load_data()
 
-        if sample:
-            for i in range(min(5, len(raw_docs))):
-                logging.info(f"Sample document {i}: {raw_docs[i]}")
+    chunker = Chunker(
+        chunking_strategy="classic_chunk",
+        max_tokens=MAX_TOKENS,
+        min_tokens=MIN_TOKENS,
+        duplicate_headers=False
+    )
+    raw_docs = chunker.chunk(documents=raw_docs)
 
-        file_data.update({
-            "tokens": tokens,
-            "retriever": retriever,
-            "id": str(id),
-            "type": "local",
-        })
-
-        mongo = MongoDB.get_client()
-        db = mongo["docsgpt"]
-        sources_collection = db["sources"]
-
-        sources_collection.insert_one({
-            "_id": id,
-            "name": name_job,
-            "user": user,
-            "date": datetime.datetime.now(),
-            "tokens": tokens,
-            "retriever": retriever,
-            "type": "local",
-            "storage_type": settings.STORAGE_TYPE,
-            "original_file_path": file_path
-        })
+    docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
+    id = ObjectId()
 
-        return {
-            "directory": directory,
-            "formats": formats,
-            "name_job": name_job,
-            "filename": filename,
-            "user": user,
-            "limited": False,
-        }
-
-    finally:
-        if os.path.exists(temp_dir):
-            shutil.rmtree(temp_dir)
+    embed_and_store_documents(docs, full_path, id, self)
+    tokens = count_tokens_docs(docs)
+    self.update_state(state="PROGRESS", meta={"current": 100})
+
+    if sample:
+        for i in range(min(5, len(raw_docs))):
+            logging.info(f"Sample document {i}: {raw_docs[i]}")
+
+    file_data.update({
+        "tokens": tokens,
+        "retriever": retriever,
+        "id": str(id),
+        "type": "local",
+    })
+    upload_index(full_path, file_data)
+
+    # delete local
+    shutil.rmtree(full_path)
+
+    return {
+        "directory": directory,
+        "formats": formats,
+        "name_job": name_job,
+        "filename": filename,
+        "user": user,
+        "limited": False,
+    }
 
 
 def remote_worker(
     self,