(feat:ingestion) unzip, extract and store

This commit is contained in:
ManishMadan2882
2025-07-15 15:31:26 +05:30
parent a38d71bbfb
commit 5b07c5f2e8
2 changed files with 95 additions and 149 deletions

View File

@@ -548,138 +548,60 @@ class UploadFile(Resource):
# Create safe versions for filesystem operations
safe_user = safe_filename(user)
dir_name = safe_filename(job_name)
base_path = f"{settings.UPLOAD_FOLDER}/{safe_user}/{dir_name}"
try:
storage = StorageCreator.get_storage()
base_path = f"{settings.UPLOAD_FOLDER}/{safe_user}/{dir_name}"
if len(files) > 1:
temp_files = []
for file in files:
original_filename = file.filename
safe_file = safe_filename(original_filename)
temp_path = f"{base_path}/temp/{safe_file}"
storage.save_file(file, temp_path)
temp_files.append({"path": temp_path, "original_name": original_filename})
print(f"Saved file: {original_filename}")
zip_filename = f"{dir_name}.zip"
zip_path = f"{base_path}/{zip_filename}"
zip_temp_path = None
def create_zip_archive(temp_paths, dir_name, storage):
import tempfile
with tempfile.NamedTemporaryFile(
delete=False, suffix=".zip"
) as temp_zip_file:
zip_output_path = temp_zip_file.name
with tempfile.TemporaryDirectory() as stage_dir:
for path in temp_paths:
try:
file_data = storage.get_file(path)
with open(
os.path.join(stage_dir, os.path.basename(path)),
"wb",
) as f:
f.write(file_data.read())
except Exception as e:
current_app.logger.error(
f"Error processing file {path} for zipping: {e}",
exc_info=True,
)
if os.path.exists(zip_output_path):
os.remove(zip_output_path)
raise
try:
shutil.make_archive(
base_name=zip_output_path.replace(".zip", ""),
format="zip",
root_dir=stage_dir,
)
except Exception as e:
current_app.logger.error(
f"Error creating zip archive: {e}", exc_info=True
)
if os.path.exists(zip_output_path):
os.remove(zip_output_path)
raise
return zip_output_path
try:
zip_temp_path = create_zip_archive(temp_files, dir_name, storage)
with open(zip_temp_path, "rb") as zip_file:
storage.save_file(zip_file, zip_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
".rst",
".md",
".pdf",
".txt",
".docx",
".csv",
".epub",
".html",
".mdx",
".json",
".xlsx",
".pptx",
".png",
".jpg",
".jpeg",
],
job_name,
user,
file_path=zip_path,
filename=zip_filename
)
finally:
# Clean up temporary files
for temp_path in temp_files:
try:
storage.delete_file(temp_path)
except Exception as e:
current_app.logger.error(
f"Error deleting temporary file {temp_path}: {e}",
exc_info=True,
)
# Clean up the zip file if it was created
if zip_temp_path and os.path.exists(zip_temp_path):
os.remove(zip_temp_path)
else: # Keep this else block for single file upload
# For single file
file = files[0]
for file in files:
original_filename = file.filename
safe_file = safe_filename(original_filename)
file_path = f"{base_path}/{safe_file}"
storage.save_file(file, file_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
".rst",
".md",
".pdf",
".txt",
".docx",
".csv",
".epub",
".html",
".mdx",
".json",
".xlsx",
".pptx",
".png",
".jpg",
".jpeg",
],
job_name,
user,
file_path=file_path,
filename=original_filename
)
with tempfile.TemporaryDirectory() as temp_dir:
temp_file_path = os.path.join(temp_dir, safe_file)
file.save(temp_file_path)
if zipfile.is_zipfile(temp_file_path):
try:
with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
zip_ref.extractall(path=temp_dir)
# Walk through extracted files and upload them
for root, _, files in os.walk(temp_dir):
for extracted_file in files:
if os.path.join(root, extracted_file) == temp_file_path:
continue
rel_path = os.path.relpath(os.path.join(root, extracted_file), temp_dir)
storage_path = f"{base_path}/{rel_path}"
with open(os.path.join(root, extracted_file), 'rb') as f:
storage.save_file(f, storage_path)
except Exception as e:
current_app.logger.error(f"Error extracting zip: {e}", exc_info=True)
# If zip extraction fails, save the original zip file
file_path = f"{base_path}/{safe_file}"
with open(temp_file_path, 'rb') as f:
storage.save_file(f, file_path)
else:
# For non-zip files, save directly
file_path = f"{base_path}/{safe_file}"
with open(temp_file_path, 'rb') as f:
storage.save_file(f, file_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
".jpg", ".jpeg",
],
job_name,
user,
file_path=base_path,
filename=dir_name
)
except Exception as err:
current_app.logger.error(f"Error uploading file: {err}", exc_info=True)
return make_response(jsonify({"success": False}), 400)
@@ -831,6 +753,7 @@ class PaginatedSources(Resource):
"tokens": doc.get("tokens", ""),
"retriever": doc.get("retriever", "classic"),
"syncFrequency": doc.get("sync_frequency", ""),
"isNested": bool(doc.get("directory_structure"))
}
paginated_docs.append(doc_data)
response = {
@@ -878,6 +801,7 @@ class CombinedJson(Resource):
"tokens": index.get("tokens", ""),
"retriever": index.get("retriever", "classic"),
"syncFrequency": index.get("sync_frequency", ""),
"is_nested": bool(index.get("directory_structure"))
}
)
except Exception as err:
@@ -3327,7 +3251,7 @@ class GetChunks(Resource):
"id": "The document ID",
"page": "Page number for pagination",
"per_page": "Number of chunks per page",
"file_path": "Optional: Filter chunks by relative file path"
"path": "Optional: Filter chunks by relative file path"
},
)
def get(self):
@@ -3338,7 +3262,7 @@ class GetChunks(Resource):
doc_id = request.args.get("id")
page = int(request.args.get("page", 1))
per_page = int(request.args.get("per_page", 10))
file_path = request.args.get("file_path")
path = request.args.get("path")
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid doc_id"}), 400)
@@ -3351,17 +3275,17 @@ class GetChunks(Resource):
store = get_vector_store(doc_id)
chunks = store.get_chunks()
if file_path:
if path:
filtered_chunks = []
for chunk in chunks:
metadata = chunk.get("metadata", {})
source = metadata.get("source", "")
if isinstance(source, str) and source.endswith(file_path):
if isinstance(source, str) and source.endswith(path):
filtered_chunks.append(chunk)
elif isinstance(source, list):
for src in source:
if isinstance(src, str) and src.endswith(file_path):
if isinstance(src, str) and src.endswith(path):
filtered_chunks.append(chunk)
break
chunks = filtered_chunks
@@ -3378,7 +3302,7 @@ class GetChunks(Resource):
"per_page": per_page,
"total": total_chunks,
"chunks": paginated_chunks,
"file_path": file_path if file_path else None
"path": path if path else None
}
),
200,

