diff --git a/application/parser/connectors/google_drive/loader.py b/application/parser/connectors/google_drive/loader.py index 06737748..22d6acc9 100644 --- a/application/parser/connectors/google_drive/loader.py +++ b/application/parser/connectors/google_drive/loader.py @@ -146,115 +146,7 @@ class GoogleDriveLoader(BaseConnectorLoader): logging.error(f"Error loading data from Google Drive: {e}", exc_info=True) raise - def scan_drive_contents(self, file_ids: List[str], folder_ids: List[str], - modified_after: str = "2024-01-01T00:00:00Z") -> Dict[str, Any]: - """ - Scan Google Drive contents and check for files/folders modified after a specific date. - Args: - file_ids: List of specific file IDs to check - folder_ids: List of folder IDs to scan for modified contents - modified_after: ISO 8601 formatted date string (default: "2024-01-01T00:00:00Z") - - Returns: - Dictionary containing: - - 'modified_files': List of file IDs that were modified after the given date - - 'modified_folders': List of folder IDs that were modified after the given date - - 'scan_summary': Summary of the scan results - """ - self._ensure_service() - - modified_files = [] - modified_folders = [] - - try: - for file_id in file_ids: - try: - file_metadata = self.service.files().get( - fileId=file_id, - fields='id,name,modifiedTime,mimeType' - ).execute() - - modified_time = file_metadata.get('modifiedTime', '') - if modified_time > modified_after: - modified_files.append({ - 'id': file_id, - 'name': file_metadata.get('name', 'Unknown'), - 'modifiedTime': modified_time, - 'mimeType': file_metadata.get('mimeType', '') - }) - - except Exception as e: - logging.warning(f"Error checking file {file_id}: {e}") - continue - - for folder_id in folder_ids: - try: - folder_metadata = self.service.files().get( - fileId=folder_id, - fields='id,name,modifiedTime,mimeType' - ).execute() - - folder_modified_time = folder_metadata.get('modifiedTime', '') - if folder_modified_time > modified_after: - modified_folders.append({ - 'id': folder_id, - 'name': folder_metadata.get('name', 'Unknown'), - 'modifiedTime': folder_modified_time, - 'mimeType': folder_metadata.get('mimeType', '') - }) - - query = f"'{folder_id}' in parents and modifiedTime > '{modified_after}'" - - page_token = None - while True: - results = self.service.files().list( - q=query, - spaces='drive', - fields='nextPageToken, files(id, name, modifiedTime, mimeType)', - pageToken=page_token - ).execute() - - items = results.get('files', []) - - for item in items: - item_info = { - 'id': item['id'], - 'name': item['name'], - 'modifiedTime': item['modifiedTime'], - 'mimeType': item['mimeType'] - } - - if item['mimeType'] == 'application/vnd.google-apps.folder': - modified_folders.append(item_info) - else: - modified_files.append(item_info) - - page_token = results.get('nextPageToken') - if not page_token: - break - - except Exception as e: - logging.warning(f"Error scanning folder {folder_id}: {e}") - continue - - summary = { - 'total_modified_files': len(modified_files), - 'total_modified_folders': len(modified_folders), - 'scan_date': modified_after - } - - logging.info(f"Drive scan completed: {summary['total_modified_files']} files and {summary['total_modified_folders']} folders modified after {modified_after}") - - return { - 'modified_files': modified_files, - 'modified_folders': modified_folders, - 'scan_summary': summary - } - - except Exception as e: - logging.error(f"Error scanning drive contents: {e}", exc_info=True) - raise def _load_file_by_id(self, file_id: str, load_content: bool = True) -> Optional[Document]: self._ensure_service() diff --git a/application/worker.py b/application/worker.py index 75519df6..10fb6c2b 100755 --- a/application/worker.py +++ b/application/worker.py @@ -22,6 +22,7 @@ from application.api.answer.services.stream_processor import get_prompt from application.core.mongo_db import MongoDB from application.core.settings import settings from application.parser.chunking import Chunker +from application.parser.connectors.connector_creator import ConnectorCreator from application.parser.embedding_pipeline import embed_and_store_documents from application.parser.file.bulk import SimpleDirectoryReader from application.parser.remote.remote_creator import RemoteCreator @@ -879,98 +880,27 @@ def ingest_connector( # Step 1: Initialize the appropriate loader self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing connector"}) - # Handle incremental sync using Google Drive API directly - current_sync_time = datetime.datetime.now().isoformat() + 'Z' + if not session_token: + raise ValueError(f"{source_type} connector requires session_token") - if operation_mode == "sync": - if source_type == "google_drive": - from application.parser.connectors.connector_creator import ConnectorCreator - remote_loader = ConnectorCreator.create_connector("google_drive", session_token) + if not ConnectorCreator.is_supported(source_type): + raise ValueError(f"Unsupported connector type: {source_type}. Supported types: {ConnectorCreator.get_supported_connectors()}") - source = sources_collection.find_one({"_id": ObjectId(doc_id)}) - - last_sync_time = source.get("last_sync") - if not last_sync_time: - last_sync_time = source.get("date") - + remote_loader = ConnectorCreator.create_connector(source_type, session_token) - scan_results = remote_loader.scan_drive_contents( - file_ids or [], - folder_ids or [], - modified_after=last_sync_time - ) + # Create a clean config for storage + api_source_config = { + "file_ids": file_ids or [], + "folder_ids": folder_ids or [], + "recursive": recursive + } - modified_files = scan_results.get('modified_files', []) - modified_folders = scan_results.get('modified_folders', []) - - # Log atomic changes detected via Google Drive API - if modified_files: - logging.info(f"Files modified since last sync: {len(modified_files)} files") - for f in modified_files: - logging.info(f" - {f['name']} (ID: {f['id']}, Modified: {f['modifiedTime']})") - - if modified_folders: - logging.info(f"Folders modified since last sync: {len(modified_folders)} folders") - for f in modified_folders: - logging.info(f" - {f['name']} (ID: {f['id']}, Modified: {f['modifiedTime']})") - - if not modified_files and not modified_folders: - logging.info("No changes detected via Google Drive API") - return { - "user": user, - "name": job_name, - "tokens": 0, - "type": source_type, - "status": "no_changes" - } - - file_ids = [f['id'] for f in modified_files] - folder_ids = [f['id'] for f in modified_folders] - - if source_type == "google_drive": - if not session_token: - raise ValueError("Google Drive connector requires session_token") - - from application.parser.connectors.connector_creator import ConnectorCreator - remote_loader = ConnectorCreator.create_connector("google_drive", session_token) - - # Create a clean config for storage that excludes the session token - api_source_config = { - "file_ids": file_ids or [], - "folder_ids": folder_ids or [], - "recursive": recursive - } - - # Step 2: Download files to temp directory - self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"}) - download_info = remote_loader.download_to_directory( - temp_dir, - { - "file_ids": file_ids or [], - "folder_ids": folder_ids or [], - "recursive": recursive - } - ) - else: - # For other external knowledge base connectors (future: dropbox, onedrive, etc.) - from application.parser.connectors.connector_creator import ConnectorCreator - - if not ConnectorCreator.is_supported(source_type): - raise ValueError(f"Unsupported connector type: {source_type}. Supported types: {ConnectorCreator.get_supported_connectors()}") - - # Create connector with session token and other parameters - remote_loader = ConnectorCreator.create_connector(source_type, session_token) - - api_source_config = { - "file_ids": file_ids or [], - "folder_ids": folder_ids or [], - "recursive": recursive - } - - download_info = remote_loader.download_to_directory( - temp_dir, - api_source_config - ) + # Step 2: Download files to temp directory + self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"}) + download_info = remote_loader.download_to_directory( + temp_dir, + api_source_config + ) if download_info.get("empty_result", False) or not download_info.get("files_downloaded", 0): logging.warning(f"No files were downloaded from {source_type}") diff --git a/frontend/src/components/ConnectorTreeComponent.tsx b/frontend/src/components/ConnectorTreeComponent.tsx index 96e235c6..9e8ccdf9 100644 --- a/frontend/src/components/ConnectorTreeComponent.tsx +++ b/frontend/src/components/ConnectorTreeComponent.tsx @@ -10,7 +10,7 @@ import FolderIcon from '../assets/folder.svg'; import ArrowLeft from '../assets/arrow-left.svg'; import ThreeDots from '../assets/three-dots.svg'; import EyeView from '../assets/eye-view.svg'; -import SearchIcon from '../assets/search.svg'; +import SyncIcon from '../assets/sync.svg'; import { useOutsideAlerter } from '../hooks'; interface FileNode { @@ -59,6 +59,8 @@ const ConnectorTreeComponent: React.FC = ({ const [searchQuery, setSearchQuery] = useState(''); const [searchResults, setSearchResults] = useState([]); const searchDropdownRef = useRef(null); + const [isSyncing, setIsSyncing] = useState(false); + const [syncProgress, setSyncProgress] = useState(0); useOutsideAlerter( searchDropdownRef, @@ -78,6 +80,71 @@ const ConnectorTreeComponent: React.FC = ({ }); }; + const handleSync = async () => { + if (isSyncing) return; + + setIsSyncing(true); + setSyncProgress(0); + + try { + const response = await userService.syncConnector(docId, token); + const data = await response.json(); + + if (data.success) { + console.log('Sync started successfully:', data.task_id); + setSyncProgress(10); + + // Poll task status using userService + const maxAttempts = 30; + const pollInterval = 2000; + + for (let attempt = 0; attempt < maxAttempts; attempt++) { + try { + const statusResponse = await userService.getTaskStatus(data.task_id, token); + const statusData = await statusResponse.json(); + + console.log(`Task status (attempt ${attempt + 1}):`, statusData.status); + + if (statusData.status === 'SUCCESS') { + setSyncProgress(100); + console.log('Sync completed successfully'); + + // Refresh directory structure + try { + const refreshResponse = await userService.getDirectoryStructure(docId, token); + const refreshData = await refreshResponse.json(); + if (refreshData && refreshData.directory_structure) { + setDirectoryStructure(refreshData.directory_structure); + } + } catch (err) { + console.error('Error refreshing directory structure:', err); + } + break; + } else if (statusData.status === 'FAILURE') { + console.error('Sync task failed:', statusData.result); + break; + } else if (statusData.status === 'PROGRESS') { + const progress = statusData.meta?.current || 0; + setSyncProgress(Math.max(10, progress)); // Ensure minimum 10% after start + } + + await new Promise((resolve) => setTimeout(resolve, pollInterval)); + } catch (error) { + console.error('Error polling task status:', error); + break; + } + } + } else { + console.error('Sync failed:', data.error); + } + } catch (err) { + console.error('Error syncing connector:', err); + } finally { + setIsSyncing(false); + setSyncProgress(0); + } + }; + useEffect(() => { const fetchDirectoryStructure = async () => { try { @@ -247,7 +314,27 @@ const ConnectorTreeComponent: React.FC = ({
+ {renderFileSearch()} + + {/* Sync button */} +
); diff --git a/frontend/src/locale/en.json b/frontend/src/locale/en.json index d0d1b4b3..39e2bee7 100644 --- a/frontend/src/locale/en.json +++ b/frontend/src/locale/en.json @@ -67,6 +67,7 @@ "preLoaded": "Pre-loaded", "private": "Private", "sync": "Sync", + "syncing": "Syncing...", "syncFrequency": { "never": "Never", "daily": "Daily", diff --git a/frontend/src/locale/es.json b/frontend/src/locale/es.json index 64e204dd..1c8afa6c 100644 --- a/frontend/src/locale/es.json +++ b/frontend/src/locale/es.json @@ -67,6 +67,7 @@ "preLoaded": "Precargado", "private": "Privado", "sync": "Sincronizar", + "syncing": "Sincronizando...", "syncFrequency": { "never": "Nunca", "daily": "Diario", diff --git a/frontend/src/locale/jp.json b/frontend/src/locale/jp.json index 5b93d182..ef29bd78 100644 --- a/frontend/src/locale/jp.json +++ b/frontend/src/locale/jp.json @@ -67,6 +67,7 @@ "preLoaded": "プリロード済み", "private": "プライベート", "sync": "同期", + "syncing": "同期中...", "syncFrequency": { "never": "なし", "daily": "毎日", diff --git a/frontend/src/locale/ru.json b/frontend/src/locale/ru.json index 6a83f9a3..a8506451 100644 --- a/frontend/src/locale/ru.json +++ b/frontend/src/locale/ru.json @@ -67,6 +67,7 @@ "preLoaded": "Предзагруженный", "private": "Частный", "sync": "Синхронизация", + "syncing": "Синхронизация...", "syncFrequency": { "never": "Никогда", "daily": "Ежедневно", diff --git a/frontend/src/locale/zh-TW.json b/frontend/src/locale/zh-TW.json index 8c734ff3..4f9c623b 100644 --- a/frontend/src/locale/zh-TW.json +++ b/frontend/src/locale/zh-TW.json @@ -67,6 +67,7 @@ "preLoaded": "預載入", "private": "私人", "sync": "同步", + "syncing": "同步中...", "syncFrequency": { "never": "從不", "daily": "每天", diff --git a/frontend/src/locale/zh.json b/frontend/src/locale/zh.json index eb216c86..014e8256 100644 --- a/frontend/src/locale/zh.json +++ b/frontend/src/locale/zh.json @@ -67,6 +67,7 @@ "preLoaded": "预加载", "private": "私有", "sync": "同步", + "syncing": "同步中...", "syncFrequency": { "never": "从不", "daily": "每天",