From f82be23ca94f678b6f04c336bc9e37a74b56debd Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Fri, 22 Aug 2025 13:33:21 +0530 Subject: [PATCH] (feat:ingestion) external drive connect --- application/api/user/routes.py | 445 +++++++++++++++++++++++++++++++++ application/api/user/tasks.py | 7 + application/worker.py | 144 +++++++++++ 3 files changed, 596 insertions(+) diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 9a2febbc..c6edec6f 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -28,6 +28,7 @@ from application.agents.tools.tool_manager import ToolManager from application.api.user.tasks import ( ingest, + ingest_connector_task, ingest_remote, process_agent_webhook, store_attachment, @@ -877,6 +878,42 @@ class UploadRemote(Resource): source_data = config.get("url") elif data["source"] == "reddit": source_data = config + elif data["source"] == "google_drive": + if "session_token" not in config: + return make_response(jsonify({ + "success": False, + "error": "Missing session_token in Google Drive configuration" + }), 400) + + session_token = config.get("session_token") + + file_ids = config.get("file_ids", []) + if isinstance(file_ids, str): + file_ids = [id.strip() for id in file_ids.split(',') if id.strip()] + elif not isinstance(file_ids, list): + file_ids = [] + + folder_id = config.get("folder_id", "") + if not isinstance(folder_id, str): + folder_id = str(folder_id) if folder_id else "" + + recursive = bool(config.get("recursive", False)) + + clean_config = { + "session_token": session_token, + "file_ids": file_ids, + "folder_id": folder_id, + "recursive": recursive + } + + from application.api.user.tasks import ingest_connector_task + task = ingest_connector_task.delay( + source_config=clean_config, + job_name=data["name"], + user=decoded_token.get("sub"), + source_type="google_drive" + ) + return make_response(jsonify({"success": True, "task_id": task.id}), 200) task = ingest_remote.delay( source_data=source_data, job_name=data["name"], @@ -3936,3 +3973,411 @@ class DirectoryStructure(Resource): return make_response( jsonify({"success": False, "error": str(e)}), 500 ) + + +@user_ns.route("/api/google-drive/auth") +class GoogleDriveAuth(Resource): + @api.doc(description="Get Google Drive OAuth authorization URL") + def get(self): + """Get Google Drive OAuth authorization URL""" + try: + from application.parser.remote.google_auth import GoogleDriveAuth + + auth = GoogleDriveAuth() + + # Generate state parameter for CSRF protection + import uuid + state = str(uuid.uuid4()) + + # Store state in session or database for validation + # For now, we'll include it in the URL and validate on callback + authorization_url = auth.get_authorization_url(state=state) + current_app.logger.info(f"Generated authorization URL: {authorization_url}") + return make_response( + jsonify({ + "success": True, + "authorization_url": authorization_url, + "state": state + }), + 200 + ) + + except Exception as e: + current_app.logger.error(f"Error generating Google Drive auth URL: {e}") + return make_response( + jsonify({"success": False, "error": str(e)}), 500 + ) + + +@user_ns.route("/api/google-drive/callback") +class GoogleDriveCallback(Resource): + @api.doc(description="Handle Google Drive OAuth callback") + def get(self): + """Handle Google Drive OAuth callback""" + try: + from application.parser.remote.google_auth import GoogleDriveAuth + from flask import request + import uuid + + # Get authorization code and state from query parameters + authorization_code = request.args.get('code') + _ = request.args.get('state') # We don't currently use state, but capture it to avoid unused variable warning + error = request.args.get('error') + + if error: + return make_response( + jsonify({"success": False, "error": f"OAuth error: {error}. Please try again and make sure to grant all requested permissions, including offline access."}), 400 + ) + + if not authorization_code: + return make_response( + jsonify({"success": False, "error": "Authorization code not provided. Please complete the authorization process and make sure to grant offline access."}), 400 + ) + + # Exchange code for tokens + try: + auth = GoogleDriveAuth() + token_info = auth.exchange_code_for_tokens(authorization_code) + + # Log detailed information about the token_info we received + current_app.logger.info(f"Token info received from OAuth callback - has refresh_token: {bool(token_info.get('refresh_token'))}, " + f"has access_token: {bool(token_info.get('access_token'))}, " + f"expiry: {token_info.get('expiry')}") + + # Log the full token_info structure (without sensitive data) + safe_token_info = {k: v for k, v in token_info.items() if k not in ['access_token', 'refresh_token', 'client_secret']} + current_app.logger.info(f"Full token info structure: {safe_token_info}") + + # Validate that we got token info + if not token_info: + current_app.logger.error("exchange_code_for_tokens returned None or empty result") + return make_response( + jsonify({"success": False, "error": "Failed to exchange authorization code for tokens. Please try again and make sure to grant all requested permissions, including offline access."}), 400 + ) + + # Validate required fields in token_info + required_fields = ['access_token', 'token_uri', 'client_id', 'client_secret'] + missing_fields = [field for field in required_fields if not token_info.get(field)] + if missing_fields: + current_app.logger.error(f"Token info missing required fields: {missing_fields}") + return make_response( + jsonify({"success": False, "error": f"Token information incomplete. Missing fields: {missing_fields}. Please try again and make sure to grant all requested permissions."}), 400 + ) + + # Check if refresh_token is present - this is critical for long-term access + if not token_info.get('refresh_token'): + return make_response( + jsonify({ + "success": False, + "error": "OAuth flow did not return a refresh token. This typically happens when offline access wasn't granted. " + "Please reconnect your Google Drive account and ensure you grant offline access when prompted. " + "Make sure to check 'Allow offline access' during the authorization process." + }), 400 + ) + + # Validate required fields in token_info + required_fields = ['access_token', 'token_uri', 'client_id', 'client_secret'] + missing_fields = [field for field in required_fields if not token_info.get(field)] + if missing_fields: + current_app.logger.error(f"Token info missing required fields: {missing_fields}") + return make_response( + jsonify({"success": False, "error": f"Token info missing required fields: {missing_fields}"}), 400 + ) + + except Exception as e: + current_app.logger.error(f"Error exchanging code for tokens: {e}", exc_info=True) + return make_response( + jsonify({"success": False, "error": f"Failed to exchange authorization code for tokens: {str(e)}"}), 400 + ) + + # Get user information + try: + credentials = auth.create_credentials_from_token_info(token_info) + service = auth.build_drive_service(credentials) + user_info = service.about().get(fields="user").execute() + user_email = user_info.get('user', {}).get('emailAddress', 'Connected User') + except Exception as e: + current_app.logger.warning(f"Could not get user info: {e}") + # Try to get user info without building service if we have access token + if token_info.get('access_token'): + try: + import requests + headers = {'Authorization': f'Bearer {token_info["access_token"]}'} + response = requests.get( + 'https://www.googleapis.com/drive/v3/about?fields=user', + headers=headers + ) + if response.status_code == 200: + user_info = response.json() + user_email = user_info.get('user', {}).get('emailAddress', 'Connected User') + else: + user_email = 'Connected User' + except Exception as request_error: + current_app.logger.warning(f"Could not get user info via direct request: {request_error}") + user_email = 'Connected User' + else: + user_email = 'Connected User' + + # Generate a session token + session_token = str(uuid.uuid4()) + + # Store token_info in MongoDB + from application.core.mongo_db import MongoDB + mongo = MongoDB.get_client() + db = mongo[settings.MONGO_DB_NAME] + sessions_collection = db["drive_sessions"] + + # Store only necessary token info, removing sensitive fields + sanitized_token_info = { + "access_token": token_info.get("access_token"), + "refresh_token": token_info.get("refresh_token"), + "token_uri": token_info.get("token_uri"), + "expiry": token_info.get("expiry"), + "scopes": token_info.get("scopes") + } + + # Store the sanitized token info with the session token + sessions_collection.insert_one({ + "session_token": session_token, + "token_info": sanitized_token_info, + "created_at": datetime.datetime.now(datetime.timezone.utc), + "user_email": user_email + }) + + # Return only the session token and user email to the client + return make_response( + jsonify({ + "success": True, + "message": "Google Drive authentication successful", + "session_token": session_token, + "user_email": user_email + }), + 200 + ) + + except Exception as e: + current_app.logger.error(f"Error handling Google Drive callback: {e}") + return make_response( + jsonify({ + "success": False, + "error": f"Failed to complete Google Drive authentication: {str(e)}. Please try again and make sure to grant all requested permissions, including offline access." + }), 500 + ) + + +@user_ns.route("/api/google-drive/refresh") +class GoogleDriveRefresh(Resource): + @api.expect( + api.model( + "GoogleDriveRefreshModel", + { + "refresh_token": fields.String(required=True, description="Refresh token") + } + ) + ) + @api.doc(description="Refresh Google Drive access token") + def post(self): + """Refresh Google Drive access token""" + try: + from application.parser.remote.google_auth import GoogleDriveAuth + + data = request.get_json() + refresh_token = data.get('refresh_token') + + if not refresh_token: + return make_response( + jsonify({"success": False, "error": "Refresh token not provided"}), 400 + ) + + auth = GoogleDriveAuth() + token_info = auth.refresh_access_token(refresh_token) + + return make_response( + jsonify({ + "success": True, + "message": "Token refreshed successfully", + "token_info": token_info + }), + 200 + ) + + except Exception as e: + current_app.logger.error(f"Error refreshing Google Drive token: {e}") + return make_response( + jsonify({ + "success": False, + "error": f"Failed to refresh Google Drive token: {str(e)}. Please reconnect your Google Drive account and make sure to grant offline access." + }), 500 + ) + + +@user_ns.route("/api/google-drive/files") +class GoogleDriveFiles(Resource): + @api.expect( + api.model( + "GoogleDriveFilesModel", + { + "session_token": fields.String(required=True, description="Google Drive session token"), + "folder_id": fields.String(description="Google Drive folder ID to fetch files from. If not provided, fetches from root", required=False), + "limit": fields.Integer(description="Maximum number of files to return", default=50) + } + ) + ) + @api.doc(description="Get list of files from Google Drive") + def post(self): + """Get list of files from Google Drive""" + try: + from application.parser.remote.google_drive_loader import GoogleDriveLoader + + data = request.get_json() + session_token = data.get('session_token') + folder_id = data.get('folder_id') + limit = data.get('limit', 50) + + if not session_token: + return make_response( + jsonify({"success": False, "error": "Session token not provided"}), 400 + ) + + # Create Google Drive loader with session token only + loader = GoogleDriveLoader(session_token) + + # Get files from Google Drive (limit to first N files, metadata only) + files_config = { + 'limit': limit, + 'list_only': True, + 'session_token': session_token, + 'folder_id': folder_id + } + documents = loader.load_data(files_config) + + # Convert documents to file list format + files = [] + for doc in documents[:limit]: + # Use extra_info instead of doc_metadata + metadata = doc.extra_info + files.append({ + 'id': doc.doc_id, + 'name': metadata.get('file_name', 'Unknown File'), + 'type': metadata.get('mime_type', 'unknown'), + 'size': metadata.get('size', 'Unknown'), + 'modifiedTime': metadata.get('modified_time', 'Unknown'), + 'iconUrl': get_file_icon(metadata.get('mime_type', '')) + }) + + return make_response( + jsonify({ + "success": True, + "files": files, + "total": len(files) + }), + 200 + ) + + except Exception as e: + current_app.logger.error(f"Error loading Google Drive files: {e}") + return make_response( + jsonify({ + "success": False, + "error": f"Failed to load files: {str(e)}. Please make sure your Google Drive account is properly connected and you granted offline access during authorization." + }), 500 + ) + +def get_file_icon(mime_type: str) -> str: + """Get appropriate icon for file type""" + if 'pdf' in mime_type: + return '📄' + elif 'word' in mime_type or 'document' in mime_type: + return '📝' + elif 'presentation' in mime_type or 'powerpoint' in mime_type: + return '📊' + elif 'spreadsheet' in mime_type or 'excel' in mime_type: + return '📈' + elif 'text' in mime_type: + return '📄' + elif 'image' in mime_type: + return '🖼️' + else: + return '📄' + +@user_ns.route("/api/google-drive/validate-session") +class GoogleDriveValidateSession(Resource): + @api.expect( + api.model( + "GoogleDriveValidateSessionModel", + { + "session_token": fields.String(required=True, description="Google Drive session token") + } + ) + ) + @api.doc(description="Validate Google Drive session token") + def post(self): + """Validate Google Drive session token and return user info""" + try: + from application.core.mongo_db import MongoDB + from application.parser.remote.google_auth import GoogleDriveAuth + + data = request.get_json() + session_token = data.get('session_token') + + if not session_token: + return make_response( + jsonify({"success": False, "error": "Session token not provided"}), 400 + ) + + # Retrieve session from MongoDB using session token + mongo = MongoDB.get_client() + db = mongo[settings.MONGO_DB_NAME] + sessions_collection = db["drive_sessions"] + + session = sessions_collection.find_one({"session_token": session_token}) + if not session or "token_info" not in session: + return make_response( + jsonify({"success": False, "error": "Invalid or expired session"}), 401 + ) + + # Get token info and check if it's expired + token_info = session["token_info"] + auth = GoogleDriveAuth() + + # Check if token is expired using our improved method + is_expired = auth.is_token_expired(token_info) + + # Attempt to refresh token if needed + if is_expired and 'refresh_token' in token_info: + try: + current_app.logger.info("Refreshing expired Google Drive token") + refreshed_token_info = auth.refresh_access_token(token_info['refresh_token']) + + # Update token in database + sessions_collection.update_one( + {"session_token": session_token}, + {"$set": {"token_info": refreshed_token_info}} + ) + + # Use the refreshed token info + token_info = refreshed_token_info + except Exception as e: + current_app.logger.error(f"Error refreshing token: {e}", exc_info=True) + return make_response( + jsonify({"success": False, "error": "Session expired and could not be refreshed"}), 401 + ) + + # Return success with user email + return make_response( + jsonify({ + "success": True, + "user_email": session.get("user_email", "Connected User"), + "message": "Session is valid" + }), + 200 + ) + + except Exception as e: + current_app.logger.error(f"Error validating Google Drive session: {e}", exc_info=True) + return make_response( + jsonify({ + "success": False, + "error": f"Failed to validate session: {str(e)}. Please reconnect your Google Drive account and make sure to grant offline access during authorization." + }), 500 + ) diff --git a/application/api/user/tasks.py b/application/api/user/tasks.py index 28a78c0d..bfed7f5a 100644 --- a/application/api/user/tasks.py +++ b/application/api/user/tasks.py @@ -47,6 +47,13 @@ def process_agent_webhook(self, agent_id, payload): return resp +@celery.task(bind=True) +def ingest_connector_task(self, source_config, job_name, user, source_type, retriever="classic"): + from application.worker import ingest_connector + resp = ingest_connector(self, job_name, user, source_type, source_config, retriever) + return resp + + @celery.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task( diff --git a/application/worker.py b/application/worker.py index 7309806d..a9503734 100755 --- a/application/worker.py +++ b/application/worker.py @@ -6,6 +6,7 @@ import os import shutil import string import tempfile +from typing import Any, Dict import zipfile from collections import Counter @@ -835,3 +836,146 @@ def agent_webhook_worker(self, agent_id, payload): f"Webhook processed for agent {agent_id}", extra={"agent_id": agent_id} ) return {"status": "success", "result": result} + + +def ingest_connector( + self, job_name: str, user: str, source_type: str, + source_config: Dict[str, Any], retriever: str = "classic" +) -> Dict[str, Any]: + """ + ingestion for internal knowledge bases(GoogleDrive). + + Args: + job_name: Name of the ingestion job + user: User identifier + source_type: Type of remote source ("google_drive", "dropbox", etc.) + source_config: Configuration specific to the source type + retriever: Type of retriever to use + """ + logging.info(f"Starting remote ingestion from {source_type} for user: {user}, job: {job_name}") + self.update_state(state="PROGRESS", meta={"current": 1}) + + with tempfile.TemporaryDirectory() as temp_dir: + try: + # Step 1: Get the appropriate remote loader + logging.info(f"source_config {source_config}") + + if source_type == "google_drive": + session_token = source_config.get("session_token") + if not session_token: + raise ValueError("Google Drive connector requires session_token in source_config") + + from application.parser.remote.google_drive_loader import GoogleDriveLoader + remote_loader = GoogleDriveLoader(session_token) + + # Create a clean config for storage that excludes the session token + api_source_config = { + "file_ids": source_config.get("file_ids", []), + "folder_id": source_config.get("folder_id", ""), + } + + if source_config.get("recursive") is not None: + api_source_config["recursive"] = source_config.get("recursive") + else: + remote_loader = RemoteCreator.create_loader(source_type, source_config) + api_source_config = source_config + + # Step 2: Download files to temp directory + self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"}) + + # For Google Drive, pass the source_config to download_to_directory + if source_type == "google_drive": + download_info = remote_loader.download_to_directory(temp_dir, source_config) + else: + download_info = remote_loader.download_to_directory(temp_dir) + + if download_info.get("empty_result", False) or not download_info.get("files_downloaded", 0): + logging.warning(f"No files were downloaded from {source_type}") + # Create empty result directly instead of calling a separate method + return { + "name": job_name, + "user": user, + "tokens": 0, + "type": source_type, + "source_config": source_config, + "directory_structure": "{}", + } + + # Step 3: Use SimpleDirectoryReader to process downloaded files + self.update_state(state="PROGRESS", meta={"current": 40, "status": "Processing files"}) + reader = SimpleDirectoryReader( + input_dir=temp_dir, + recursive=True, + required_exts=[ + ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub", + ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png", + ".jpg", ".jpeg", + ], + exclude_hidden=True, + file_metadata=metadata_from_filename, + ) + raw_docs = reader.load_data() + directory_structure = getattr(reader, 'directory_structure', {}) + + # Step 4: Process documents (chunking, embedding, etc.) + self.update_state(state="PROGRESS", meta={"current": 60, "status": "Processing documents"}) + + chunker = Chunker( + chunking_strategy="classic_chunk", + max_tokens=MAX_TOKENS, + min_tokens=MIN_TOKENS, + duplicate_headers=False, + ) + raw_docs = chunker.chunk(documents=raw_docs) + + # Preserve source information in document metadata + for doc in raw_docs: + if hasattr(doc, 'extra_info') and doc.extra_info: + source = doc.extra_info.get('source') + if source and os.path.isabs(source): + # Convert absolute path to relative path + doc.extra_info['source'] = os.path.relpath(source, start=temp_dir) + + docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs] + + # Step 5: Store in vector database + id = ObjectId() + vector_store_path = os.path.join(temp_dir, "vector_store") + os.makedirs(vector_store_path, exist_ok=True) + + self.update_state(state="PROGRESS", meta={"current": 80, "status": "Storing documents"}) + embed_and_store_documents(docs, vector_store_path, id, self) + + tokens = count_tokens_docs(docs) + + # Step 6: Upload index files + file_data = { + "user": user, + "name": job_name, + "tokens": tokens, + "retriever": retriever, + "id": str(id), + "type": source_type, + "remote_data": json.dumps(api_source_config), + "directory_structure": json.dumps(directory_structure) + } + + upload_index(vector_store_path, file_data) + + # Ensure we mark the task as complete + self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"}) + + logging.info(f"Remote ingestion completed: {job_name}") + + return { + "user": user, + "name": job_name, + "tokens": tokens, + "type": source_type, + "id": str(id), + "status": "complete" + } + + except Exception as e: + logging.error(f"Error during remote ingestion: {e}", exc_info=True) + raise