View File

@@ -221,31 +221,53 @@ def ingest_worker(
storage = StorageCreator.get_storage()
logging.info(f"Ingest file: {file_path}", extra={"user": user, "job": job_name})
logging.info(f"Ingest path: {file_path}", extra={"user": user, "job": job_name})
# Create temporary working directory
with tempfile.TemporaryDirectory() as temp_dir:
try:
os.makedirs(temp_dir, exist_ok=True)
# Download file from storage to temp directory
temp_filename = os.path.basename(file_path)
temp_file_path = os.path.join(temp_dir, temp_filename)
file_data = storage.get_file(file_path)
if storage.is_directory(file_path):
# Handle directory case
logging.info(f"Processing directory: {file_path}")
files_list = storage.list_files(file_path)
for storage_file_path in files_list:
if storage.is_directory(storage_file_path):
continue
# Create relative path structure in temp directory
rel_path = os.path.relpath(storage_file_path, file_path)
local_file_path = os.path.join(temp_dir, rel_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
# Download file
try:
file_data = storage.get_file(storage_file_path)
with open(local_file_path, "wb") as f:
f.write(file_data.read())
except Exception as e:
logging.error(f"Error downloading file {storage_file_path}: {e}")
continue
else:
# Handle single file case
temp_filename = os.path.basename(file_path)
temp_file_path = os.path.join(temp_dir, temp_filename)
file_data = storage.get_file(file_path)
with open(temp_file_path, "wb") as f:
f.write(file_data.read())
with open(temp_file_path, "wb") as f:
f.write(file_data.read())
# Handle zip files
if temp_filename.endswith(".zip"):
logging.info(f"Extracting zip file: {temp_filename}")
extract_zip_recursive(
temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH
)
self.update_state(state="PROGRESS", meta={"current": 1})
# Handle zip files
if temp_filename.endswith(".zip"):
logging.info(f"Extracting zip file: {temp_filename}")
extract_zip_recursive(
temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH
)
if sample:
logging.info(f"Sample mode enabled. Using {limit} documents.")