(feat:ingest_connectors) spread config params

This commit is contained in:
ManishMadan2882
2025-08-26 00:56:39 +05:30
parent e25b988dc8
commit 15a9e97a1e
3 changed files with 66 additions and 21 deletions

View File

@@ -839,17 +839,27 @@ def agent_webhook_worker(self, agent_id, payload):
def ingest_connector(
self, job_name: str, user: str, source_type: str,
source_config: Dict[str, Any], retriever: str = "classic"
self,
job_name: str,
user: str,
source_type: str,
session_token=None,
file_ids=None,
folder_ids=None,
recursive=True,
retriever: str = "classic"
) -> Dict[str, Any]:
"""
ingestion for internal knowledge bases(GoogleDrive).
Ingestion for internal knowledge bases (GoogleDrive, etc.).
Args:
job_name: Name of the ingestion job
user: User identifier
source_type: Type of remote source ("google_drive", "dropbox", etc.)
source_config: Configuration specific to the source type
session_token: Authentication token for the service
file_ids: List of file IDs to download
folder_ids: List of folder IDs to download
recursive: Whether to recursively download folders
retriever: Type of retriever to use
"""
logging.info(f"Starting remote ingestion from {source_type} for user: {user}, job: {job_name}")
@@ -861,31 +871,42 @@ def ingest_connector(
self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing connector"})
if source_type == "google_drive":
session_token = source_config.get("session_token")
if not session_token:
raise ValueError("Google Drive connector requires session_token in source_config")
raise ValueError("Google Drive connector requires session_token")
from application.parser.remote.google_drive_loader import GoogleDriveLoader
remote_loader = GoogleDriveLoader(session_token)
# Create a clean config for storage that excludes the session token
api_source_config = {
"file_ids": source_config.get("file_ids", []),
"folder_ids": source_config.get("folder_ids", []),
"recursive": source_config.get("recursive", True)
"file_ids": file_ids or [],
"folder_ids": folder_ids or [],
"recursive": recursive
}
# Step 2: Download files to temp directory
self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"})
download_info = remote_loader.download_to_directory(
temp_dir,
{
"file_ids": file_ids or [],
"folder_ids": folder_ids or [],
"recursive": recursive
}
)
else:
# For other connectors, maintain backward compatibility
source_config = {
"session_token": session_token
}
if file_ids:
source_config["file_ids"] = file_ids
if folder_ids:
source_config["folder_ids"] = folder_ids
source_config["recursive"] = recursive
remote_loader = RemoteCreator.create_loader(source_type, source_config)
api_source_config = source_config
# Step 2: Download files to temp directory
self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"})
# For Google Drive, pass the source_config to download_to_directory
if source_type == "google_drive":
download_info = remote_loader.download_to_directory(temp_dir, source_config)
else:
download_info = remote_loader.download_to_directory(temp_dir)
if download_info.get("empty_result", False) or not download_info.get("files_downloaded", 0):