Revert "(feat:storage) file, indexes uploads"

This reverts commit 64c42f0ddf.
This commit is contained in:
ManishMadan2882
2025-04-23 00:52:55 +05:30
parent 24c8b24b1f
commit 637d3a24a1
3 changed files with 106 additions and 142 deletions

View File

@@ -6,7 +6,7 @@ from bson.objectid import ObjectId
from application.core.mongo_db import MongoDB from application.core.mongo_db import MongoDB
from application.core.settings import settings from application.core.settings import settings
from application.storage.storage_creator import StorageCreator
mongo = MongoDB.get_client() mongo = MongoDB.get_client()
db = mongo["docsgpt"] db = mongo["docsgpt"]
conversations_collection = db["conversations"] conversations_collection = db["conversations"]
@@ -45,8 +45,7 @@ def upload_index_files():
remote_data = request.form["remote_data"] if "remote_data" in request.form else None remote_data = request.form["remote_data"] if "remote_data" in request.form else None
sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None
storage = StorageCreator.create_storage(settings.STORAGE_TYPE) save_dir = os.path.join(current_dir, "indexes", str(id))
if settings.VECTOR_STORE == "faiss": if settings.VECTOR_STORE == "faiss":
if "file_faiss" not in request.files: if "file_faiss" not in request.files:
print("No file part") print("No file part")
@@ -60,13 +59,12 @@ def upload_index_files():
file_pkl = request.files["file_pkl"] file_pkl = request.files["file_pkl"]
if file_pkl.filename == "": if file_pkl.filename == "":
return {"status": "no file name"} return {"status": "no file name"}
# saves index files
# Save index files if not os.path.exists(save_dir):
storage_path_faiss = f"indexes/{str(id)}/index.faiss" os.makedirs(save_dir)
storage_path_pkl = f"indexes/{str(id)}/index.pkl" file_faiss.save(os.path.join(save_dir, "index.faiss"))
file_pkl.save(os.path.join(save_dir, "index.pkl"))
storage.save_file(file_faiss, storage_path_faiss)
storage.save_file(file_pkl, storage_path_pkl)
existing_entry = sources_collection.find_one({"_id": ObjectId(id)}) existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
if existing_entry: if existing_entry:
@@ -84,7 +82,6 @@ def upload_index_files():
"retriever": retriever, "retriever": retriever,
"remote_data": remote_data, "remote_data": remote_data,
"sync_frequency": sync_frequency, "sync_frequency": sync_frequency,
"storage_type": settings.STORAGE_TYPE,
} }
}, },
) )
@@ -102,7 +99,6 @@ def upload_index_files():
"retriever": retriever, "retriever": retriever,
"remote_data": remote_data, "remote_data": remote_data,
"sync_frequency": sync_frequency, "sync_frequency": sync_frequency,
"storage_type": settings.STORAGE_TYPE,
} }
) )
return {"status": "ok"} return {"status": "ok"}

View File

