diff --git a/application/api/user/routes.py b/application/api/user/routes.py
index d7b923bf..68936dcc 100644
--- a/application/api/user/routes.py
+++ b/application/api/user/routes.py
@@ -548,138 +548,60 @@ class UploadFile(Resource):
         # Create safe versions for filesystem operations
         safe_user = safe_filename(user)
         dir_name = safe_filename(job_name)
+        base_path = f"{settings.UPLOAD_FOLDER}/{safe_user}/{dir_name}"
         try:
             storage = StorageCreator.get_storage()
-            base_path = f"{settings.UPLOAD_FOLDER}/{safe_user}/{dir_name}"
-
-            if len(files) > 1:
-                temp_files = []
-                for file in files:
-                    original_filename = file.filename
-                    safe_file = safe_filename(original_filename)
-                    temp_path = f"{base_path}/temp/{safe_file}"
-                    storage.save_file(file, temp_path)
-                    temp_files.append({"path": temp_path, "original_name": original_filename})
-                    print(f"Saved file: {original_filename}")
-                zip_filename = f"{dir_name}.zip"
-                zip_path = f"{base_path}/{zip_filename}"
-                zip_temp_path = None
-
-                def create_zip_archive(temp_paths, dir_name, storage):
-                    import tempfile
-
-                    with tempfile.NamedTemporaryFile(
-                        delete=False, suffix=".zip"
-                    ) as temp_zip_file:
-                        zip_output_path = temp_zip_file.name
-                    with tempfile.TemporaryDirectory() as stage_dir:
-                        for path in temp_paths:
-                            try:
-                                file_data = storage.get_file(path)
-                                with open(
-                                    os.path.join(stage_dir, os.path.basename(path)),
-                                    "wb",
-                                ) as f:
-                                    f.write(file_data.read())
-                            except Exception as e:
-                                current_app.logger.error(
-                                    f"Error processing file {path} for zipping: {e}",
-                                    exc_info=True,
-                                )
-                                if os.path.exists(zip_output_path):
-                                    os.remove(zip_output_path)
-                                raise
-                        try:
-                            shutil.make_archive(
-                                base_name=zip_output_path.replace(".zip", ""),
-                                format="zip",
-                                root_dir=stage_dir,
-                            )
-                        except Exception as e:
-                            current_app.logger.error(
-                                f"Error creating zip archive: {e}", exc_info=True
-                            )
-                            if os.path.exists(zip_output_path):
-                                os.remove(zip_output_path)
-                            raise
-                    return zip_output_path
-
-                try:
-                    zip_temp_path = create_zip_archive(temp_files, dir_name, storage)
-                    with open(zip_temp_path, "rb") as zip_file:
-                        storage.save_file(zip_file, zip_path)
-                    task = ingest.delay(
-                        settings.UPLOAD_FOLDER,
-                        [
-                            ".rst",
-                            ".md",
-                            ".pdf",
-                            ".txt",
-                            ".docx",
-                            ".csv",
-                            ".epub",
-                            ".html",
-                            ".mdx",
-                            ".json",
-                            ".xlsx",
-                            ".pptx",
-                            ".png",
-                            ".jpg",
-                            ".jpeg",
-                        ],
-                        job_name,
-                        user,
-                        file_path=zip_path,
-                        filename=zip_filename
-                    )
-                finally:
-                    # Clean up temporary files
-                    for temp_path in temp_files:
-                        try:
-                            storage.delete_file(temp_path)
-                        except Exception as e:
-                            current_app.logger.error(
-                                f"Error deleting temporary file {temp_path}: {e}",
-                                exc_info=True,
-                            )
-                    # Clean up the zip file if it was created
-                    if zip_temp_path and os.path.exists(zip_temp_path):
-                        os.remove(zip_temp_path)
-            else: # Keep this else block for single file upload
-                # For single file
-
-                file = files[0]
+
+
+            for file in files:
                 original_filename = file.filename
                 safe_file = safe_filename(original_filename)
-                file_path = f"{base_path}/{safe_file}"
-
-                storage.save_file(file, file_path)
-
-                task = ingest.delay(
-                    settings.UPLOAD_FOLDER,
-                    [
-                        ".rst",
-                        ".md",
-                        ".pdf",
-                        ".txt",
-                        ".docx",
-                        ".csv",
-                        ".epub",
-                        ".html",
-                        ".mdx",
-                        ".json",
-                        ".xlsx",
-                        ".pptx",
-                        ".png",
-                        ".jpg",
-                        ".jpeg",
-                    ],
-                    job_name,
-                    user,
-                    file_path=file_path,
-                    filename=original_filename
-                )
+
+                with tempfile.TemporaryDirectory() as temp_dir:
+                    temp_file_path = os.path.join(temp_dir, safe_file)
+                    file.save(temp_file_path)
+
+                    if zipfile.is_zipfile(temp_file_path):
+                        try:
+                            with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
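+                                # Extract into the request's temp dir, then walk the
+                                # tree and upload each entry (skipping the archive
+                                # itself) under base_path, preserving relative paths.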
+                                zip_ref.extractall(path=temp_dir)
+
+                                # Walk through extracted files and upload them
+                                # (named to avoid shadowing the outer `files` list)
+                                for root, _, filenames in os.walk(temp_dir):
+                                    for extracted_file in filenames:
+                                        if os.path.join(root, extracted_file) == temp_file_path:
+                                            continue
+
+                                        rel_path = os.path.relpath(os.path.join(root, extracted_file), temp_dir)
+                                        storage_path = f"{base_path}/{rel_path}"
+
+                                        with open(os.path.join(root, extracted_file), 'rb') as f:
+                                            storage.save_file(f, storage_path)
+                        except Exception as e:
+                            current_app.logger.error(f"Error extracting zip: {e}", exc_info=True)
+                            # If zip extraction fails, save the original zip file
+                            file_path = f"{base_path}/{safe_file}"
+                            with open(temp_file_path, 'rb') as f:
+                                storage.save_file(f, file_path)
+                    else:
+                        # For non-zip files, save directly
+                        file_path = f"{base_path}/{safe_file}"
+                        with open(temp_file_path, 'rb') as f:
+                            storage.save_file(f, file_path)
+
+            task = ingest.delay(
+                settings.UPLOAD_FOLDER,
+                [
+                    ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
+                    ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
+                    ".jpg", ".jpeg",
+                ],
+                job_name,
+                user,
+                file_path=base_path,
+                filename=dir_name
+            )
         except Exception as err:
             current_app.logger.error(f"Error uploading file: {err}", exc_info=True)
             return make_response(jsonify({"success": False}), 400)
@@ -831,6 +753,7 @@ class PaginatedSources(Resource):
                 "tokens": doc.get("tokens", ""),
                 "retriever": doc.get("retriever", "classic"),
                 "syncFrequency": doc.get("sync_frequency", ""),
+                "isNested": bool(doc.get("directory_structure"))
             }
             paginated_docs.append(doc_data)
         response = {
@@ -878,6 +801,7 @@ class CombinedJson(Resource):
                     "tokens": index.get("tokens", ""),
                     "retriever": index.get("retriever", "classic"),
                     "syncFrequency": index.get("sync_frequency", ""),
+                    "is_nested": bool(index.get("directory_structure"))
                 }
             )
         except Exception as err:
@@ -3327,7 +3251,7 @@ class GetChunks(Resource):
            "id": "The document ID",
            "page": "Page number for pagination",
            "per_page": "Number of chunks per page",
-           "file_path": "Optional: Filter chunks by relative file path"
+           "path": "Optional: Filter chunks by relative file path"
        },
    )
    def get(self):
@@ -3338,7 +3262,7 @@
        doc_id = request.args.get("id")
        page = int(request.args.get("page", 1))
        per_page = int(request.args.get("per_page", 10))
-       path = request.args.get("file_path")
+       path = request.args.get("path")

        if not ObjectId.is_valid(doc_id):
            return make_response(jsonify({"error": "Invalid doc_id"}), 400)
@@ -3351,17 +3275,17 @@
            store = get_vector_store(doc_id)
            chunks = store.get_chunks()

-           if file_path:
+           if path:
                filtered_chunks = []
                for chunk in chunks:
                    metadata = chunk.get("metadata", {})
                    source = metadata.get("source", "")

-                   if isinstance(source, str) and source.endswith(file_path):
+                   if isinstance(source, str) and source.endswith(path):
                        filtered_chunks.append(chunk)
                    elif isinstance(source, list):
                        for src in source:
-                           if isinstance(src, str) and src.endswith(file_path):
+                           if isinstance(src, str) and src.endswith(path):
                                filtered_chunks.append(chunk)
                                break
                chunks = filtered_chunks
@@ -3378,7 +3302,7 @@
                    "per_page": per_page,
                    "total": total_chunks,
                    "chunks": paginated_chunks,
-                   "file_path": file_path if file_path else None
+                   "path": path if path else None
                }
            ),
            200,
diff --git a/application/worker.py b/application/worker.py
index 8cc7ac20..6e3cb1ae 100755
--- a/application/worker.py
+++ b/application/worker.py
@@ -221,31 +221,53 @@ def ingest_worker(
     storage = StorageCreator.get_storage()

-    logging.info(f"Ingest file: {file_path}", extra={"user": user, "job": job_name})
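+    # file_path can now point at either a single stored file or a directory of
+    # uploaded files; the branch on storage.is_directory below handles both.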
logging.info(f"Ingest file: {file_path}", extra={"user": user, "job": job_name}) + logging.info(f"Ingest path: {file_path}", extra={"user": user, "job": job_name}) # Create temporary working directory with tempfile.TemporaryDirectory() as temp_dir: try: os.makedirs(temp_dir, exist_ok=True) - # Download file from storage to temp directory - temp_filename = os.path.basename(file_path) - temp_file_path = os.path.join(temp_dir, temp_filename) - - file_data = storage.get_file(file_path) + if storage.is_directory(file_path): + # Handle directory case + logging.info(f"Processing directory: {file_path}") + files_list = storage.list_files(file_path) + + for storage_file_path in files_list: + if storage.is_directory(storage_file_path): + continue + + # Create relative path structure in temp directory + rel_path = os.path.relpath(storage_file_path, file_path) + local_file_path = os.path.join(temp_dir, rel_path) + + os.makedirs(os.path.dirname(local_file_path), exist_ok=True) + + # Download file + try: + file_data = storage.get_file(storage_file_path) + with open(local_file_path, "wb") as f: + f.write(file_data.read()) + except Exception as e: + logging.error(f"Error downloading file {storage_file_path}: {e}") + continue + else: + # Handle single file case + temp_filename = os.path.basename(file_path) + temp_file_path = os.path.join(temp_dir, temp_filename) + + file_data = storage.get_file(file_path) + with open(temp_file_path, "wb") as f: + f.write(file_data.read()) - with open(temp_file_path, "wb") as f: - f.write(file_data.read()) + # Handle zip files + if temp_filename.endswith(".zip"): + logging.info(f"Extracting zip file: {temp_filename}") + extract_zip_recursive( + temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH + ) self.update_state(state="PROGRESS", meta={"current": 1}) - - # Handle zip files - if temp_filename.endswith(".zip"): - logging.info(f"Extracting zip file: {temp_filename}") - extract_zip_recursive( - temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH - ) - if sample: logging.info(f"Sample mode enabled. Using {limit} documents.")