(fix:driveLoader) folder ingesting

This commit is contained in:
ManishMadan2882
2025-08-22 19:07:52 +05:30
parent b1210c4902
commit 2410bd8654
4 changed files with 119 additions and 78 deletions

View File

@@ -887,28 +887,33 @@ class UploadRemote(Resource):
session_token = config.get("session_token")
# Process file_ids
file_ids = config.get("file_ids", [])
if isinstance(file_ids, str):
file_ids = [id.strip() for id in file_ids.split(',') if id.strip()]
elif not isinstance(file_ids, list):
file_ids = []
folder_id = config.get("folder_id", "")
if not isinstance(folder_id, str):
folder_id = str(folder_id) if folder_id else ""
folder_ids = config.get("folder_ids", [])
if isinstance(folder_ids, str):
folder_ids = [id.strip() for id in folder_ids.split(',') if id.strip()]
elif not isinstance(folder_ids, list):
folder_ids = []
recursive = bool(config.get("recursive", False))
# Ensure at least one file or folder is selected
if not file_ids and not folder_ids:
return make_response(jsonify({
"success": False,
"error": "No files or folders selected"
}), 400)
clean_config = {
"session_token": session_token,
"file_ids": file_ids,
"folder_id": folder_id,
"recursive": recursive
}
config["file_ids"] = file_ids
config["folder_ids"] = folder_ids
from application.api.user.tasks import ingest_connector_task
task = ingest_connector_task.delay(
source_config=clean_config,
source_config=config,
job_name=data["name"],
user=decoded_token.get("sub"),
source_type="google_drive"

View File

@@ -404,37 +404,58 @@ class GoogleDriveLoader(BaseRemote):
def _download_folder_recursive(self, folder_id: str, local_dir: str, recursive: bool = True) -> int:
files_downloaded = 0
query = f"'{folder_id}' in parents and trashed=false"
page_token = None
while True:
results = self.service.files().list(
q=query,
fields='nextPageToken,files(id,name,mimeType)',
pageToken=page_token
).execute()
files = results.get('files', [])
for file_metadata in files:
if file_metadata['mimeType'] == 'application/vnd.google-apps.folder':
if recursive:
subfolder_path = os.path.join(local_dir, file_metadata['name'])
os.makedirs(subfolder_path, exist_ok=True)
files_downloaded += self._download_folder_recursive(
file_metadata['id'],
subfolder_path,
recursive
)
else:
if self._download_single_file(file_metadata['id'], local_dir):
files_downloaded += 1
page_token = results.get('nextPageToken')
if not page_token:
break
return files_downloaded
try:
os.makedirs(local_dir, exist_ok=True)
query = f"'{folder_id}' in parents and trashed=false"
page_token = None
while True:
results = self.service.files().list(
q=query,
fields='nextPageToken, files(id, name, mimeType)',
pageToken=page_token,
pageSize=1000
).execute()
items = results.get('files', [])
logging.info(f"Found {len(items)} items in folder {folder_id}")
for item in items:
item_name = item['name']
item_id = item['id']
mime_type = item['mimeType']
if mime_type == 'application/vnd.google-apps.folder':
if recursive:
# Create subfolder and recurse
subfolder_path = os.path.join(local_dir, item_name)
os.makedirs(subfolder_path, exist_ok=True)
subfolder_files = self._download_folder_recursive(
item_id,
subfolder_path,
recursive
)
files_downloaded += subfolder_files
logging.info(f"Downloaded {subfolder_files} files from subfolder {item_name}")
else:
# Download file
success = self._download_single_file(item_id, local_dir)
if success:
files_downloaded += 1
logging.info(f"Downloaded file: {item_name}")
else:
logging.warning(f"Failed to download file: {item_name}")
page_token = results.get('nextPageToken')
if not page_token:
break
return files_downloaded
except Exception as e:
logging.error(f"Error in _download_folder_recursive for folder {folder_id}: {e}", exc_info=True)
return files_downloaded
def _get_extension_for_mime_type(self, mime_type: str) -> str:
extensions = {
@@ -461,14 +482,15 @@ class GoogleDriveLoader(BaseRemote):
source_config = {}
config = source_config if source_config else getattr(self, 'config', {})
files_downloaded = 0
try:
folder_id = config.get('folder_id')
folder_ids = config.get('folder_ids', [])
file_ids = config.get('file_ids', [])
recursive = config.get('recursive', True)
self._ensure_service()
if file_ids:
if isinstance(file_ids, str):
file_ids = [file_ids]
@@ -477,11 +499,33 @@ class GoogleDriveLoader(BaseRemote):
if self._download_file_to_directory(file_id, local_dir):
files_downloaded += 1
elif folder_id:
files_downloaded = self._download_folder_contents(folder_id, local_dir, recursive)
# Process folders
if folder_ids:
if isinstance(folder_ids, str):
folder_ids = [folder_ids]
else:
raise ValueError("No folder_id or file_ids provided for download")
for folder_id in folder_ids:
try:
folder_metadata = self.service.files().get(
fileId=folder_id,
fields='name'
).execute()
folder_name = folder_metadata.get('name', '')
folder_path = os.path.join(local_dir, folder_name)
os.makedirs(folder_path, exist_ok=True)
folder_files = self._download_folder_recursive(
folder_id,
folder_path,
recursive
)
files_downloaded += folder_files
logging.info(f"Downloaded {folder_files} files from folder {folder_name}")
except Exception as e:
logging.error(f"Error downloading folder {folder_id}: {e}", exc_info=True)
if not file_ids and not folder_ids:
raise ValueError("No folder_ids or file_ids provided for download")
return {
"files_downloaded": files_downloaded,
@@ -493,9 +537,10 @@ class GoogleDriveLoader(BaseRemote):
except Exception as e:
return {
"files_downloaded": 0,
"files_downloaded": files_downloaded,
"directory_path": local_dir,
"empty_result": True,
"error": str(e),
"source_type": "google_drive"
"source_type": "google_drive",
"config_used": config,
"error": str(e)
}

View File

@@ -854,11 +854,11 @@ def ingest_connector(
"""
logging.info(f"Starting remote ingestion from {source_type} for user: {user}, job: {job_name}")
self.update_state(state="PROGRESS", meta={"current": 1})
with tempfile.TemporaryDirectory() as temp_dir:
try:
# Step 1: Get the appropriate remote loader
logging.info(f"source_config {source_config}")
# Step 1: Initialize the appropriate loader
self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing connector"})
if source_type == "google_drive":
session_token = source_config.get("session_token")
@@ -871,11 +871,10 @@ def ingest_connector(
# Create a clean config for storage that excludes the session token
api_source_config = {
"file_ids": source_config.get("file_ids", []),
"folder_id": source_config.get("folder_id", ""),
"folder_ids": source_config.get("folder_ids", []),
"recursive": source_config.get("recursive", True)
}
if source_config.get("recursive") is not None:
api_source_config["recursive"] = source_config.get("recursive")
else:
remote_loader = RemoteCreator.create_loader(source_type, source_config)
api_source_config = source_config

View File

@@ -447,32 +447,24 @@ function Upload({
if (ingestor.type === 'google_drive') {
const sessionToken = localStorage.getItem('google_drive_session_token');
const selectedItems = googleDriveFiles.filter(file => selectedFiles.includes(file.id));
const selectedFolderIds = selectedItems
.filter(item => item.type === 'application/vnd.google-apps.folder' || item.isFolder)
.map(folder => folder.id);
const selectedFileIds = selectedItems
.filter(item => item.type !== 'application/vnd.google-apps.folder' && !item.isFolder)
.map(file => file.id);
configData = {
file_ids: selectedFiles,
file_ids: selectedFileIds,
folder_ids: selectedFolderIds,
recursive: ingestor.config.recursive,
session_token: sessionToken || null
};
} else {
const defaultConfig = IngestorDefaultConfigs[ingestor.type].config;
const mergedConfig = { ...defaultConfig, ...ingestor.config };
configData = Object.entries(mergedConfig).reduce(
(acc, [key, value]) => {
const field = IngestorFormSchemas[ingestor.type].find(
(f) => f.name === key,
);
// Include the field if:
// 1. It's required, or
// 2. It's optional and has a non-empty value
if (
field?.required ||
(value !== undefined && value !== null && value !== '')
) {
acc[key] = value;
}
return acc;
},
{} as Record<string, any>,
);
configData = { ...ingestor.config };
}
formData.append('data', JSON.stringify(configData));