diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index da87655b..89531703 100755 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -77,10 +77,13 @@ def upload_index_files(): file_pkl = request.files["file_pkl"] if file_pkl.filename == "": return {"status": "no file name"} - + # Save index files to storage - storage.save_file(file_faiss, f"{index_base_path}/index.faiss") - storage.save_file(file_pkl, f"{index_base_path}/index.pkl") + faiss_storage_path = f"{index_base_path}/index.faiss" + pkl_storage_path = f"{index_base_path}/index.pkl" + storage.save_file(file_faiss, faiss_storage_path) + storage.save_file(file_pkl, pkl_storage_path) + existing_entry = sources_collection.find_one({"_id": ObjectId(id)}) if existing_entry: diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 9e7b8a6e..d8a19913 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -634,15 +634,16 @@ class ManageSourceFiles(Resource): "ManageSourceFilesModel", { "source_id": fields.String(required=True, description="Source ID to modify"), - "operation": fields.String(required=True, description="Operation: 'add' or 'remove'"), + "operation": fields.String(required=True, description="Operation: 'add', 'remove', or 'remove_directory'"), "file_paths": fields.List(fields.String, required=False, description="File paths to remove (for remove operation)"), + "directory_path": fields.String(required=False, description="Directory path to remove (for remove_directory operation)"), "file": fields.Raw(required=False, description="Files to add (for add operation)"), "parent_dir": fields.String(required=False, description="Parent directory path relative to source root"), }, ) ) @api.doc( - description="Add or remove files from an existing source", + description="Add files, remove files, or remove directories from an existing source", ) def post(self): decoded_token = request.decoded_token @@ -658,9 +659,9 @@ class ManageSourceFiles(Resource): jsonify({"success": False, "message": "source_id and operation are required"}), 400 ) - if operation not in ["add", "remove"]: + if operation not in ["add", "remove", "remove_directory"]: return make_response( - jsonify({"success": False, "message": "operation must be 'add' or 'remove'"}), 400 + jsonify({"success": False, "message": "operation must be 'add', 'remove', or 'remove_directory'"}), 400 ) try: @@ -761,8 +762,78 @@ class ManageSourceFiles(Resource): "reingest_task_id": task.id }), 200) + elif operation == "remove_directory": + directory_path = request.form.get("directory_path") + if not directory_path: + return make_response( + jsonify({"success": False, "message": "directory_path required for remove_directory operation"}), 400 + ) + + # Validate directory path (prevent path traversal) + if directory_path.startswith("/") or ".." in directory_path: + current_app.logger.warning( + f"Invalid directory path attempted for removal. " + f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}" + ) + return make_response( + jsonify({"success": False, "message": "Invalid directory path"}), 400 + ) + + full_directory_path = f"{source_file_path}/{directory_path}" if directory_path else source_file_path + + if not storage.is_directory(full_directory_path): + current_app.logger.warning( + f"Directory not found or is not a directory for removal. " + f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, " + f"Full path: {full_directory_path}" + ) + return make_response( + jsonify({"success": False, "message": "Directory not found or is not a directory"}), 404 + ) + + success = storage.remove_directory(full_directory_path) + + if not success: + current_app.logger.error( + f"Failed to remove directory from storage. " + f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, " + f"Full path: {full_directory_path}" + ) + return make_response( + jsonify({"success": False, "message": "Failed to remove directory"}), 500 + ) + + current_app.logger.info( + f"Successfully removed directory. " + f"User: {user}, Source ID: {source_id}, Directory path: {directory_path}, " + f"Full path: {full_directory_path}" + ) + + # Trigger re-ingestion pipeline + from application.api.user.tasks import reingest_source_task + + task = reingest_source_task.delay(source_id=source_id, user=user) + + return make_response(jsonify({ + "success": True, + "message": f"Successfully removed directory: {directory_path}", + "removed_directory": directory_path, + "reingest_task_id": task.id + }), 200) + except Exception as err: - current_app.logger.error(f"Error managing source files: {err}", exc_info=True) + error_context = f"operation={operation}, user={user}, source_id={source_id}" + if operation == "remove_directory": + directory_path = request.form.get("directory_path", "") + error_context += f", directory_path={directory_path}" + elif operation == "remove": + file_paths_str = request.form.get("file_paths", "") + error_context += f", file_paths={file_paths_str}" + elif operation == "add": + parent_dir = request.form.get("parent_dir", "") + error_context += f", parent_dir={parent_dir}" + + current_app.logger.error(f"Error managing source files: {err} ({error_context})", exc_info=True) return make_response(jsonify({"success": False, "message": "Operation failed"}), 500) @@ -3438,15 +3509,22 @@ class GetChunks(Resource): filtered_chunks = [] for chunk in chunks: metadata = chunk.get("metadata", {}) - - + + # Filter by path if provided + if path: + chunk_source = metadata.get("source", "") + # Check if the chunk's source matches the requested path + if not chunk_source or not chunk_source.endswith(path): + continue + + # Filter by search term if provided if search_term: text_match = search_term in chunk.get("text", "").lower() title_match = search_term in metadata.get("title", "").lower() - + if not (text_match or title_match): continue - + filtered_chunks.append(chunk) chunks = filtered_chunks diff --git a/application/worker.py b/application/worker.py index 12357c72..fe09bd9d 100755 --- a/application/worker.py +++ b/application/worker.py @@ -98,11 +98,23 @@ def download_file(url, params, dest_path): def upload_index(full_path, file_data): + files = None try: if settings.VECTOR_STORE == "faiss": + faiss_path = full_path + "/index.faiss" + pkl_path = full_path + "/index.pkl" + + if not os.path.exists(faiss_path): + logging.error(f"FAISS index file not found: {faiss_path}") + raise FileNotFoundError(f"FAISS index file not found: {faiss_path}") + + if not os.path.exists(pkl_path): + logging.error(f"FAISS pickle file not found: {pkl_path}") + raise FileNotFoundError(f"FAISS pickle file not found: {pkl_path}") + files = { - "file_faiss": open(full_path + "/index.faiss", "rb"), - "file_pkl": open(full_path + "/index.pkl", "rb"), + "file_faiss": open(faiss_path, "rb"), + "file_pkl": open(pkl_path, "rb"), } response = requests.post( urljoin(settings.API_URL, "/api/upload_index"), @@ -114,11 +126,11 @@ def upload_index(full_path, file_data): urljoin(settings.API_URL, "/api/upload_index"), data=file_data ) response.raise_for_status() - except requests.RequestException as e: + except (requests.RequestException, FileNotFoundError) as e: logging.error(f"Error uploading index: {e}") raise finally: - if settings.VECTOR_STORE == "faiss": + if settings.VECTOR_STORE == "faiss" and files is not None: for file in files.values(): file.close() @@ -552,7 +564,7 @@ def reingest_source_worker(self, source_id, user): {"_id": ObjectId(source_id)}, { "$set": { - "directory_structure": json.dumps(directory_structure), + "directory_structure": directory_structure, "date": datetime.datetime.now(), "tokens": total_tokens }