"""REST endpoints for external connectors.

Covers queuing connector ingestion jobs, polling task status, listing and
deleting connector sources, and the OAuth / session lifecycle (authorization
URL, callback, token refresh, file listing, session validation, disconnect).

Documentation for Resource classes and their HTTP methods is kept in ``#``
comments rather than docstrings: flask-restx folds method docstrings into the
generated Swagger document, so adding them would change the published API
description.
"""
import datetime
import json
import uuid

from bson.objectid import ObjectId
from flask import (
    Blueprint,
    current_app,
    jsonify,
    make_response,
    request
)
from flask_restx import fields, Namespace, Resource

from application.api.user.tasks import (
    ingest_connector_task,
)
from application.core.mongo_db import MongoDB
from application.core.settings import settings
from application.api import api
from application.utils import (
    check_required_fields
)
from application.parser.connectors.connector_creator import ConnectorCreator


mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
sources_collection = db["sources"]

connector = Blueprint("connector", __name__)
connectors_ns = Namespace("connectors", description="Connector operations", path="/")
api.add_namespace(connectors_ns)

# Fallback display name when the provider account's e-mail cannot be resolved.
_DEFAULT_USER_EMAIL = 'Connected User'
# Page size used by the file listing when the client sends no usable limit.
_DEFAULT_FILE_LIMIT = 50
# Keys that must be present (and truthy) in the token payload after the exchange.
_REQUIRED_TOKEN_FIELDS = ('access_token', 'token_uri', 'client_id', 'client_secret')
# Seconds to wait for the direct Drive "about" request before giving up.
_USER_INFO_TIMEOUT_SECONDS = 10