@@ -4,7 +4,6 @@ import math
import os import os
import shutil import shutil
import uuid import uuid
import tempfile
from bson.binary import Binary, UuidRepresentation from bson.binary import Binary, UuidRepresentation
from bson.dbref import DBRef from bson.dbref import DBRef
@@ -22,7 +21,6 @@ from application.extensions import api
from application.tts.google_tts import GoogleTTS from application.tts.google_tts import GoogleTTS
from application.utils import check_required_fields, validate_function_name from application.utils import check_required_fields, validate_function_name
from application.vectorstore.vector_creator import VectorCreator from application.vectorstore.vector_creator import VectorCreator
from application.storage.storage_creator import StorageCreator
mongo = MongoDB.get_client() mongo = MongoDB.get_client()
db = mongo["docsgpt"] db = mongo["docsgpt"]
@@ -415,49 +413,53 @@ class UploadFile(Resource):
user = secure_filename(decoded_token.get("sub")) user = secure_filename(decoded_token.get("sub"))
job_name = secure_filename(request.form["name"]) job_name = secure_filename(request.form["name"])
storage = StorageCreator.get_storage()
try: try:
save_dir = os.path.join(current_dir, settings.UPLOAD_FOLDER, user, job_name)
os.makedirs(save_dir, exist_ok=True)
if len(files) > 1: if len(files) > 1:
temp_dir = tempfile.mkdtemp() temp_dir = os.path.join(save_dir, "temp")
try: os.makedirs(temp_dir, exist_ok=True)
for file in files:
filename = secure_filename(file.filename)
file.save(os.path.join(temp_dir, filename))
zip_path = os.path.join(temp_dir, f"{job_name}.zip") for file in files:
shutil.make_archive( filename = secure_filename(file.filename)
base_name=os.path.join(temp_dir, job_name), file.save(os.path.join(temp_dir, filename))
format="zip", print(f"Saved file: {filename}")
root_dir=temp_dir, zip_path = shutil.make_archive(
base_dir="." base_name=os.path.join(save_dir, job_name),
) format="zip",
root_dir=temp_dir,
final_filename = f"{job_name}.zip" )
relative_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}/{final_filename}" final_filename = os.path.basename(zip_path)
shutil.rmtree(temp_dir)
with open(zip_path, 'rb') as zip_file: task = ingest.delay(
storage.save_file(zip_file, relative_path) settings.UPLOAD_FOLDER,
[
task = ingest.delay( ".rst",
relative_path, ".md",
[ ".pdf",
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".txt",
".epub", ".html", ".mdx", ".json", ".xlsx", ".docx",
".pptx", ".png", ".jpg", ".jpeg", ".csv",
], ".epub",
job_name, ".html",
final_filename, ".mdx",
user, ".json",
) ".xlsx",
finally: ".pptx",
shutil.rmtree(temp_dir) ".png",
".jpg",
".jpeg",
],
job_name,
final_filename,
user,
)
else: else:
file = files[0] file = files[0]
final_filename = secure_filename(file.filename) final_filename = secure_filename(file.filename)
relative_path = f"{settings.UPLOAD_FOLDER}/{user}/{job_name}/{final_filename}" file_path = os.path.join(save_dir, final_filename)
file.save(file_path)
storage.save_file(file, relative_path)
task = ingest.delay( task = ingest.delay(
settings.UPLOAD_FOLDER, settings.UPLOAD_FOLDER,

View File

@@ -133,105 +133,71 @@ def ingest_worker(
limit = None limit = None
exclude = True exclude = True
sample = False sample = False
full_path = os.path.join(directory, user, name_job)
storage = StorageCreator.create_storage(settings.STORAGE_TYPE) logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
temp_dir = tempfile.mkdtemp() file_data = {"name": name_job, "file": filename, "user": user}
full_path = os.path.join(temp_dir, name_job)
if not os.path.exists(full_path): if not os.path.exists(full_path):
os.makedirs(full_path) os.makedirs(full_path)
download_file(urljoin(settings.API_URL, "/api/download"), file_data, os.path.join(full_path, filename))
logging.info(f"Ingest file: {directory}/{user}/{name_job}/{filename}", extra={"user": user, "job": name_job}) # check if file is .zip and extract it
file_data = {"name": name_job, "file": filename, "user": user} if filename.endswith(".zip"):
extract_zip_recursive(
try: os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
file_path = f"{directory}/{user}/{name_job}/{filename}"
try:
file_obj = storage.get_file(file_path)
local_file_path = os.path.join(full_path, filename)
with open(local_file_path, 'wb') as f:
shutil.copyfileobj(file_obj, f)
# check if file is .zip and extract it
if filename.endswith(".zip"):
extract_zip_recursive(
os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
)
except FileNotFoundError as e:
logging.error(f"File not found in storage: {file_path}")
raise FileNotFoundError(f"File not found: {file_path}") from e
self.update_state(state="PROGRESS", meta={"current": 1})
raw_docs = SimpleDirectoryReader(
input_dir=full_path,
input_files=input_files,
recursive=recursive,
required_exts=formats,
num_files_limit=limit,
exclude_hidden=exclude,
file_metadata=metadata_from_filename,
).load_data()
chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False
) )
raw_docs = chunker.chunk(documents=raw_docs)
docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] self.update_state(state="PROGRESS", meta={"current": 1})
id = ObjectId()
vector_dir = os.path.join(temp_dir, "vector_store") raw_docs = SimpleDirectoryReader(
os.makedirs(vector_dir, exist_ok=True) input_dir=full_path,
input_files=input_files,
recursive=recursive,
required_exts=formats,
num_files_limit=limit,
exclude_hidden=exclude,
file_metadata=metadata_from_filename,
).load_data()
embed_and_store_documents(docs, vector_dir, str(id), self) chunker = Chunker(
tokens = count_tokens_docs(docs) chunking_strategy="classic_chunk",
self.update_state(state="PROGRESS", meta={"current": 100}) max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False
)
raw_docs = chunker.chunk(documents=raw_docs)
if sample: docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
for i in range(min(5, len(raw_docs))): id = ObjectId()
logging.info(f"Sample document {i}: {raw_docs[i]}")
file_data.update({ embed_and_store_documents(docs, full_path, id, self)
"tokens": tokens, tokens = count_tokens_docs(docs)
"retriever": retriever, self.update_state(state="PROGRESS", meta={"current": 100})
"id": str(id),
"type": "local",
})
mongo = MongoDB.get_client() if sample:
db = mongo["docsgpt"] for i in range(min(5, len(raw_docs))):
sources_collection = db["sources"] logging.info(f"Sample document {i}: {raw_docs[i]}")
sources_collection.insert_one({ file_data.update({
"_id": id, "tokens": tokens,
"name": name_job, "retriever": retriever,
"user": user, "id": str(id),
"date": datetime.datetime.now(), "type": "local",
"tokens": tokens, })
"retriever": retriever, upload_index(full_path, file_data)
"type": "local",
"storage_type": settings.STORAGE_TYPE,
"original_file_path": file_path
})
return { # delete local
"directory": directory, shutil.rmtree(full_path)
"formats": formats,
"name_job": name_job,
"filename": filename,
"user": user,
"limited": False,
}
finally: return {
if os.path.exists(temp_dir): "directory": directory,
shutil.rmtree(temp_dir) "formats": formats,
"name_job": name_job,
"filename": filename,
"user": user,
"limited": False,
}
def remote_worker( def remote_worker(
self, self,