@connectors_ns.route("/api/connectors/sync")
class ConnectorSync(Resource):
    @api.expect(
        api.model(
            "ConnectorSyncModel",
            {
                "source_id": fields.String(required=True, description="Source ID to sync"),
                "session_token": fields.String(required=True, description="Authentication token"),
            },
        )
    )
    @api.doc(description="Sync connector source to check for modifications")
    def post(self):
        """Trigger an incremental sync of a previously ingested connector source.

        Loads the source document, verifies ownership, reconstructs the
        original connector configuration from the stored ``remote_data``
        JSON, and enqueues ``ingest_connector_task`` in ``sync`` mode.

        Returns:
            200 with the celery task id on success; 400 for bad input,
            401/403 for auth failures, 404 if the source does not exist,
            500 for unexpected server-side errors.
        """
        decoded_token = request.decoded_token
        if not decoded_token:
            return make_response(jsonify({"success": False}), 401)

        try:
            data = request.get_json()
            source_id = data.get("source_id")
            session_token = data.get("session_token")

            if not all([source_id, session_token]):
                return make_response(
                    jsonify(
                        {
                            "success": False,
                            "error": "source_id and session_token are required",
                        }
                    ),
                    400,
                )

            source = sources_collection.find_one({"_id": ObjectId(source_id)})
            if not source:
                return make_response(
                    jsonify({"success": False, "error": "Source not found"}),
                    404,
                )

            # Only the owner of the source may trigger a sync.
            if source.get("user") != decoded_token.get("sub"):
                return make_response(
                    jsonify(
                        {"success": False, "error": "Unauthorized access to source"}
                    ),
                    403,
                )

            remote_data = {}
            try:
                if source.get("remote_data"):
                    remote_data = json.loads(source.get("remote_data"))
            except json.JSONDecodeError:
                # Corrupt remote_data is not immediately fatal; the request
                # fails below only if the provider cannot be determined.
                current_app.logger.error(
                    f"Invalid remote_data format for source {source_id}"
                )
                remote_data = {}

            source_type = remote_data.get("provider")
            if not source_type:
                return make_response(
                    jsonify(
                        {
                            "success": False,
                            "error": "Source provider not found in remote_data",
                        }
                    ),
                    400,
                )

            # Reconstruct the configuration captured at upload time.
            file_ids = remote_data.get("file_ids", [])
            folder_ids = remote_data.get("folder_ids", [])
            # Default matches the upload route (recursive=False) so a source
            # ingested without the flag re-syncs with the same scope.
            recursive = remote_data.get("recursive", False)

            # Start the sync task.
            task = ingest_connector_task.delay(
                job_name=source.get("name"),
                user=decoded_token.get("sub"),
                source_type=source_type,
                session_token=session_token,
                file_ids=file_ids,
                folder_ids=folder_ids,
                recursive=recursive,
                retriever=source.get("retriever", "classic"),
                operation_mode="sync",
                doc_id=source_id,
                sync_frequency=source.get("sync_frequency", "never"),
            )

            return make_response(
                jsonify({"success": True, "task_id": task.id}),
                200,
            )

        except Exception as err:
            current_app.logger.error(
                f"Error syncing connector source: {err}",
                exc_info=True,
            )
            # Unexpected failures are server-side errors: report 500, not 400,
            # consistent with ConnectorDisconnect's error handling.
            return make_response(
                jsonify({"success": False, "error": str(err)}),
                500,
            )
@celery.task(bind=True)
def ingest_connector_task(
    self,
    job_name,
    user,
    source_type,
    session_token=None,
    file_ids=None,
    folder_ids=None,
    recursive=True,
    retriever="classic",
    operation_mode="upload",
    doc_id=None,
    sync_frequency="never",
):
    """Celery entry point that delegates connector ingestion to the worker.

    The worker import is deferred to call time so loading this tasks module
    does not pull in the heavier worker dependencies.
    """
    from application.worker import ingest_connector

    # Collect the optional settings once and forward them as keywords.
    forwarded = {
        "session_token": session_token,
        "file_ids": file_ids,
        "folder_ids": folder_ids,
        "recursive": recursive,
        "retriever": retriever,
        "operation_mode": operation_mode,
        "doc_id": doc_id,
        "sync_frequency": sync_frequency,
    }
    return ingest_connector(self, job_name, user, source_type, **forwarded)
