From ade704d065bae686b3e18c826a1bed144625ac0c Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Tue, 1 Jul 2025 04:00:57 +0530 Subject: [PATCH] (refactor:ingestion) pass file path once --- application/api/internal/routes.py | 6 +++--- application/api/user/routes.py | 26 ++++++++++++-------------- application/api/user/tasks.py | 4 ++-- application/worker.py | 27 +++++++++++++-------------- 4 files changed, 30 insertions(+), 33 deletions(-) diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index b4422e26..20ce31c7 100755 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -48,7 +48,7 @@ def upload_index_files(): remote_data = request.form["remote_data"] if "remote_data" in request.form else None sync_frequency = request.form["sync_frequency"] if "sync_frequency" in request.form else None - original_file_path = request.form.get("original_file_path") + file_path = request.form.get("file_path") storage = StorageCreator.get_storage() index_base_path = f"indexes/{id}" @@ -87,7 +87,7 @@ def upload_index_files(): "retriever": retriever, "remote_data": remote_data, "sync_frequency": sync_frequency, - "file_path": original_file_path, + "file_path": file_path, } }, ) @@ -105,7 +105,7 @@ def upload_index_files(): "retriever": retriever, "remote_data": remote_data, "sync_frequency": sync_frequency, - "file_path": original_file_path, + "file_path": file_path, } ) return {"status": "ok"} diff --git a/application/api/user/routes.py b/application/api/user/routes.py index c2f89761..fc052421 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -553,11 +553,12 @@ class UploadFile(Resource): if len(files) > 1: temp_files = [] for file in files: - filename = safe_filename(file.filename) - temp_path = f"{base_path}/temp/{filename}" + original_filename = file.filename + safe_file = safe_filename(original_filename) + temp_path = f"{base_path}/temp/{safe_file}" storage.save_file(file, temp_path) - temp_files.append(temp_path) - print(f"Saved file: {filename}") + temp_files.append({"path": temp_path, "original_name": original_filename}) + print(f"Saved file: {original_filename}") zip_filename = f"{dir_name}.zip" zip_path = f"{base_path}/{zip_filename}" zip_temp_path = None @@ -625,14 +626,12 @@ class UploadFile(Resource): ".jpeg", ], job_name, - zip_filename, user, - dir_name, - safe_user, + file_path=zip_path, + filename=zip_filename ) finally: # Clean up temporary files - for temp_path in temp_files: try: storage.delete_file(temp_path) @@ -642,15 +641,15 @@ class UploadFile(Resource): exc_info=True, ) # Clean up the zip file if it was created - if zip_temp_path and os.path.exists(zip_temp_path): os.remove(zip_temp_path) else: # Keep this else block for single file upload # For single file file = files[0] - filename = safe_filename(file.filename) - file_path = f"{base_path}/{filename}" + original_filename = file.filename + safe_file = safe_filename(original_filename) + file_path = f"{base_path}/{safe_file}" storage.save_file(file, file_path) @@ -674,10 +673,9 @@ class UploadFile(Resource): ".jpeg", ], job_name, - filename, # Corrected variable for single-file case user, - dir_name, - safe_user, + file_path=file_path, + filename=original_filename ) except Exception as err: current_app.logger.error(f"Error uploading file: {err}", exc_info=True) diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py index c7003ef3..aa40f37b 100644 --- a/application/api/user/tasks.py +++ b/application/api/user/tasks.py @@ -11,8 +11,8 @@ from application.worker import ( @celery.task(bind=True) -def ingest(self, directory, formats, job_name, filename, user, dir_name, user_dir): - resp = ingest_worker(self, directory, formats, job_name, filename, user, dir_name, user_dir) +def ingest(self, directory, formats, job_name, user, file_path, filename): + resp = ingest_worker(self, directory, formats, job_name, file_path, filename, user) return resp diff --git a/application/worker.py b/application/worker.py index c6178931..e685b371 100755 --- a/application/worker.py +++ b/application/worker.py @@ -194,7 +194,8 @@ def run_agent_logic(agent_config, input_data): # Define the main function for ingesting and processing documents. def ingest_worker( - self, directory, formats, job_name, filename, user, dir_name=None, user_dir=None, retriever="classic" + self, directory, formats, job_name, file_path, filename, user, + retriever="classic" ): """ Ingest and process documents. @@ -204,10 +205,9 @@ def ingest_worker( directory (str): Specifies the directory for ingesting ('inputs' or 'temp'). formats (list of str): List of file extensions to consider for ingestion (e.g., [".rst", ".md"]). job_name (str): Name of the job for this ingestion task (original, unsanitized). - filename (str): Name of the file to be ingested. + file_path (str): Complete file path to use consistently throughout the pipeline. + filename (str): Original unsanitized filename provided by the user. user (str): Identifier for the user initiating the ingestion (original, unsanitized). - dir_name (str, optional): Sanitized directory name for filesystem operations. - user_dir (str, optional): Sanitized user ID for filesystem operations. retriever (str): Type of retriever to use for processing the documents. Returns: @@ -220,11 +220,8 @@ def ingest_worker( sample = False storage = StorageCreator.get_storage() - - full_path = os.path.join(directory, user_dir, dir_name) - source_file_path = os.path.join(full_path, filename) - - logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": job_name}) + + logging.info(f"Ingest file: {file_path}", extra={"user": user, "job": job_name}) # Create temporary working directory with tempfile.TemporaryDirectory() as temp_dir: @@ -232,8 +229,10 @@ def ingest_worker( os.makedirs(temp_dir, exist_ok=True) # Download file from storage to temp directory - temp_file_path = os.path.join(temp_dir, filename) - file_data = storage.get_file(source_file_path) + temp_filename = os.path.basename(file_path) + temp_file_path = os.path.join(temp_dir, temp_filename) + + file_data = storage.get_file(file_path) with open(temp_file_path, "wb") as f: f.write(file_data.read()) @@ -241,8 +240,8 @@ def ingest_worker( self.update_state(state="PROGRESS", meta={"current": 1}) # Handle zip files - if filename.endswith(".zip"): - logging.info(f"Extracting zip file: {filename}") + if temp_filename.endswith(".zip"): + logging.info(f"Extracting zip file: {temp_filename}") extract_zip_recursive( temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH ) @@ -292,7 +291,7 @@ def ingest_worker( "retriever": retriever, "id": str(id), "type": "local", - "original_file_path": source_file_path, + "file_path": file_path, } upload_index(vector_store_path, file_data)