(refactor:ingestion) pass file path once

This commit is contained in:
ManishMadan2882
2025-07-01 04:00:57 +05:30
parent 42f48649b9
commit ade704d065
4 changed files with 30 additions and 33 deletions

View File

@@ -48,7 +48,7 @@ def upload_index_files():
remote_data = request.form["remote_data"] if "remote_data" in request.form else None
sync_frequency = request.form["sync_frequency"] if "sync_frequency" in request.form else None
original_file_path = request.form.get("original_file_path")
file_path = request.form.get("file_path")
storage = StorageCreator.get_storage()
index_base_path = f"indexes/{id}"
@@ -87,7 +87,7 @@ def upload_index_files():
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": original_file_path,
"file_path": file_path,
}
},
)
@@ -105,7 +105,7 @@ def upload_index_files():
"retriever": retriever,
"remote_data": remote_data,
"sync_frequency": sync_frequency,
"file_path": original_file_path,
"file_path": file_path,
}
)
return {"status": "ok"}

View File

@@ -553,11 +553,12 @@ class UploadFile(Resource):
if len(files) > 1:
temp_files = []
for file in files:
filename = safe_filename(file.filename)
temp_path = f"{base_path}/temp/{filename}"
original_filename = file.filename
safe_file = safe_filename(original_filename)
temp_path = f"{base_path}/temp/{safe_file}"
storage.save_file(file, temp_path)
temp_files.append(temp_path)
print(f"Saved file: {filename}")
temp_files.append({"path": temp_path, "original_name": original_filename})
print(f"Saved file: {original_filename}")
zip_filename = f"{dir_name}.zip"
zip_path = f"{base_path}/{zip_filename}"
zip_temp_path = None
@@ -625,14 +626,12 @@ class UploadFile(Resource):
".jpeg",
],
job_name,
zip_filename,
user,
dir_name,
safe_user,
file_path=zip_path,
filename=zip_filename
)
finally:
# Clean up temporary files
for temp_path in temp_files:
try:
storage.delete_file(temp_path)
@@ -642,15 +641,15 @@ class UploadFile(Resource):
exc_info=True,
)
# Clean up the zip file if it was created
if zip_temp_path and os.path.exists(zip_temp_path):
os.remove(zip_temp_path)
else: # Keep this else block for single file upload
# For single file
file = files[0]
filename = safe_filename(file.filename)
file_path = f"{base_path}/{filename}"
original_filename = file.filename
safe_file = safe_filename(original_filename)
file_path = f"{base_path}/{safe_file}"
storage.save_file(file, file_path)
@@ -674,10 +673,9 @@ class UploadFile(Resource):
".jpeg",
],
job_name,
filename, # Corrected variable for single-file case
user,
dir_name,
safe_user,
file_path=file_path,
filename=original_filename
)
except Exception as err:
current_app.logger.error(f"Error uploading file: {err}", exc_info=True)

View File

@@ -11,8 +11,8 @@ from application.worker import (
@celery.task(bind=True)
def ingest(self, directory, formats, job_name, filename, user, dir_name, user_dir):
resp = ingest_worker(self, directory, formats, job_name, filename, user, dir_name, user_dir)
def ingest(self, directory, formats, job_name, user, file_path, filename):
resp = ingest_worker(self, directory, formats, job_name, file_path, filename, user)
return resp

View File

@@ -194,7 +194,8 @@ def run_agent_logic(agent_config, input_data):
# Define the main function for ingesting and processing documents.
def ingest_worker(
self, directory, formats, job_name, filename, user, dir_name=None, user_dir=None, retriever="classic"
self, directory, formats, job_name, file_path, filename, user,
retriever="classic"
):
"""
Ingest and process documents.
@@ -204,10 +205,9 @@ def ingest_worker(
directory (str): Specifies the directory for ingesting ('inputs' or 'temp').
formats (list of str): List of file extensions to consider for ingestion (e.g., [".rst", ".md"]).
job_name (str): Name of the job for this ingestion task (original, unsanitized).
filename (str): Name of the file to be ingested.
file_path (str): Complete file path to use consistently throughout the pipeline.
filename (str): Original unsanitized filename provided by the user.
user (str): Identifier for the user initiating the ingestion (original, unsanitized).
dir_name (str, optional): Sanitized directory name for filesystem operations.
user_dir (str, optional): Sanitized user ID for filesystem operations.
retriever (str): Type of retriever to use for processing the documents.
Returns:
@@ -220,11 +220,8 @@ def ingest_worker(
sample = False
storage = StorageCreator.get_storage()
full_path = os.path.join(directory, user_dir, dir_name)
source_file_path = os.path.join(full_path, filename)
logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": job_name})
logging.info(f"Ingest file: {file_path}", extra={"user": user, "job": job_name})
# Create temporary working directory
with tempfile.TemporaryDirectory() as temp_dir:
@@ -232,8 +229,10 @@ def ingest_worker(
os.makedirs(temp_dir, exist_ok=True)
# Download file from storage to temp directory
temp_file_path = os.path.join(temp_dir, filename)
file_data = storage.get_file(source_file_path)
temp_filename = os.path.basename(file_path)
temp_file_path = os.path.join(temp_dir, temp_filename)
file_data = storage.get_file(file_path)
with open(temp_file_path, "wb") as f:
f.write(file_data.read())
@@ -241,8 +240,8 @@ def ingest_worker(
self.update_state(state="PROGRESS", meta={"current": 1})
# Handle zip files
if filename.endswith(".zip"):
logging.info(f"Extracting zip file: {filename}")
if temp_filename.endswith(".zip"):
logging.info(f"Extracting zip file: {temp_filename}")
extract_zip_recursive(
temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH
)
@@ -292,7 +291,7 @@ def ingest_worker(
"retriever": retriever,
"id": str(id),
"type": "local",
"original_file_path": source_file_path,
"file_path": file_path,
}
upload_index(vector_store_path, file_data)