self, - job_name, - user, - source_type, - session_token=None, - file_ids=None, - folder_ids=None, + self, + job_name, + user, + source_type, + session_token=None, + file_ids=None, + folder_ids=None, recursive=True, - retriever="classic" + retriever="classic", + operation_mode="upload", + doc_id=None, + sync_frequency="never" ): from application.worker import ingest_connector resp = ingest_connector( - self, - job_name, - user, - source_type, + self, + job_name, + user, + source_type, session_token=session_token, file_ids=file_ids, folder_ids=folder_ids, recursive=recursive, - retriever=retriever + retriever=retriever, + operation_mode=operation_mode, + doc_id=doc_id, + sync_frequency=sync_frequency ) return resp diff --git a/application/parser/connectors/google_drive/loader.py b/application/parser/connectors/google_drive/loader.py index a81ad4d4..06737748 100644 --- a/application/parser/connectors/google_drive/loader.py +++ b/application/parser/connectors/google_drive/loader.py @@ -146,6 +146,116 @@ class GoogleDriveLoader(BaseConnectorLoader): logging.error(f"Error loading data from Google Drive: {e}", exc_info=True) raise + def scan_drive_contents(self, file_ids: List[str], folder_ids: List[str], + modified_after: str = "2024-01-01T00:00:00Z") -> Dict[str, Any]: + """ + Scan Google Drive contents and check for files/folders modified after a specific date. 
+ + Args: + file_ids: List of specific file IDs to check + folder_ids: List of folder IDs to scan for modified contents + modified_after: ISO 8601 formatted date string (default: "2024-01-01T00:00:00Z") + + Returns: + Dictionary containing: + - 'modified_files': List of file IDs that were modified after the given date + - 'modified_folders': List of folder IDs that were modified after the given date + - 'scan_summary': Summary of the scan results + """ + self._ensure_service() + + modified_files = [] + modified_folders = [] + + try: + for file_id in file_ids: + try: + file_metadata = self.service.files().get( + fileId=file_id, + fields='id,name,modifiedTime,mimeType' + ).execute() + + modified_time = file_metadata.get('modifiedTime', '') + if modified_time > modified_after: + modified_files.append({ + 'id': file_id, + 'name': file_metadata.get('name', 'Unknown'), + 'modifiedTime': modified_time, + 'mimeType': file_metadata.get('mimeType', '') + }) + + except Exception as e: + logging.warning(f"Error checking file {file_id}: {e}") + continue + + for folder_id in folder_ids: + try: + folder_metadata = self.service.files().get( + fileId=folder_id, + fields='id,name,modifiedTime,mimeType' + ).execute() + + folder_modified_time = folder_metadata.get('modifiedTime', '') + if folder_modified_time > modified_after: + modified_folders.append({ + 'id': folder_id, + 'name': folder_metadata.get('name', 'Unknown'), + 'modifiedTime': folder_modified_time, + 'mimeType': folder_metadata.get('mimeType', '') + }) + + query = f"'{folder_id}' in parents and modifiedTime > '{modified_after}'" + + page_token = None + while True: + results = self.service.files().list( + q=query, + spaces='drive', + fields='nextPageToken, files(id, name, modifiedTime, mimeType)', + pageToken=page_token + ).execute() + + items = results.get('files', []) + + for item in items: + item_info = { + 'id': item['id'], + 'name': item['name'], + 'modifiedTime': item['modifiedTime'], + 'mimeType': item['mimeType'] + 
} + + if item['mimeType'] == 'application/vnd.google-apps.folder': + modified_folders.append(item_info) + else: + modified_files.append(item_info) + + page_token = results.get('nextPageToken') + if not page_token: + break + + except Exception as e: + logging.warning(f"Error scanning folder {folder_id}: {e}") + continue + + summary = { + 'total_modified_files': len(modified_files), + 'total_modified_folders': len(modified_folders), + 'scan_date': modified_after + } + + logging.info(f"Drive scan completed: {summary['total_modified_files']} files and {summary['total_modified_folders']} folders modified after {modified_after}") + + return { + 'modified_files': modified_files, + 'modified_folders': modified_folders, + 'scan_summary': summary + } + + except Exception as e: + logging.error(f"Error scanning drive contents: {e}", exc_info=True) + raise + def _load_file_by_id(self, file_id: str, load_content: bool = True) -> Optional[Document]: self._ensure_service() diff --git a/application/worker.py b/application/worker.py index e231474c..75519df6 100755 --- a/application/worker.py +++ b/application/worker.py @@ -650,8 +650,11 @@ def remote_worker( "id": str(id), "type": loader, "remote_data": source_data, - "sync_frequency": sync_frequency, + "sync_frequency": sync_frequency } + + if operation_mode == "sync": + file_data["last_sync"] = datetime.datetime.now() upload_index(full_path, file_data) except Exception as e: logging.error("Error in remote_worker task: %s", str(e), exc_info=True) @@ -708,7 +711,7 @@ def sync_worker(self, frequency): self, source_data, name, user, source_type, frequency, retriever, doc_id ) sync_counts["total_sync_count"] += 1 - sync_counts[ + sync_counts[ "sync_success" if resp["status"] == "success" else "sync_failure" ] += 1 return { @@ -745,7 +748,7 @@ def attachment_worker(self, file_info, user): input_files=[local_path], exclude_hidden=True, errors="ignore" ) .load_data()[0] - .text, + .text, ) @@ -839,15 +842,18 @@ def 
agent_webhook_worker(self, agent_id, payload): def ingest_connector( - self, - job_name: str, - user: str, + self, + job_name: str, + user: str, source_type: str, session_token=None, - file_ids=None, - folder_ids=None, + file_ids=None, + folder_ids=None, recursive=True, - retriever: str = "classic" + retriever: str = "classic", + operation_mode: str = "upload", + doc_id=None, + sync_frequency: str = "never", ) -> Dict[str, Any]: """ Ingestion for internal knowledge bases (GoogleDrive, etc.). @@ -861,6 +867,9 @@ def ingest_connector( folder_ids: List of folder IDs to download recursive: Whether to recursively download folders retriever: Type of retriever to use + operation_mode: "upload" for initial ingestion, "sync" for incremental sync + doc_id: Document ID for sync operations (required when operation_mode="sync") + sync_frequency: How often to sync ("never", "daily", "weekly", "monthly") """ logging.info(f"Starting remote ingestion from {source_type} for user: {user}, job: {job_name}") self.update_state(state="PROGRESS", meta={"current": 1}) @@ -869,25 +878,73 @@ def ingest_connector( try: # Step 1: Initialize the appropriate loader self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing connector"}) - + + # Handle incremental sync using Google Drive API directly + current_sync_time = datetime.datetime.now().isoformat() + 'Z' + + if operation_mode == "sync": + if source_type == "google_drive": + from application.parser.connectors.connector_creator import ConnectorCreator + remote_loader = ConnectorCreator.create_connector("google_drive", session_token) + + source = sources_collection.find_one({"_id": ObjectId(doc_id)}) + + last_sync_time = source.get("last_sync") + if not last_sync_time: + last_sync_time = source.get("date") + + + scan_results = remote_loader.scan_drive_contents( + file_ids or [], + folder_ids or [], + modified_after=last_sync_time + ) + + modified_files = scan_results.get('modified_files', []) + modified_folders = 
scan_results.get('modified_folders', []) + + # Log atomic changes detected via Google Drive API + if modified_files: + logging.info(f"Files modified since last sync: {len(modified_files)} files") + for f in modified_files: + logging.info(f" - {f['name']} (ID: {f['id']}, Modified: {f['modifiedTime']})") + + if modified_folders: + logging.info(f"Folders modified since last sync: {len(modified_folders)} folders") + for f in modified_folders: + logging.info(f" - {f['name']} (ID: {f['id']}, Modified: {f['modifiedTime']})") + + if not modified_files and not modified_folders: + logging.info("No changes detected via Google Drive API") + return { + "user": user, + "name": job_name, + "tokens": 0, + "type": source_type, + "status": "no_changes" + } + + file_ids = [f['id'] for f in modified_files] + folder_ids = [f['id'] for f in modified_folders] + if source_type == "google_drive": if not session_token: raise ValueError("Google Drive connector requires session_token") from application.parser.connectors.connector_creator import ConnectorCreator remote_loader = ConnectorCreator.create_connector("google_drive", session_token) - + # Create a clean config for storage that excludes the session token api_source_config = { "file_ids": file_ids or [], "folder_ids": folder_ids or [], "recursive": recursive } - + # Step 2: Download files to temp directory self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"}) download_info = remote_loader.download_to_directory( - temp_dir, + temp_dir, { "file_ids": file_ids or [], "folder_ids": folder_ids or [], @@ -942,6 +999,8 @@ def ingest_connector( ) raw_docs = reader.load_data() directory_structure = getattr(reader, 'directory_structure', {}) + + # Step 4: Process documents (chunking, embedding, etc.) 
self.update_state(state="PROGRESS", meta={"current": 60, "status": "Processing documents"}) @@ -964,8 +1023,16 @@ def ingest_connector( docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] - # Step 5: Store in vector database - id = ObjectId() + if operation_mode == "upload": + id = ObjectId() + elif operation_mode == "sync": + if not doc_id or not ObjectId.is_valid(doc_id): + logging.error("Invalid doc_id provided for sync operation: %s", doc_id) + raise ValueError("doc_id must be provided for sync operation.") + id = ObjectId(doc_id) + else: + raise ValueError(f"Invalid operation_mode: {operation_mode}") + vector_store_path = os.path.join(temp_dir, "vector_store") os.makedirs(vector_store_path, exist_ok=True) @@ -986,16 +1053,22 @@ def ingest_connector( "provider": source_type, **api_source_config }), - "directory_structure": json.dumps(directory_structure) + "directory_structure": json.dumps(directory_structure), + "sync_frequency": sync_frequency } - + + if operation_mode == "sync": + file_data["last_sync"] = datetime.datetime.now() + else: + file_data["last_sync"] = datetime.datetime.now() + upload_index(vector_store_path, file_data) - + # Ensure we mark the task as complete self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"}) - + logging.info(f"Remote ingestion completed: {job_name}") - + return { "user": user, "name": job_name,