def _json_body():
    """Return the request's JSON object, or an empty dict when there is none.

    A missing, malformed, or non-object body yields ``{}`` so that callers'
    required-field checks answer with a 400 instead of an ``AttributeError``
    surfacing as a 500.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _sessions_collection():
    """Return the Mongo collection holding connector OAuth sessions."""
    return MongoDB.get_client()[settings.MONGO_DB_NAME]["connector_sessions"]


def _split_ids(raw):
    """Normalise an ID list supplied as a list or a comma-separated string.

    Strings are split on commas with blanks dropped, lists are returned
    unchanged, and any other type becomes an empty list.
    """
    if isinstance(raw, str):
        return [part.strip() for part in raw.split(',') if part.strip()]
    return raw if isinstance(raw, list) else []


def _coerce_limit(raw, default=_DEFAULT_FILE_LIMIT):
    """Convert a client-supplied limit to a positive int, else ``default``."""
    try:
        limit = int(raw)
    except (TypeError, ValueError, OverflowError):
        return default
    return limit if limit > 0 else default


def _fetch_user_email(auth, token_info):
    """Best-effort lookup of the connected account's e-mail address.

    Tries the Drive service built through ``auth`` first, then a direct HTTPS
    request using the access token. Never raises; returns
    ``_DEFAULT_USER_EMAIL`` when both attempts fail.
    """
    try:
        credentials = auth.create_credentials_from_token_info(token_info)
        service = auth.build_drive_service(credentials)
        user_info = service.about().get(fields="user").execute()
        return user_info.get('user', {}).get('emailAddress', _DEFAULT_USER_EMAIL)
    except Exception as e:
        current_app.logger.warning(f"Could not get user info: {e}")

    if not token_info.get('access_token'):
        return _DEFAULT_USER_EMAIL

    try:
        import requests

        headers = {'Authorization': f'Bearer {token_info["access_token"]}'}
        # Bounded wait: without a timeout a stalled upstream would hang the
        # OAuth callback request indefinitely.
        response = requests.get(
            'https://www.googleapis.com/drive/v3/about?fields=user',
            headers=headers,
            timeout=_USER_INFO_TIMEOUT_SECONDS,
        )
        if response.status_code == 200:
            user_info = response.json()
            return user_info.get('user', {}).get('emailAddress', _DEFAULT_USER_EMAIL)
    except Exception as request_error:
        current_app.logger.warning(f"Could not get user info via direct request: {request_error}")
    return _DEFAULT_USER_EMAIL


# Queues an ingestion job for a connector or remote source.
@connectors_ns.route("/api/connectors/upload")
class UploadConnector(Resource):
    @api.expect(
        api.model(
            "ConnectorUploadModel",
            {
                "user": fields.String(required=True, description="User ID"),
                "source": fields.String(
                    required=True, description="Source type (google_drive, github, etc.)"
                ),
                "name": fields.String(required=True, description="Job name"),
                "data": fields.String(required=True, description="Configuration data"),
                "repo_url": fields.String(description="GitHub repository URL"),
            },
        )
    )
    @api.doc(
        description="Uploads connector source for vectorization",
    )
    def post(self):
        # Reads form fields, parses the JSON in "data", and enqueues a task.
        # Responds 401 without a decoded token, 400 on missing fields or any
        # processing error, and 200 with the task id on success.
        decoded_token = request.decoded_token
        if not decoded_token:
            return make_response(jsonify({"success": False}), 401)
        data = request.form
        required_fields = ["user", "source", "name", "data"]
        missing_fields = check_required_fields(data, required_fields)
        if missing_fields:
            return missing_fields
        try:
            config = json.loads(data["data"])
            source_data = None

            if data["source"] == "github":
                source_data = config.get("repo_url")
            elif data["source"] in ["crawler", "url"]:
                source_data = config.get("url")
            elif data["source"] == "reddit":
                source_data = config
            elif data["source"] in ConnectorCreator.get_supported_connectors():
                session_token = config.get("session_token")
                if not session_token:
                    return make_response(jsonify({
                        "success": False,
                        "error": f"Missing session_token in {data['source']} configuration"
                    }), 400)

                # IDs may arrive as a list or as a comma-separated string.
                file_ids = _split_ids(config.get("file_ids", []))
                folder_ids = _split_ids(config.get("folder_ids", []))
                config["file_ids"] = file_ids
                config["folder_ids"] = folder_ids

                task = ingest_connector_task.delay(
                    job_name=data["name"],
                    user=decoded_token.get("sub"),
                    source_type=data["source"],
                    session_token=session_token,
                    file_ids=file_ids,
                    folder_ids=folder_ids,
                    recursive=config.get("recursive", False),
                    retriever=config.get("retriever", "classic")
                )
                return make_response(jsonify({"success": True, "task_id": task.id}), 200)

            # NOTE(review): this call passes source_data/loader while the
            # connector branch above passes source_type/session_token to the
            # same task - confirm the task signature accepts both keyword sets.
            task = ingest_connector_task.delay(
                source_data=source_data,
                job_name=data["name"],
                user=decoded_token.get("sub"),
                loader=data["source"],
            )
        except Exception as err:
            current_app.logger.error(
                f"Error uploading connector source: {err}", exc_info=True
            )
            return make_response(jsonify({"success": False}), 400)
        return make_response(jsonify({"success": True, "task_id": task.id}), 200)


# Reports the state and result metadata of a queued task.
@connectors_ns.route("/api/connectors/task_status")
class ConnectorTaskStatus(Resource):
    task_status_model = api.model(
        "ConnectorTaskStatusModel",
        {"task_id": fields.String(required=True, description="Task ID")},
    )

    @api.expect(task_status_model)
    @api.doc(description="Get connector task status")
    def get(self):
        # Responds 400 when task_id is missing or the lookup fails.
        task_id = request.args.get("task_id")
        if not task_id:
            return make_response(
                jsonify({"success": False, "message": "Task ID is required"}), 400
            )
        try:
            # Kept as a function-level import, as in the original.
            from application.celery_init import celery

            task = celery.AsyncResult(task_id)
            task_meta = task.info
            current_app.logger.debug(f"Task status: {task.status}")
            # Anything that is not a plain JSON-friendly value (for example an
            # exception instance) is stringified so jsonify can handle it.
            if not isinstance(
                task_meta, (dict, list, str, int, float, bool, type(None))
            ):
                task_meta = str(task_meta)
        except Exception as err:
            current_app.logger.error(f"Error getting task status: {err}", exc_info=True)
            return make_response(jsonify({"success": False}), 400)
        return make_response(jsonify({"status": task.status, "result": task_meta}), 200)


# Lists the authenticated user's connector sources, newest first.
@connectors_ns.route("/api/connectors/sources")
class ConnectorSources(Resource):
    @api.doc(description="Get connector sources")
    def get(self):
        # Responds 401 without a decoded token and 400 on a query error.
        decoded_token = request.decoded_token
        if not decoded_token:
            return make_response(jsonify({"success": False}), 401)
        user = decoded_token.get("sub")
        try:
            sources = sources_collection.find({"user": user, "type": "connector"}).sort("date", -1)
            connector_sources = [
                {
                    "id": str(source["_id"]),
                    "name": source.get("name"),
                    "date": source.get("date"),
                    "type": source.get("type"),
                    "source": source.get("source"),
                    "tokens": source.get("tokens", ""),
                    "retriever": source.get("retriever", "classic"),
                    "syncFrequency": source.get("sync_frequency", ""),
                }
                for source in sources
            ]
        except Exception as err:
            current_app.logger.error(f"Error retrieving connector sources: {err}", exc_info=True)
            return make_response(jsonify({"success": False}), 400)
        return make_response(jsonify(connector_sources), 200)


# Deletes one source document owned by the authenticated user.
@connectors_ns.route("/api/connectors/delete")
class DeleteConnectorSource(Resource):
    @api.doc(
        description="Delete a connector source",
        params={"source_id": "The source ID to delete"},
    )
    def delete(self):
        # Responds 401 without a token, 400 for a missing/invalid id or a
        # database error, and 404 when nothing matched.
        decoded_token = request.decoded_token
        if not decoded_token:
            return make_response(jsonify({"success": False}), 401)
        source_id = request.args.get("source_id")
        if not source_id:
            return make_response(
                jsonify({"success": False, "message": "source_id is required"}), 400
            )
        try:
            # Filtering on "user" as well prevents deleting another user's source.
            result = sources_collection.delete_one(
                {"_id": ObjectId(source_id), "user": decoded_token.get("sub")}
            )
            if result.deleted_count == 0:
                return make_response(
                    jsonify({"success": False, "message": "Source not found"}), 404
                )
        except Exception as err:
            current_app.logger.error(
                f"Error deleting connector source: {err}", exc_info=True
            )
            return make_response(jsonify({"success": False}), 400)
        return make_response(jsonify({"success": True}), 200)


# Builds the provider's OAuth authorization URL.
@connectors_ns.route("/api/connectors/auth")
class ConnectorAuth(Resource):
    @api.doc(description="Get connector OAuth authorization URL", params={"provider": "Connector provider (e.g., google_drive)"})
    def get(self):
        # Accepts the provider under either "provider" or "source".
        try:
            provider = request.args.get('provider') or request.args.get('source')
            if not provider:
                return make_response(jsonify({"success": False, "error": "Missing provider"}), 400)

            if not ConnectorCreator.is_supported(provider):
                return make_response(jsonify({"success": False, "error": f"Unsupported provider: {provider}"}), 400)

            state = str(uuid.uuid4())
            auth = ConnectorCreator.create_auth(provider)
            authorization_url = auth.get_authorization_url(state=state)
            return make_response(jsonify({
                "success": True,
                "authorization_url": authorization_url,
                "state": state
            }), 200)
        except Exception as e:
            current_app.logger.error(f"Error generating connector auth URL: {e}")
            return make_response(jsonify({"success": False, "error": str(e)}), 500)


# Completes the OAuth flow and stores the resulting session.
@connectors_ns.route("/api/connectors/callback")
class ConnectorsCallback(Resource):
    @api.doc(description="Handle OAuth callback for external connectors")
    def get(self):
        """Handle OAuth callback for external connectors"""
        # NOTE(review): the "state" query parameter is not verified against the
        # value issued by /api/connectors/auth - confirm whether that is intended.
        # NOTE(review): the provider is hard-coded to "google_drive" here.
        try:
            authorization_code = request.args.get('code')
            error = request.args.get('error')

            if error:
                return make_response(
                    jsonify({"success": False, "error": f"OAuth error: {error}. Please try again and make sure to grant all requested permissions, including offline access."}), 400
                )

            if not authorization_code:
                return make_response(
                    jsonify({"success": False, "error": "Authorization code not provided. Please complete the authorization process and make sure to grant offline access."}), 400
                )

            # Pre-bound so the except branch below can tell whether the
            # exchange produced anything before it failed.
            token_info = None
            try:
                auth = ConnectorCreator.create_auth("google_drive")
                token_info = auth.exchange_code_for_tokens(authorization_code)

                # Validate that we got token info before reading from it.
                if not token_info:
                    current_app.logger.error("exchange_code_for_tokens returned None or empty result")
                    return make_response(
                        jsonify({"success": False, "error": "Failed to exchange authorization code for tokens. Please try again and make sure to grant all requested permissions, including offline access."}), 400
                    )

                current_app.logger.info(f"Token info received from OAuth callback - has refresh_token: {bool(token_info.get('refresh_token'))}, "
                                        f"has access_token: {bool(token_info.get('access_token'))}, "
                                        f"expiry: {token_info.get('expiry')}")

                # Secrets are stripped before the payload structure is logged.
                safe_token_info = {k: v for k, v in token_info.items() if k not in ['access_token', 'refresh_token', 'client_secret']}
                current_app.logger.info(f"Full token info structure: {safe_token_info}")

                # Validate required fields in token_info
                missing_fields = [field for field in _REQUIRED_TOKEN_FIELDS if not token_info.get(field)]
                if missing_fields:
                    current_app.logger.error(f"Token info missing required fields: {missing_fields}")
                    return make_response(
                        jsonify({"success": False, "error": f"Token information incomplete. Missing fields: {missing_fields}. Please try again and make sure to grant all requested permissions."}), 400
                    )

                if not token_info.get('refresh_token'):
                    current_app.logger.warning("OAuth flow did not return a refresh token - user will need to re-authenticate when token expires")

            except Exception as e:
                current_app.logger.error(f"Error exchanging code for tokens: {str(e)}", exc_info=True)

                # A refresh-token complaint is tolerated only when a token
                # payload actually exists; otherwise there is nothing to
                # continue with and the request must fail cleanly.
                if 'refresh' not in str(e).lower() or not token_info:
                    return make_response(
                        jsonify({"success": False, "error": f"Failed to exchange authorization code for tokens: {str(e)}"}), 400
                    )
                current_app.logger.warning(f"Missing refresh token but continuing: {str(e)}")

            user_email = _fetch_user_email(auth, token_info)

            session_token = str(uuid.uuid4())

            # Only the fields listed here are persisted with the session.
            sanitized_token_info = {
                "access_token": token_info.get("access_token"),
                "refresh_token": token_info.get("refresh_token"),
                "token_uri": token_info.get("token_uri"),
                "expiry": token_info.get("expiry"),
                "scopes": token_info.get("scopes")
            }

            _sessions_collection().insert_one({
                "session_token": session_token,
                "token_info": sanitized_token_info,
                "created_at": datetime.datetime.now(datetime.timezone.utc),
                "user_email": user_email
            })

            return make_response(
                jsonify({
                    "success": True,
                    "message": "Google Drive authentication successful",
                    "session_token": session_token,
                    "user_email": user_email
                }),
                200
            )
        except Exception as e:
            current_app.logger.error(f"Error handling connector callback: {e}")
            return make_response(
                jsonify({
                    "success": False,
                    "error": f"Failed to complete connector authentication: {str(e)}. Please try again and make sure to grant all requested permissions, including offline access."
                }), 500
            )


# Exchanges a refresh token for new token info.
@connectors_ns.route("/api/connectors/refresh")
class ConnectorRefresh(Resource):
    @api.expect(api.model("ConnectorRefreshModel", {"provider": fields.String(required=True), "refresh_token": fields.String(required=True)}))
    @api.doc(description="Refresh connector access token")
    def post(self):
        # Responds 400 when provider or refresh_token is missing.
        try:
            data = _json_body()
            provider = data.get('provider')
            refresh_token = data.get('refresh_token')

            if not provider or not refresh_token:
                return make_response(jsonify({"success": False, "error": "provider and refresh_token are required"}), 400)

            auth = ConnectorCreator.create_auth(provider)
            token_info = auth.refresh_access_token(refresh_token)
            return make_response(jsonify({"success": True, "token_info": token_info}), 200)
        except Exception as e:
            current_app.logger.error(f"Error refreshing token for connector: {e}")
            return make_response(jsonify({"success": False, "error": str(e)}), 500)


# Lists files available through a connector session.
@connectors_ns.route("/api/connectors/files")
class ConnectorFiles(Resource):
    @api.expect(api.model("ConnectorFilesModel", {"provider": fields.String(required=True), "session_token": fields.String(required=True), "folder_id": fields.String(required=False), "limit": fields.Integer(required=False)}))
    @api.doc(description="List files from a connector provider")
    def post(self):
        # Responds 400 when provider or session_token is missing.
        try:
            data = _json_body()
            provider = data.get('provider')
            session_token = data.get('session_token')
            folder_id = data.get('folder_id')
            # Client input: anything that is not a positive integer falls back
            # to the default page size instead of breaking the slice below.
            limit = _coerce_limit(data.get('limit', _DEFAULT_FILE_LIMIT))

            if not provider or not session_token:
                return make_response(jsonify({"success": False, "error": "provider and session_token are required"}), 400)

            loader = ConnectorCreator.create_connector(provider, session_token)
            documents = loader.load_data({
                'limit': limit,
                'list_only': True,
                'session_token': session_token,
                'folder_id': folder_id
            })

            files = []
            for doc in documents[:limit]:
                # Tolerate documents that carry no metadata mapping.
                metadata = doc.extra_info or {}
                files.append({
                    'id': doc.doc_id,
                    'name': metadata.get('file_name', 'Unknown File'),
                    'type': metadata.get('mime_type', 'unknown'),
                    'size': metadata.get('size', 'Unknown'),
                    'modifiedTime': metadata.get('modified_time', 'Unknown')
                })

            return make_response(jsonify({"success": True, "files": files, "total": len(files)}), 200)
        except Exception as e:
            current_app.logger.error(f"Error loading connector files: {e}")
            return make_response(jsonify({"success": False, "error": f"Failed to load files: {str(e)}"}), 500)


# Checks that a session exists and reports whether its token has expired.
@connectors_ns.route("/api/connectors/validate-session")
class ConnectorValidateSession(Resource):
    @api.expect(api.model("ConnectorValidateSessionModel", {"provider": fields.String(required=True), "session_token": fields.String(required=True)}))
    @api.doc(description="Validate connector session token and return user info")
    def post(self):
        # Responds 400 on missing fields and 401 for an unknown session.
        try:
            data = _json_body()
            provider = data.get('provider')
            session_token = data.get('session_token')
            if not provider or not session_token:
                return make_response(jsonify({"success": False, "error": "provider and session_token are required"}), 400)

            session = _sessions_collection().find_one({"session_token": session_token})
            if not session or "token_info" not in session:
                return make_response(jsonify({"success": False, "error": "Invalid or expired session"}), 401)

            token_info = session["token_info"]
            auth = ConnectorCreator.create_auth(provider)
            is_expired = auth.is_token_expired(token_info)

            return make_response(jsonify({
                "success": True,
                "expired": is_expired,
                "user_email": session.get('user_email', _DEFAULT_USER_EMAIL)
            }), 200)
        except Exception as e:
            current_app.logger.error(f"Error validating connector session: {e}")
            return make_response(jsonify({"success": False, "error": str(e)}), 500)


# Removes a stored connector session.
@connectors_ns.route("/api/connectors/disconnect")
class ConnectorDisconnect(Resource):
    @api.expect(api.model("ConnectorDisconnectModel", {"provider": fields.String(required=True), "session_token": fields.String(required=False)}))
    @api.doc(description="Disconnect a connector session")
    def post(self):
        # Responds 400 when provider is missing. Without a session_token the
        # call succeeds without deleting anything.
        try:
            data = _json_body()
            provider = data.get('provider')
            session_token = data.get('session_token')
            if not provider:
                return make_response(jsonify({"success": False, "error": "provider is required"}), 400)

            if session_token:
                _sessions_collection().delete_one({"session_token": session_token})

            return make_response(jsonify({"success": True}), 200)
        except Exception as e:
            current_app.logger.error(f"Error disconnecting connector session: {e}")
            return make_response(jsonify({"success": False, "error": str(e)}), 500)