diff --git a/application/api/user/routes.py b/application/api/user/routes.py index c6edec6f..dfe9a8aa 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -887,28 +887,33 @@ class UploadRemote(Resource): session_token = config.get("session_token") + # Process file_ids file_ids = config.get("file_ids", []) if isinstance(file_ids, str): file_ids = [id.strip() for id in file_ids.split(',') if id.strip()] elif not isinstance(file_ids, list): file_ids = [] - folder_id = config.get("folder_id", "") - if not isinstance(folder_id, str): - folder_id = str(folder_id) if folder_id else "" + + folder_ids = config.get("folder_ids", []) + if isinstance(folder_ids, str): + folder_ids = [id.strip() for id in folder_ids.split(',') if id.strip()] + elif not isinstance(folder_ids, list): + folder_ids = [] - recursive = bool(config.get("recursive", False)) + # Ensure at least one file or folder is selected + if not file_ids and not folder_ids: + return make_response(jsonify({ + "success": False, + "error": "No files or folders selected" + }), 400) - clean_config = { - "session_token": session_token, - "file_ids": file_ids, - "folder_id": folder_id, - "recursive": recursive - } + config["file_ids"] = file_ids + config["folder_ids"] = folder_ids from application.api.user.tasks import ingest_connector_task task = ingest_connector_task.delay( - source_config=clean_config, + source_config=config, job_name=data["name"], user=decoded_token.get("sub"), source_type="google_drive" diff --git a/application/parser/remote/google_drive_loader.py b/application/parser/remote/google_drive_loader.py index b2be6c4c..a5d5cc9f 100644 --- a/application/parser/remote/google_drive_loader.py +++ b/application/parser/remote/google_drive_loader.py @@ -404,37 +404,58 @@ class GoogleDriveLoader(BaseRemote): def _download_folder_recursive(self, folder_id: str, local_dir: str, recursive: bool = True) -> int: files_downloaded = 0 - query = f"'{folder_id}' in parents and trashed=false" - - page_token = None - while True: - results = self.service.files().list( - q=query, - fields='nextPageToken,files(id,name,mimeType)', - pageToken=page_token - ).execute() - - files = results.get('files', []) - - for file_metadata in files: - if file_metadata['mimeType'] == 'application/vnd.google-apps.folder': - if recursive: - subfolder_path = os.path.join(local_dir, file_metadata['name']) - os.makedirs(subfolder_path, exist_ok=True) - files_downloaded += self._download_folder_recursive( - file_metadata['id'], - subfolder_path, - recursive - ) - else: - if self._download_single_file(file_metadata['id'], local_dir): - files_downloaded += 1 - - page_token = results.get('nextPageToken') - if not page_token: - break - - return files_downloaded + try: + os.makedirs(local_dir, exist_ok=True) + + query = f"'{folder_id}' in parents and trashed=false" + page_token = None + + while True: + results = self.service.files().list( + q=query, + fields='nextPageToken, files(id, name, mimeType)', + pageToken=page_token, + pageSize=1000 + ).execute() + + items = results.get('files', []) + logging.info(f"Found {len(items)} items in folder {folder_id}") + + for item in items: + item_name = item['name'] + item_id = item['id'] + mime_type = item['mimeType'] + + if mime_type == 'application/vnd.google-apps.folder': + if recursive: + # Create subfolder and recurse + subfolder_path = os.path.join(local_dir, item_name) + os.makedirs(subfolder_path, exist_ok=True) + subfolder_files = self._download_folder_recursive( + item_id, + subfolder_path, + recursive + ) + files_downloaded += subfolder_files + logging.info(f"Downloaded {subfolder_files} files from subfolder {item_name}") + else: + # Download file + success = self._download_single_file(item_id, local_dir) + if success: + files_downloaded += 1 + logging.info(f"Downloaded file: {item_name}") + else: + logging.warning(f"Failed to download file: {item_name}") + + page_token = results.get('nextPageToken') + if not page_token: + break + + return files_downloaded + + except Exception as e: + logging.error(f"Error in _download_folder_recursive for folder {folder_id}: {e}", exc_info=True) + return files_downloaded def _get_extension_for_mime_type(self, mime_type: str) -> str: extensions = { @@ -461,14 +482,15 @@ class GoogleDriveLoader(BaseRemote): source_config = {} config = source_config if source_config else getattr(self, 'config', {}) - files_downloaded = 0 try: - folder_id = config.get('folder_id') + folder_ids = config.get('folder_ids', []) file_ids = config.get('file_ids', []) recursive = config.get('recursive', True) + self._ensure_service() + if file_ids: if isinstance(file_ids, str): file_ids = [file_ids] @@ -477,11 +499,33 @@ class GoogleDriveLoader(BaseRemote): if self._download_file_to_directory(file_id, local_dir): files_downloaded += 1 - elif folder_id: - files_downloaded = self._download_folder_contents(folder_id, local_dir, recursive) + # Process folders + if folder_ids: + if isinstance(folder_ids, str): + folder_ids = [folder_ids] - else: - raise ValueError("No folder_id or file_ids provided for download") + for folder_id in folder_ids: + try: + folder_metadata = self.service.files().get( + fileId=folder_id, + fields='name' + ).execute() + folder_name = folder_metadata.get('name', '') + folder_path = os.path.join(local_dir, folder_name) + os.makedirs(folder_path, exist_ok=True) + + folder_files = self._download_folder_recursive( + folder_id, + folder_path, + recursive + ) + files_downloaded += folder_files + logging.info(f"Downloaded {folder_files} files from folder {folder_name}") + except Exception as e: + logging.error(f"Error downloading folder {folder_id}: {e}", exc_info=True) + + if not file_ids and not folder_ids: + raise ValueError("No folder_ids or file_ids provided for download") return { "files_downloaded": files_downloaded, @@ -493,9 +537,10 @@ class GoogleDriveLoader(BaseRemote): except Exception as e: return { - "files_downloaded": 0, + "files_downloaded": files_downloaded, "directory_path": local_dir, "empty_result": True, - "error": str(e), - "source_type": "google_drive" + "source_type": "google_drive", + "config_used": config, + "error": str(e) } diff --git a/application/worker.py b/application/worker.py index a9503734..99dc2635 100755 --- a/application/worker.py +++ b/application/worker.py @@ -854,11 +854,11 @@ def ingest_connector( """ logging.info(f"Starting remote ingestion from {source_type} for user: {user}, job: {job_name}") self.update_state(state="PROGRESS", meta={"current": 1}) - + with tempfile.TemporaryDirectory() as temp_dir: try: - # Step 1: Get the appropriate remote loader - logging.info(f"source_config {source_config}") + # Step 1: Initialize the appropriate loader + self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing connector"}) if source_type == "google_drive": session_token = source_config.get("session_token") @@ -871,11 +871,10 @@ def ingest_connector( # Create a clean config for storage that excludes the session token api_source_config = { "file_ids": source_config.get("file_ids", []), - "folder_id": source_config.get("folder_id", ""), + "folder_ids": source_config.get("folder_ids", []), + "recursive": source_config.get("recursive", True) } - if source_config.get("recursive") is not None: - api_source_config["recursive"] = source_config.get("recursive") else: remote_loader = RemoteCreator.create_loader(source_type, source_config) api_source_config = source_config diff --git a/frontend/src/upload/Upload.tsx b/frontend/src/upload/Upload.tsx index f17c7c95..c2cf87ec 100644 --- a/frontend/src/upload/Upload.tsx +++ b/frontend/src/upload/Upload.tsx @@ -447,32 +447,24 @@ function Upload({ if (ingestor.type === 'google_drive') { const sessionToken = localStorage.getItem('google_drive_session_token'); + const selectedItems = googleDriveFiles.filter(file => selectedFiles.includes(file.id)); + const selectedFolderIds = selectedItems + .filter(item => item.type === 'application/vnd.google-apps.folder' || item.isFolder) + .map(folder => folder.id); + + const selectedFileIds = selectedItems + .filter(item => item.type !== 'application/vnd.google-apps.folder' && !item.isFolder) + .map(file => file.id); + configData = { - file_ids: selectedFiles, + file_ids: selectedFileIds, + folder_ids: selectedFolderIds, recursive: ingestor.config.recursive, session_token: sessionToken || null }; } else { - const defaultConfig = IngestorDefaultConfigs[ingestor.type].config; - const mergedConfig = { ...defaultConfig, ...ingestor.config }; - configData = Object.entries(mergedConfig).reduce( - (acc, [key, value]) => { - const field = IngestorFormSchemas[ingestor.type].find( - (f) => f.name === key, - ); - // Include the field if: - // 1. It's required, or - // 2. It's optional and has a non-empty value - if ( - field?.required || - (value !== undefined && value !== null && value !== '') - ) { - acc[key] = value; - } - return acc; - }, - {} as Record, - ); + + configData = { ...ingestor.config }; } formData.append('data', JSON.stringify(configData));