diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 2a7e9119..ae696952 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -913,10 +913,14 @@ class UploadRemote(Resource): from application.api.user.tasks import ingest_connector_task task = ingest_connector_task.delay( - source_config=config, job_name=data["name"], user=decoded_token.get("sub"), - source_type="google_drive" + source_type="google_drive", + session_token=session_token, + file_ids=file_ids, + folder_ids=folder_ids, + recursive=config.get("recursive", False), + retriever=config.get("retriever", "classic") ) return make_response(jsonify({"success": True, "task_id": task.id}), 200) task = ingest_remote.delay( diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py index bfed7f5a..833edbff 100644 --- a/application/api/user/tasks.py +++ b/application/api/user/tasks.py @@ -48,9 +48,29 @@ def process_agent_webhook(self, agent_id, payload): @celery.task(bind=True) -def ingest_connector_task(self, source_config, job_name, user, source_type, retriever="classic"): +def ingest_connector_task( + self, + job_name, + user, + source_type, + session_token=None, + file_ids=None, + folder_ids=None, + recursive=True, + retriever="classic" +): from application.worker import ingest_connector - resp = ingest_connector(self, job_name, user, source_type, source_config, retriever) + resp = ingest_connector( + self, + job_name, + user, + source_type, + session_token=session_token, + file_ids=file_ids, + folder_ids=folder_ids, + recursive=recursive, + retriever=retriever + ) return resp diff --git a/application/worker.py b/application/worker.py index 99dc2635..fe386a2d 100755 --- a/application/worker.py +++ b/application/worker.py @@ -839,17 +839,27 @@ def agent_webhook_worker(self, agent_id, payload): def ingest_connector( - self, job_name: str, user: str, source_type: str, - source_config: Dict[str, Any], retriever: str = "classic" + self, + job_name: str, + user: str, + source_type: str, + session_token=None, + file_ids=None, + folder_ids=None, + recursive=True, + retriever: str = "classic" ) -> Dict[str, Any]: """ - ingestion for internal knowledge bases(GoogleDrive). + Ingestion for internal knowledge bases (GoogleDrive, etc.). Args: job_name: Name of the ingestion job user: User identifier source_type: Type of remote source ("google_drive", "dropbox", etc.) - source_config: Configuration specific to the source type + session_token: Authentication token for the service + file_ids: List of file IDs to download + folder_ids: List of folder IDs to download + recursive: Whether to recursively download folders retriever: Type of retriever to use """ logging.info(f"Starting remote ingestion from {source_type} for user: {user}, job: {job_name}") @@ -861,31 +871,42 @@ def ingest_connector( self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing connector"}) if source_type == "google_drive": - session_token = source_config.get("session_token") if not session_token: - raise ValueError("Google Drive connector requires session_token in source_config") + raise ValueError("Google Drive connector requires session_token") from application.parser.remote.google_drive_loader import GoogleDriveLoader remote_loader = GoogleDriveLoader(session_token) # Create a clean config for storage that excludes the session token api_source_config = { - "file_ids": source_config.get("file_ids", []), - "folder_ids": source_config.get("folder_ids", []), - "recursive": source_config.get("recursive", True) + "file_ids": file_ids or [], + "folder_ids": folder_ids or [], + "recursive": recursive } + # Step 2: Download files to temp directory + self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"}) + download_info = remote_loader.download_to_directory( + temp_dir, + { + "file_ids": file_ids or [], + "folder_ids": folder_ids or [], + "recursive": recursive + } + ) else: + # For other connectors, maintain backward compatibility + source_config = { + "session_token": session_token + } + if file_ids: + source_config["file_ids"] = file_ids + if folder_ids: + source_config["folder_ids"] = folder_ids + source_config["recursive"] = recursive + remote_loader = RemoteCreator.create_loader(source_type, source_config) api_source_config = source_config - - # Step 2: Download files to temp directory - self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"}) - - # For Google Drive, pass the source_config to download_to_directory - if source_type == "google_drive": - download_info = remote_loader.download_to_directory(temp_dir, source_config) - else: download_info = remote_loader.download_to_directory(temp_dir) if download_info.get("empty_result", False) or not download_info.get("files_downloaded", 0):