diff --git a/application/api/connector/routes.py b/application/api/connector/routes.py
new file mode 100644
index 00000000..df4c73f4
--- /dev/null
+++ b/application/api/connector/routes.py
@@ -0,0 +1,535 @@
+import datetime
+import json
+import os
+from functools import wraps
+from bson.objectid import ObjectId
+from flask import (
+ Blueprint,
+ current_app,
+ jsonify,
+ make_response,
+ request
+)
+from flask_restx import fields, Namespace, Resource
+
+
+from application.agents.tools.tool_manager import ToolManager
+
+from application.api.user.tasks import (
+ ingest_connector_task,
+)
+from application.core.mongo_db import MongoDB
+from application.core.settings import settings
+from application.api import api
+from application.storage.storage_creator import StorageCreator
+from application.tts.google_tts import GoogleTTS
+from application.utils import (
+ check_required_fields
+)
+from application.utils import num_tokens_from_string
+from application.vectorstore.vector_creator import VectorCreator
+from application.parser.connectors.connector_creator import ConnectorCreator
+
+storage = StorageCreator.get_storage()
+
+mongo = MongoDB.get_client()
+db = mongo[settings.MONGO_DB_NAME]
+sources_collection = db["sources"]
+
+connector = Blueprint("connector", __name__)
+connectors_ns = Namespace("connectors", description="Connector operations", path="/")
+api.add_namespace(connectors_ns)
+
+current_dir = os.path.dirname(
+ os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+)
+
+
+@connectors_ns.route("/api/connectors/upload")
+class UploadConnector(Resource):
+ @api.expect(
+ api.model(
+ "ConnectorUploadModel",
+ {
+ "user": fields.String(required=True, description="User ID"),
+ "source": fields.String(
+ required=True, description="Source type (google_drive, github, etc.)"
+ ),
+ "name": fields.String(required=True, description="Job name"),
+ "data": fields.String(required=True, description="Configuration data"),
+ "repo_url": fields.String(description="GitHub repository URL"),
+ },
+ )
+ )
+ @api.doc(
+ description="Uploads connector source for vectorization",
+ )
+ def post(self):
+ decoded_token = request.decoded_token
+ if not decoded_token:
+ return make_response(jsonify({"success": False}), 401)
+ data = request.form
+ required_fields = ["user", "source", "name", "data"]
+ missing_fields = check_required_fields(data, required_fields)
+ if missing_fields:
+ return missing_fields
+ try:
+ config = json.loads(data["data"])
+ source_data = None
+
+ if data["source"] == "github":
+ source_data = config.get("repo_url")
+ elif data["source"] in ["crawler", "url"]:
+ source_data = config.get("url")
+ elif data["source"] == "reddit":
+ source_data = config
+ elif data["source"] in ConnectorCreator.get_supported_connectors():
+ session_token = config.get("session_token")
+ if not session_token:
+ return make_response(jsonify({
+ "success": False,
+ "error": f"Missing session_token in {data['source']} configuration"
+ }), 400)
+
+ file_ids = config.get("file_ids", [])
+ if isinstance(file_ids, str):
+ file_ids = [id.strip() for id in file_ids.split(',') if id.strip()]
+ elif not isinstance(file_ids, list):
+ file_ids = []
+
+ folder_ids = config.get("folder_ids", [])
+ if isinstance(folder_ids, str):
+ folder_ids = [id.strip() for id in folder_ids.split(',') if id.strip()]
+ elif not isinstance(folder_ids, list):
+ folder_ids = []
+
+ config["file_ids"] = file_ids
+ config["folder_ids"] = folder_ids
+
+ task = ingest_connector_task.delay(
+ job_name=data["name"],
+ user=decoded_token.get("sub"),
+ source_type=data["source"],
+ session_token=session_token,
+ file_ids=file_ids,
+ folder_ids=folder_ids,
+ recursive=config.get("recursive", False),
+ retriever=config.get("retriever", "classic")
+ )
+ return make_response(jsonify({"success": True, "task_id": task.id}), 200)
+ task = ingest_connector_task.delay(
+ source_data=source_data,
+ job_name=data["name"],
+ user=decoded_token.get("sub"),
+ loader=data["source"],
+ )
+ except Exception as err:
+ current_app.logger.error(
+ f"Error uploading connector source: {err}", exc_info=True
+ )
+ return make_response(jsonify({"success": False}), 400)
+ return make_response(jsonify({"success": True, "task_id": task.id}), 200)
+
+
+@connectors_ns.route("/api/connectors/task_status")
+class ConnectorTaskStatus(Resource):
+ task_status_model = api.model(
+ "ConnectorTaskStatusModel",
+ {"task_id": fields.String(required=True, description="Task ID")},
+ )
+
+ @api.expect(task_status_model)
+ @api.doc(description="Get connector task status")
+ def get(self):
+ task_id = request.args.get("task_id")
+ if not task_id:
+ return make_response(
+ jsonify({"success": False, "message": "Task ID is required"}), 400
+ )
+ try:
+ from application.celery_init import celery
+
+ task = celery.AsyncResult(task_id)
+ task_meta = task.info
+ print(f"Task status: {task.status}")
+ if not isinstance(
+ task_meta, (dict, list, str, int, float, bool, type(None))
+ ):
+ task_meta = str(task_meta)
+ except Exception as err:
+ current_app.logger.error(f"Error getting task status: {err}", exc_info=True)
+ return make_response(jsonify({"success": False}), 400)
+ return make_response(jsonify({"status": task.status, "result": task_meta}), 200)
+
+
+@connectors_ns.route("/api/connectors/sources")
+class ConnectorSources(Resource):
+ @api.doc(description="Get connector sources")
+ def get(self):
+ decoded_token = request.decoded_token
+ if not decoded_token:
+ return make_response(jsonify({"success": False}), 401)
+ user = decoded_token.get("sub")
+ try:
+ sources = sources_collection.find({"user": user, "type": "connector"}).sort("date", -1)
+ connector_sources = []
+ for source in sources:
+ connector_sources.append({
+ "id": str(source["_id"]),
+ "name": source.get("name"),
+ "date": source.get("date"),
+ "type": source.get("type"),
+ "source": source.get("source"),
+ "tokens": source.get("tokens", ""),
+ "retriever": source.get("retriever", "classic"),
+ "syncFrequency": source.get("sync_frequency", ""),
+ })
+ except Exception as err:
+ current_app.logger.error(f"Error retrieving connector sources: {err}", exc_info=True)
+ return make_response(jsonify({"success": False}), 400)
+ return make_response(jsonify(connector_sources), 200)
+
+
+@connectors_ns.route("/api/connectors/delete")
+class DeleteConnectorSource(Resource):
+ @api.doc(
+ description="Delete a connector source",
+ params={"source_id": "The source ID to delete"},
+ )
+ def delete(self):
+ decoded_token = request.decoded_token
+ if not decoded_token:
+ return make_response(jsonify({"success": False}), 401)
+ source_id = request.args.get("source_id")
+ if not source_id:
+ return make_response(
+ jsonify({"success": False, "message": "source_id is required"}), 400
+ )
+ try:
+ result = sources_collection.delete_one(
+ {"_id": ObjectId(source_id), "user": decoded_token.get("sub")}
+ )
+ if result.deleted_count == 0:
+ return make_response(
+ jsonify({"success": False, "message": "Source not found"}), 404
+ )
+ except Exception as err:
+ current_app.logger.error(
+ f"Error deleting connector source: {err}", exc_info=True
+ )
+ return make_response(jsonify({"success": False}), 400)
+ return make_response(jsonify({"success": True}), 200)
+
+
+@connectors_ns.route("/api/connectors/auth")
+class ConnectorAuth(Resource):
+ @api.doc(description="Get connector OAuth authorization URL", params={"provider": "Connector provider (e.g., google_drive)"})
+ def get(self):
+ try:
+ provider = request.args.get('provider') or request.args.get('source')
+ if not provider:
+ return make_response(jsonify({"success": False, "error": "Missing provider"}), 400)
+
+ if not ConnectorCreator.is_supported(provider):
+ return make_response(jsonify({"success": False, "error": f"Unsupported provider: {provider}"}), 400)
+
+ import uuid
+ state = str(uuid.uuid4())
+ auth = ConnectorCreator.create_auth(provider)
+ authorization_url = auth.get_authorization_url(state=state)
+ return make_response(jsonify({
+ "success": True,
+ "authorization_url": authorization_url,
+ "state": state
+ }), 200)
+ except Exception as e:
+ current_app.logger.error(f"Error generating connector auth URL: {e}")
+ return make_response(jsonify({"success": False, "error": str(e)}), 500)
+
+
+@connectors_ns.route("/api/connectors/callback")
+class ConnectorsCallback(Resource):
+ @api.doc(description="Handle OAuth callback for external connectors")
+ def get(self):
+ """Handle OAuth callback for external connectors"""
+ try:
+ from application.parser.connectors.connector_creator import ConnectorCreator
+ from flask import request
+ import uuid
+
+ authorization_code = request.args.get('code')
+ _ = request.args.get('state')
+ error = request.args.get('error')
+
+ if error:
+ return make_response(
+ jsonify({"success": False, "error": f"OAuth error: {error}. Please try again and make sure to grant all requested permissions, including offline access."}), 400
+ )
+
+ if not authorization_code:
+ return make_response(
+ jsonify({"success": False, "error": "Authorization code not provided. Please complete the authorization process and make sure to grant offline access."}), 400
+ )
+
+ try:
+ auth = ConnectorCreator.create_auth("google_drive")
+ token_info = auth.exchange_code_for_tokens(authorization_code)
+ current_app.logger.info(f"Token info received from OAuth callback - has refresh_token: {bool(token_info.get('refresh_token'))}, "
+ f"has access_token: {bool(token_info.get('access_token'))}, "
+ f"expiry: {token_info.get('expiry')}")
+
+ safe_token_info = {k: v for k, v in token_info.items() if k not in ['access_token', 'refresh_token', 'client_secret']}
+ current_app.logger.info(f"Full token info structure: {safe_token_info}")
+
+ # Validate that we got token info
+ if not token_info:
+ current_app.logger.error("exchange_code_for_tokens returned None or empty result")
+ return make_response(
+ jsonify({"success": False, "error": "Failed to exchange authorization code for tokens. Please try again and make sure to grant all requested permissions, including offline access."}), 400
+ )
+
+ # Validate required fields in token_info
+ required_fields = ['access_token', 'token_uri', 'client_id', 'client_secret']
+ missing_fields = [field for field in required_fields if not token_info.get(field)]
+ if missing_fields:
+ current_app.logger.error(f"Token info missing required fields: {missing_fields}")
+ return make_response(
+ jsonify({"success": False, "error": f"Token information incomplete. Missing fields: {missing_fields}. Please try again and make sure to grant all requested permissions."}), 400
+ )
+
+ if not token_info.get('refresh_token'):
+ current_app.logger.warning("OAuth flow did not return a refresh token - user will need to re-authenticate when token expires")
+
+ required_fields = ['access_token', 'token_uri', 'client_id', 'client_secret']
+ missing_fields = [field for field in required_fields if not token_info.get(field)]
+ if missing_fields:
+ current_app.logger.error(f"Token info missing required fields: {missing_fields}")
+ return make_response(
+ jsonify({"success": False, "error": f"Token info missing required fields: {missing_fields}"}), 400
+ )
+
+ except Exception as e:
+ current_app.logger.error(f"Error exchanging code for tokens: {str(e)}", exc_info=True)
+
+ if 'refresh' in str(e).lower():
+ current_app.logger.warning(f"Missing refresh token but continuing: {str(e)}")
+
+ else:
+ return make_response(
+ jsonify({"success": False, "error": f"Failed to exchange authorization code for tokens: {str(e)}"}), 400
+ )
+
+ try:
+ credentials = auth.create_credentials_from_token_info(token_info)
+ service = auth.build_drive_service(credentials)
+ user_info = service.about().get(fields="user").execute()
+ user_email = user_info.get('user', {}).get('emailAddress', 'Connected User')
+ except Exception as e:
+ current_app.logger.warning(f"Could not get user info: {e}")
+ if token_info.get('access_token'):
+ try:
+ import requests
+ headers = {'Authorization': f'Bearer {token_info["access_token"]}'}
+ response = requests.get(
+ 'https://www.googleapis.com/drive/v3/about?fields=user',
+ headers=headers
+ )
+ if response.status_code == 200:
+ user_info = response.json()
+ user_email = user_info.get('user', {}).get('emailAddress', 'Connected User')
+ else:
+ user_email = 'Connected User'
+ except Exception as request_error:
+ current_app.logger.warning(f"Could not get user info via direct request: {request_error}")
+ user_email = 'Connected User'
+ else:
+ user_email = 'Connected User'
+
+ session_token = str(uuid.uuid4())
+
+ from application.core.mongo_db import MongoDB
+ mongo = MongoDB.get_client()
+ db = mongo[settings.MONGO_DB_NAME]
+ sessions_collection = db["connector_sessions"]
+
+ sanitized_token_info = {
+ "access_token": token_info.get("access_token"),
+ "refresh_token": token_info.get("refresh_token"),
+ "token_uri": token_info.get("token_uri"),
+ "expiry": token_info.get("expiry"),
+ "scopes": token_info.get("scopes")
+ }
+
+ sessions_collection.insert_one({
+ "session_token": session_token,
+ "token_info": sanitized_token_info,
+ "created_at": datetime.datetime.now(datetime.timezone.utc),
+ "user_email": user_email
+ })
+
+ return make_response(
+ jsonify({
+ "success": True,
+ "message": "Google Drive authentication successful",
+ "session_token": session_token,
+ "user_email": user_email
+ }),
+ 200
+ )
+
+ except Exception as e:
+ current_app.logger.error(f"Error handling connector callback: {e}")
+ return make_response(
+ jsonify({
+ "success": False,
+ "error": f"Failed to complete connector authentication: {str(e)}. Please try again and make sure to grant all requested permissions, including offline access."
+ }), 500
+ )
+
+
+@connectors_ns.route("/api/connectors/refresh")
+class ConnectorRefresh(Resource):
+ @api.expect(api.model("ConnectorRefreshModel", {"provider": fields.String(required=True), "refresh_token": fields.String(required=True)}))
+ @api.doc(description="Refresh connector access token")
+ def post(self):
+ try:
+ data = request.get_json()
+ provider = data.get('provider')
+ refresh_token = data.get('refresh_token')
+
+ if not provider or not refresh_token:
+ return make_response(jsonify({"success": False, "error": "provider and refresh_token are required"}), 400)
+
+ auth = ConnectorCreator.create_auth(provider)
+ token_info = auth.refresh_access_token(refresh_token)
+ return make_response(jsonify({"success": True, "token_info": token_info}), 200)
+ except Exception as e:
+ current_app.logger.error(f"Error refreshing token for connector: {e}")
+ return make_response(jsonify({"success": False, "error": str(e)}), 500)
+
+
+@connectors_ns.route("/api/connectors/files")
+class ConnectorFiles(Resource):
+ @api.expect(api.model("ConnectorFilesModel", {"provider": fields.String(required=True), "session_token": fields.String(required=True), "folder_id": fields.String(required=False), "limit": fields.Integer(required=False)}))
+ @api.doc(description="List files from a connector provider")
+ def post(self):
+ try:
+ data = request.get_json()
+ provider = data.get('provider')
+ session_token = data.get('session_token')
+ folder_id = data.get('folder_id')
+ limit = data.get('limit', 50)
+ if not provider or not session_token:
+ return make_response(jsonify({"success": False, "error": "provider and session_token are required"}), 400)
+
+ loader = ConnectorCreator.create_connector(provider, session_token)
+ documents = loader.load_data({
+ 'limit': limit,
+ 'list_only': True,
+ 'session_token': session_token,
+ 'folder_id': folder_id
+ })
+
+ files = []
+ for doc in documents[:limit]:
+ metadata = doc.extra_info
+ files.append({
+ 'id': doc.doc_id,
+ 'name': metadata.get('file_name', 'Unknown File'),
+ 'type': metadata.get('mime_type', 'unknown'),
+ 'size': metadata.get('size', 'Unknown'),
+ 'modifiedTime': metadata.get('modified_time', 'Unknown'),
+ 'iconUrl': get_file_icon(metadata.get('mime_type', ''))
+ })
+
+ return make_response(jsonify({"success": True, "files": files, "total": len(files)}), 200)
+ except Exception as e:
+ current_app.logger.error(f"Error loading connector files: {e}")
+ return make_response(jsonify({"success": False, "error": f"Failed to load files: {str(e)}"}), 500)
+
+
+@connectors_ns.route("/api/connectors/validate-session")
+class ConnectorValidateSession(Resource):
+ @api.expect(api.model("ConnectorValidateSessionModel", {"provider": fields.String(required=True), "session_token": fields.String(required=True)}))
+ @api.doc(description="Validate connector session token and return user info")
+ def post(self):
+ try:
+ from application.core.mongo_db import MongoDB
+ data = request.get_json()
+ provider = data.get('provider')
+ session_token = data.get('session_token')
+ if not provider or not session_token:
+ return make_response(jsonify({"success": False, "error": "provider and session_token are required"}), 400)
+
+ mongo = MongoDB.get_client()
+ db = mongo[settings.MONGO_DB_NAME]
+ collection_name = "connector_sessions"
+ sessions_collection = db[collection_name]
+
+ session = sessions_collection.find_one({"session_token": session_token})
+ if not session or "token_info" not in session:
+ return make_response(jsonify({"success": False, "error": "Invalid or expired session"}), 401)
+
+ token_info = session["token_info"]
+ auth = ConnectorCreator.create_auth(provider)
+ is_expired = auth.is_token_expired(token_info)
+
+ return make_response(jsonify({
+ "success": True,
+ "expired": is_expired,
+ "user_email": session.get('user_email', 'Connected User')
+ }), 200)
+ except Exception as e:
+ current_app.logger.error(f"Error validating connector session: {e}")
+ return make_response(jsonify({"success": False, "error": str(e)}), 500)
+
+
+@connectors_ns.route("/api/connectors/disconnect")
+class ConnectorDisconnect(Resource):
+ @api.expect(api.model("ConnectorDisconnectModel", {"provider": fields.String(required=True), "session_token": fields.String(required=False)}))
+ @api.doc(description="Disconnect a connector session")
+ def post(self):
+ try:
+ from application.core.mongo_db import MongoDB
+ data = request.get_json()
+ provider = data.get('provider')
+ session_token = data.get('session_token')
+ if not provider:
+ return make_response(jsonify({"success": False, "error": "provider is required"}), 400)
+
+ mongo = MongoDB.get_client()
+ db = mongo[settings.MONGO_DB_NAME]
+ collection_name = "connector_sessions"
+ sessions_collection = db[collection_name]
+
+ if session_token:
+ sessions_collection.delete_one({"session_token": session_token})
+
+ return make_response(jsonify({"success": True}), 200)
+ except Exception as e:
+ current_app.logger.error(f"Error disconnecting connector session: {e}")
+ return make_response(jsonify({"success": False, "error": str(e)}), 500)
+
+
+def get_file_icon(mime_type):
+ """Return appropriate icon URL based on file MIME type"""
+ icon_map = {
+ 'application/vnd.google-apps.document': '/icons/google-docs.png',
+ 'application/vnd.google-apps.spreadsheet': '/icons/google-sheets.png',
+ 'application/vnd.google-apps.presentation': '/icons/google-slides.png',
+ 'application/pdf': '/icons/pdf.png',
+ 'text/plain': '/icons/text.png',
+ 'application/msword': '/icons/word.png',
+ 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '/icons/word.png',
+ 'application/vnd.ms-excel': '/icons/excel.png',
+ 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '/icons/excel.png',
+ 'application/vnd.ms-powerpoint': '/icons/powerpoint.png',
+ 'application/vnd.openxmlformats-officedocument.presentationml.presentation': '/icons/powerpoint.png',
+ 'image/jpeg': '/icons/image.png',
+ 'image/png': '/icons/image.png',
+ 'image/gif': '/icons/image.png',
+ 'video/mp4': '/icons/video.png',
+ 'application/zip': '/icons/archive.png',
+ 'application/x-zip-compressed': '/icons/archive.png',
+ }
+ return icon_map.get(mime_type, '/icons/generic-file.png')
diff --git a/application/api/user/routes.py b/application/api/user/routes.py
index a016155b..0bf6aa2f 100644
--- a/application/api/user/routes.py
+++ b/application/api/user/routes.py
@@ -47,6 +47,7 @@ from application.utils import (
)
from application.utils import num_tokens_from_string
from application.vectorstore.vector_creator import VectorCreator
+from application.parser.connectors.connector_creator import ConnectorCreator
storage = StorageCreator.get_storage()
@@ -493,9 +494,9 @@ class DeleteOldIndexes(Resource):
)
if not doc:
return make_response(jsonify({"status": "not found"}), 404)
-
+
storage = StorageCreator.get_storage()
-
+
try:
# Delete vector index
if settings.VECTOR_STORE == "faiss":
@@ -509,7 +510,7 @@ class DeleteOldIndexes(Resource):
settings.VECTOR_STORE, source_id=str(doc["_id"])
)
vectorstore.delete_index()
-
+
if "file_path" in doc and doc["file_path"]:
file_path = doc["file_path"]
if storage.is_directory(file_path):
@@ -518,7 +519,7 @@ class DeleteOldIndexes(Resource):
storage.delete_file(f)
else:
storage.delete_file(file_path)
-
+
except FileNotFoundError:
pass
except Exception as err:
@@ -526,7 +527,7 @@ class DeleteOldIndexes(Resource):
f"Error deleting files and indexes: {err}", exc_info=True
)
return make_response(jsonify({"success": False}), 400)
-
+
sources_collection.delete_one({"_id": ObjectId(source_id)})
return make_response(jsonify({"success": True}), 200)
@@ -574,30 +575,30 @@ class UploadFile(Resource):
try:
storage = StorageCreator.get_storage()
-
-
+
+
for file in files:
original_filename = file.filename
safe_file = safe_filename(original_filename)
-
+
with tempfile.TemporaryDirectory() as temp_dir:
temp_file_path = os.path.join(temp_dir, safe_file)
file.save(temp_file_path)
-
+
if zipfile.is_zipfile(temp_file_path):
try:
with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
zip_ref.extractall(path=temp_dir)
-
+
# Walk through extracted files and upload them
for root, _, files in os.walk(temp_dir):
for extracted_file in files:
if os.path.join(root, extracted_file) == temp_file_path:
continue
-
+
rel_path = os.path.relpath(os.path.join(root, extracted_file), temp_dir)
storage_path = f"{base_path}/{rel_path}"
-
+
with open(os.path.join(root, extracted_file), 'rb') as f:
storage.save_file(f, storage_path)
except Exception as e:
@@ -611,7 +612,7 @@ class UploadFile(Resource):
file_path = f"{base_path}/{safe_file}"
with open(temp_file_path, 'rb') as f:
storage.save_file(f, file_path)
-
+
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
@@ -687,8 +688,8 @@ class ManageSourceFiles(Resource):
try:
storage = StorageCreator.get_storage()
source_file_path = source.get("file_path", "")
- parent_dir = request.form.get("parent_dir", "")
-
+ parent_dir = request.form.get("parent_dir", "")
+
if parent_dir and (parent_dir.startswith("/") or ".." in parent_dir):
return make_response(
jsonify({"success": False, "message": "Invalid parent directory path"}), 400
@@ -702,7 +703,7 @@ class ManageSourceFiles(Resource):
)
added_files = []
-
+
target_dir = source_file_path
if parent_dir:
target_dir = f"{source_file_path}/{parent_dir}"
@@ -878,44 +879,35 @@ class UploadRemote(Resource):
source_data = config.get("url")
elif data["source"] == "reddit":
source_data = config
- elif data["source"] == "google_drive":
- if "session_token" not in config:
- return make_response(jsonify({
- "success": False,
- "error": "Missing session_token in Google Drive configuration"
- }), 400)
-
+ elif data["source"] in ConnectorCreator.get_supported_connectors():
session_token = config.get("session_token")
-
+ if not session_token:
+ return make_response(jsonify({
+ "success": False,
+ "error": f"Missing session_token in {data['source']} configuration"
+ }), 400)
+
# Process file_ids
file_ids = config.get("file_ids", [])
if isinstance(file_ids, str):
file_ids = [id.strip() for id in file_ids.split(',') if id.strip()]
elif not isinstance(file_ids, list):
file_ids = []
-
+ # Process folder_ids
folder_ids = config.get("folder_ids", [])
if isinstance(folder_ids, str):
folder_ids = [id.strip() for id in folder_ids.split(',') if id.strip()]
elif not isinstance(folder_ids, list):
folder_ids = []
-
- # Ensure at least one file or folder is selected
- if not file_ids and not folder_ids:
- return make_response(jsonify({
- "success": False,
- "error": "No files or folders selected"
- }), 400)
-
+
config["file_ids"] = file_ids
config["folder_ids"] = folder_ids
-
- from application.api.user.tasks import ingest_connector_task
+
task = ingest_connector_task.delay(
job_name=data["name"],
user=decoded_token.get("sub"),
- source_type="google_drive",
+ source_type=data["source"],
session_token=session_token,
file_ids=file_ids,
folder_ids=folder_ids,
@@ -1453,7 +1445,7 @@ class CreateAgent(Resource):
except json.JSONDecodeError:
data["json_schema"] = None
print(f"Received data: {data}")
-
+
# Validate JSON schema if provided
if data.get("json_schema"):
try:
@@ -1461,19 +1453,19 @@ class CreateAgent(Resource):
json_schema = data.get("json_schema")
if not isinstance(json_schema, dict):
return make_response(
- jsonify({"success": False, "message": "JSON schema must be a valid JSON object"}),
+ jsonify({"success": False, "message": "JSON schema must be a valid JSON object"}),
400
)
-
+
# Validate that it has either a 'schema' property or is itself a schema
if "schema" not in json_schema and "type" not in json_schema:
return make_response(
- jsonify({"success": False, "message": "JSON schema must contain either a 'schema' property or be a valid JSON schema with 'type' property"}),
+ jsonify({"success": False, "message": "JSON schema must contain either a 'schema' property or be a valid JSON schema with 'type' property"}),
400
)
except Exception as e:
return make_response(
- jsonify({"success": False, "message": f"Invalid JSON schema: {str(e)}"}),
+ jsonify({"success": False, "message": f"Invalid JSON schema: {str(e)}"}),
400
)
@@ -3607,7 +3599,7 @@ class GetChunks(Resource):
try:
store = get_vector_store(doc_id)
chunks = store.get_chunks()
-
+
filtered_chunks = []
for chunk in chunks:
metadata = chunk.get("metadata", {})
@@ -3628,9 +3620,9 @@ class GetChunks(Resource):
continue
filtered_chunks.append(chunk)
-
+
chunks = filtered_chunks
-
+
total_chunks = len(chunks)
start = (page - 1) * per_page
end = start + per_page
@@ -3951,27 +3943,27 @@ class DirectoryStructure(Resource):
decoded_token = request.decoded_token
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
-
+
user = decoded_token.get("sub")
doc_id = request.args.get("id")
-
+
if not doc_id:
return make_response(
jsonify({"error": "Document ID is required"}), 400
)
-
+
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid document ID"}), 400)
-
+
try:
doc = sources_collection.find_one({"_id": ObjectId(doc_id), "user": user})
if not doc:
return make_response(
jsonify({"error": "Document not found or access denied"}), 404
)
-
+
directory_structure = doc.get("directory_structure", {})
-
+
return make_response(
jsonify({
"success": True,
@@ -3979,7 +3971,7 @@ class DirectoryStructure(Resource):
"base_path": doc.get("file_path", "")
}), 200
)
-
+
except Exception as e:
current_app.logger.error(
f"Error retrieving directory structure: {e}", exc_info=True
@@ -3989,409 +3981,4 @@ class DirectoryStructure(Resource):
)
-@user_ns.route("/api/google-drive/auth")
-class GoogleDriveAuth(Resource):
- @api.doc(description="Get Google Drive OAuth authorization URL")
- def get(self):
- """Get Google Drive OAuth authorization URL"""
- try:
- from application.parser.connectors.connector_creator import ConnectorCreator
- auth = ConnectorCreator.create_auth("google_drive")
-
- # Generate state parameter for CSRF protection
- import uuid
- state = str(uuid.uuid4())
-
- # Store state in session or database for validation
- # For now, we'll include it in the URL and validate on callback
- authorization_url = auth.get_authorization_url(state=state)
- current_app.logger.info(f"Generated authorization URL: {authorization_url}")
- return make_response(
- jsonify({
- "success": True,
- "authorization_url": authorization_url,
- "state": state
- }),
- 200
- )
-
- except Exception as e:
- current_app.logger.error(f"Error generating Google Drive auth URL: {e}")
- return make_response(
- jsonify({"success": False, "error": str(e)}), 500
- )
-
-
-@user_ns.route("/api/google-drive/callback")
-class GoogleDriveCallback(Resource):
- @api.doc(description="Handle Google Drive OAuth callback")
- def get(self):
- """Handle Google Drive OAuth callback"""
- try:
- from application.parser.connectors.connector_creator import ConnectorCreator
- from flask import request
- import uuid
-
- # Get authorization code and state from query parameters
- authorization_code = request.args.get('code')
- _ = request.args.get('state') # We don't currently use state, but capture it to avoid unused variable warning
- error = request.args.get('error')
-
- if error:
- return make_response(
- jsonify({"success": False, "error": f"OAuth error: {error}. Please try again and make sure to grant all requested permissions, including offline access."}), 400
- )
-
- if not authorization_code:
- return make_response(
- jsonify({"success": False, "error": "Authorization code not provided. Please complete the authorization process and make sure to grant offline access."}), 400
- )
-
- # Exchange code for tokens
- try:
- auth = ConnectorCreator.create_auth("google_drive")
- token_info = auth.exchange_code_for_tokens(authorization_code)
-
- # Log detailed information about the token_info we received
- current_app.logger.info(f"Token info received from OAuth callback - has refresh_token: {bool(token_info.get('refresh_token'))}, "
- f"has access_token: {bool(token_info.get('access_token'))}, "
- f"expiry: {token_info.get('expiry')}")
-
- # Log the full token_info structure (without sensitive data)
- safe_token_info = {k: v for k, v in token_info.items() if k not in ['access_token', 'refresh_token', 'client_secret']}
- current_app.logger.info(f"Full token info structure: {safe_token_info}")
-
- # Validate that we got token info
- if not token_info:
- current_app.logger.error("exchange_code_for_tokens returned None or empty result")
- return make_response(
- jsonify({"success": False, "error": "Failed to exchange authorization code for tokens. Please try again and make sure to grant all requested permissions, including offline access."}), 400
- )
-
- # Validate required fields in token_info
- required_fields = ['access_token', 'token_uri', 'client_id', 'client_secret']
- missing_fields = [field for field in required_fields if not token_info.get(field)]
- if missing_fields:
- current_app.logger.error(f"Token info missing required fields: {missing_fields}")
- return make_response(
- jsonify({"success": False, "error": f"Token information incomplete. Missing fields: {missing_fields}. Please try again and make sure to grant all requested permissions."}), 400
- )
-
- # Check if refresh_token is present - this is critical for long-term access
- if not token_info.get('refresh_token'):
- return make_response(
- jsonify({
- "success": False,
- "error": "OAuth flow did not return a refresh token. This typically happens when offline access wasn't granted. "
- "Please reconnect your Google Drive account and ensure you grant offline access when prompted. "
- "Make sure to check 'Allow offline access' during the authorization process."
- }), 400
- )
-
- # Validate required fields in token_info
- required_fields = ['access_token', 'token_uri', 'client_id', 'client_secret']
- missing_fields = [field for field in required_fields if not token_info.get(field)]
- if missing_fields:
- current_app.logger.error(f"Token info missing required fields: {missing_fields}")
- return make_response(
- jsonify({"success": False, "error": f"Token info missing required fields: {missing_fields}"}), 400
- )
-
- except Exception as e:
- current_app.logger.error(f"Error exchanging code for tokens: {e}", exc_info=True)
- return make_response(
- jsonify({"success": False, "error": f"Failed to exchange authorization code for tokens: {str(e)}"}), 400
- )
-
- # Get user information
- try:
- credentials = auth.create_credentials_from_token_info(token_info)
- service = auth.build_drive_service(credentials)
- user_info = service.about().get(fields="user").execute()
- user_email = user_info.get('user', {}).get('emailAddress', 'Connected User')
- except Exception as e:
- current_app.logger.warning(f"Could not get user info: {e}")
- # Try to get user info without building service if we have access token
- if token_info.get('access_token'):
- try:
- import requests
- headers = {'Authorization': f'Bearer {token_info["access_token"]}'}
- response = requests.get(
- 'https://www.googleapis.com/drive/v3/about?fields=user',
- headers=headers
- )
- if response.status_code == 200:
- user_info = response.json()
- user_email = user_info.get('user', {}).get('emailAddress', 'Connected User')
- else:
- user_email = 'Connected User'
- except Exception as request_error:
- current_app.logger.warning(f"Could not get user info via direct request: {request_error}")
- user_email = 'Connected User'
- else:
- user_email = 'Connected User'
-
- # Generate a session token
- session_token = str(uuid.uuid4())
-
- # Store token_info in MongoDB
- from application.core.mongo_db import MongoDB
- mongo = MongoDB.get_client()
- db = mongo[settings.MONGO_DB_NAME]
- sessions_collection = db["drive_sessions"]
-
- # Store only necessary token info, removing sensitive fields
- sanitized_token_info = {
- "access_token": token_info.get("access_token"),
- "refresh_token": token_info.get("refresh_token"),
- "token_uri": token_info.get("token_uri"),
- "expiry": token_info.get("expiry"),
- "scopes": token_info.get("scopes")
- }
-
- # Store the sanitized token info with the session token
- sessions_collection.insert_one({
- "session_token": session_token,
- "token_info": sanitized_token_info,
- "created_at": datetime.datetime.now(datetime.timezone.utc),
- "user_email": user_email
- })
-
- # Return only the session token and user email to the client
- return make_response(
- jsonify({
- "success": True,
- "message": "Google Drive authentication successful",
- "session_token": session_token,
- "user_email": user_email
- }),
- 200
- )
-
- except Exception as e:
- current_app.logger.error(f"Error handling Google Drive callback: {e}")
- return make_response(
- jsonify({
- "success": False,
- "error": f"Failed to complete Google Drive authentication: {str(e)}. Please try again and make sure to grant all requested permissions, including offline access."
- }), 500
- )
-
-
-@user_ns.route("/api/google-drive/refresh")
-class GoogleDriveRefresh(Resource):
- @api.expect(
- api.model(
- "GoogleDriveRefreshModel",
- {
- "refresh_token": fields.String(required=True, description="Refresh token")
- }
- )
- )
- @api.doc(description="Refresh Google Drive access token")
- def post(self):
- """Refresh Google Drive access token"""
- try:
- from application.parser.connectors.connector_creator import ConnectorCreator
-
- data = request.get_json()
- refresh_token = data.get('refresh_token')
-
- if not refresh_token:
- return make_response(
- jsonify({"success": False, "error": "Refresh token not provided"}), 400
- )
-
- auth = ConnectorCreator.create_auth("google_drive")
- token_info = auth.refresh_access_token(refresh_token)
-
- return make_response(
- jsonify({
- "success": True,
- "message": "Token refreshed successfully",
- "token_info": token_info
- }),
- 200
- )
-
- except Exception as e:
- current_app.logger.error(f"Error refreshing Google Drive token: {e}")
- return make_response(
- jsonify({
- "success": False,
- "error": f"Failed to refresh Google Drive token: {str(e)}. Please reconnect your Google Drive account and make sure to grant offline access."
- }), 500
- )
-
-
-@user_ns.route("/api/google-drive/files")
-class GoogleDriveFiles(Resource):
- @api.expect(
- api.model(
- "GoogleDriveFilesModel",
- {
- "session_token": fields.String(required=True, description="Google Drive session token"),
- "folder_id": fields.String(description="Google Drive folder ID to fetch files from. If not provided, fetches from root", required=False),
- "limit": fields.Integer(description="Maximum number of files to return", default=50)
- }
- )
- )
- @api.doc(description="Get list of files from Google Drive")
- def post(self):
- """Get list of files from Google Drive"""
- try:
- from application.parser.connectors.connector_creator import ConnectorCreator
-
- data = request.get_json()
- session_token = data.get('session_token')
- folder_id = data.get('folder_id')
- limit = data.get('limit', 50)
-
- if not session_token:
- return make_response(
- jsonify({"success": False, "error": "Session token not provided"}), 400
- )
-
- # Create Google Drive loader with session token only
- loader = ConnectorCreator.create_connector("google_drive", session_token)
-
- # Get files from Google Drive (limit to first N files, metadata only)
- files_config = {
- 'limit': limit,
- 'list_only': True,
- 'session_token': session_token,
- 'folder_id': folder_id
- }
- documents = loader.load_data(files_config)
-
- # Convert documents to file list format
- files = []
- for doc in documents[:limit]:
- # Use extra_info instead of doc_metadata
- metadata = doc.extra_info
- files.append({
- 'id': doc.doc_id,
- 'name': metadata.get('file_name', 'Unknown File'),
- 'type': metadata.get('mime_type', 'unknown'),
- 'size': metadata.get('size', 'Unknown'),
- 'modifiedTime': metadata.get('modified_time', 'Unknown'),
- 'iconUrl': get_file_icon(metadata.get('mime_type', ''))
- })
-
- return make_response(
- jsonify({
- "success": True,
- "files": files,
- "total": len(files)
- }),
- 200
- )
-
- except Exception as e:
- current_app.logger.error(f"Error loading Google Drive files: {e}")
- return make_response(
- jsonify({
- "success": False,
- "error": f"Failed to load files: {str(e)}. Please make sure your Google Drive account is properly connected and you granted offline access during authorization."
- }), 500
- )
-
-def get_file_icon(mime_type: str) -> str:
- """Get appropriate icon for file type"""
- if 'pdf' in mime_type:
- return '📄'
- elif 'word' in mime_type or 'document' in mime_type:
- return '📝'
- elif 'presentation' in mime_type or 'powerpoint' in mime_type:
- return '📊'
- elif 'spreadsheet' in mime_type or 'excel' in mime_type:
- return '📈'
- elif 'text' in mime_type:
- return '📄'
- elif 'image' in mime_type:
- return '🖼️'
- else:
- return '📄'
-
-@user_ns.route("/api/google-drive/validate-session")
-class GoogleDriveValidateSession(Resource):
- @api.expect(
- api.model(
- "GoogleDriveValidateSessionModel",
- {
- "session_token": fields.String(required=True, description="Google Drive session token")
- }
- )
- )
- @api.doc(description="Validate Google Drive session token")
- def post(self):
- """Validate Google Drive session token and return user info"""
- try:
- from application.core.mongo_db import MongoDB
- from application.parser.connectors.connector_creator import ConnectorCreator
-
- data = request.get_json()
- session_token = data.get('session_token')
-
- if not session_token:
- return make_response(
- jsonify({"success": False, "error": "Session token not provided"}), 400
- )
-
- # Retrieve session from MongoDB using session token
- mongo = MongoDB.get_client()
- db = mongo[settings.MONGO_DB_NAME]
- sessions_collection = db["drive_sessions"]
-
- session = sessions_collection.find_one({"session_token": session_token})
- if not session or "token_info" not in session:
- return make_response(
- jsonify({"success": False, "error": "Invalid or expired session"}), 401
- )
-
- # Get token info and check if it's expired
- token_info = session["token_info"]
- auth = ConnectorCreator.create_auth("google_drive")
-
- # Check if token is expired using our improved method
- is_expired = auth.is_token_expired(token_info)
-
- # Attempt to refresh token if needed
- if is_expired and 'refresh_token' in token_info:
- try:
- current_app.logger.info("Refreshing expired Google Drive token")
- refreshed_token_info = auth.refresh_access_token(token_info['refresh_token'])
-
- # Update token in database
- sessions_collection.update_one(
- {"session_token": session_token},
- {"$set": {"token_info": refreshed_token_info}}
- )
-
- # Use the refreshed token info
- token_info = refreshed_token_info
- except Exception as e:
- current_app.logger.error(f"Error refreshing token: {e}", exc_info=True)
- return make_response(
- jsonify({"success": False, "error": "Session expired and could not be refreshed"}), 401
- )
-
- # Return success with user email
- return make_response(
- jsonify({
- "success": True,
- "user_email": session.get("user_email", "Connected User"),
- "message": "Session is valid"
- }),
- 200
- )
-
- except Exception as e:
- current_app.logger.error(f"Error validating Google Drive session: {e}", exc_info=True)
- return make_response(
- jsonify({
- "success": False,
- "error": f"Failed to validate session: {str(e)}. Please reconnect your Google Drive account and make sure to grant offline access during authorization."
- }), 500
- )
diff --git a/application/app.py b/application/app.py
index 4159a2bb..489ec840 100644
--- a/application/app.py
+++ b/application/app.py
@@ -16,6 +16,7 @@ from application.api import api # noqa: E402
from application.api.answer import answer # noqa: E402
from application.api.internal.routes import internal # noqa: E402
from application.api.user.routes import user # noqa: E402
+from application.api.connector.routes import connector # noqa: E402
from application.celery_init import celery # noqa: E402
from application.core.settings import settings # noqa: E402
@@ -30,6 +31,7 @@ app = Flask(__name__)
app.register_blueprint(user)
app.register_blueprint(answer)
app.register_blueprint(internal)
+app.register_blueprint(connector)
app.config.update(
UPLOAD_FOLDER="inputs",
CELERY_BROKER_URL=settings.CELERY_BROKER_URL,
diff --git a/frontend/public/google-drive-callback.html b/frontend/public/google-drive-callback.html
index 0272af9a..d2113624 100644
--- a/frontend/public/google-drive-callback.html
+++ b/frontend/public/google-drive-callback.html
@@ -70,10 +70,10 @@
}
try {
- // Exchange code for tokens
- // Use the backend API URL directly since this is a static HTML file
const backendApiUrl = window.location.protocol + '//' + window.location.hostname + ':7091';
- const response = await fetch(backendApiUrl + '/api/google-drive/callback?' + window.location.search.substring(1));
+ const urlParams = new URLSearchParams(window.location.search);
+ urlParams.set('provider', 'google_drive');
+ const response = await fetch(backendApiUrl + '/api/connectors/callback?' + urlParams.toString());
const data = await response.json();
if (data.success) {
@@ -81,15 +81,13 @@
if (data.session_token) {
localStorage.setItem('google_drive_session_token', data.session_token);
}
-
- // Extract user email
let userEmail = data.user_email || 'Connected User';
statusDiv.className = 'success';
statusDiv.innerHTML = `Authentication successful as ${userEmail}!
You can close this window. Your Google Drive is now connected and ready to use.`;
- // Notify parent window with session token instead of token_info
+
if (window.opener) {
window.opener.postMessage({
type: 'google_drive_auth_success',
@@ -110,7 +108,6 @@
}
}
- // Run when page loads
handleCallback();