(feat:connectors) separate routes, namespace

This commit is contained in:
ManishMadan2882
2025-08-28 00:51:09 +05:30
parent 578c68205a
commit 4065041a9f
4 changed files with 584 additions and 463 deletions

View File

@@ -47,6 +47,7 @@ from application.utils import (
)
from application.utils import num_tokens_from_string
from application.vectorstore.vector_creator import VectorCreator
from application.parser.connectors.connector_creator import ConnectorCreator
storage = StorageCreator.get_storage()
@@ -493,9 +494,9 @@ class DeleteOldIndexes(Resource):
)
if not doc:
return make_response(jsonify({"status": "not found"}), 404)
storage = StorageCreator.get_storage()
try:
# Delete vector index
if settings.VECTOR_STORE == "faiss":
@@ -509,7 +510,7 @@ class DeleteOldIndexes(Resource):
settings.VECTOR_STORE, source_id=str(doc["_id"])
)
vectorstore.delete_index()
if "file_path" in doc and doc["file_path"]:
file_path = doc["file_path"]
if storage.is_directory(file_path):
@@ -518,7 +519,7 @@ class DeleteOldIndexes(Resource):
storage.delete_file(f)
else:
storage.delete_file(file_path)
except FileNotFoundError:
pass
except Exception as err:
@@ -526,7 +527,7 @@ class DeleteOldIndexes(Resource):
f"Error deleting files and indexes: {err}", exc_info=True
)
return make_response(jsonify({"success": False}), 400)
sources_collection.delete_one({"_id": ObjectId(source_id)})
return make_response(jsonify({"success": True}), 200)
@@ -574,30 +575,30 @@ class UploadFile(Resource):
try:
storage = StorageCreator.get_storage()
for file in files:
original_filename = file.filename
safe_file = safe_filename(original_filename)
with tempfile.TemporaryDirectory() as temp_dir:
temp_file_path = os.path.join(temp_dir, safe_file)
file.save(temp_file_path)
if zipfile.is_zipfile(temp_file_path):
try:
with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
zip_ref.extractall(path=temp_dir)
# Walk through extracted files and upload them
for root, _, files in os.walk(temp_dir):
for extracted_file in files:
if os.path.join(root, extracted_file) == temp_file_path:
continue
rel_path = os.path.relpath(os.path.join(root, extracted_file), temp_dir)
storage_path = f"{base_path}/{rel_path}"
with open(os.path.join(root, extracted_file), 'rb') as f:
storage.save_file(f, storage_path)
except Exception as e:
@@ -611,7 +612,7 @@ class UploadFile(Resource):
file_path = f"{base_path}/{safe_file}"
with open(temp_file_path, 'rb') as f:
storage.save_file(f, file_path)
task = ingest.delay(
settings.UPLOAD_FOLDER,
[
@@ -687,8 +688,8 @@ class ManageSourceFiles(Resource):
try:
storage = StorageCreator.get_storage()
source_file_path = source.get("file_path", "")
parent_dir = request.form.get("parent_dir", "")
parent_dir = request.form.get("parent_dir", "")
if parent_dir and (parent_dir.startswith("/") or ".." in parent_dir):
return make_response(
jsonify({"success": False, "message": "Invalid parent directory path"}), 400
@@ -702,7 +703,7 @@ class ManageSourceFiles(Resource):
)
added_files = []
target_dir = source_file_path
if parent_dir:
target_dir = f"{source_file_path}/{parent_dir}"
@@ -878,44 +879,35 @@ class UploadRemote(Resource):
source_data = config.get("url")
elif data["source"] == "reddit":
source_data = config
elif data["source"] == "google_drive":
if "session_token" not in config:
return make_response(jsonify({
"success": False,
"error": "Missing session_token in Google Drive configuration"
}), 400)
elif data["source"] in ConnectorCreator.get_supported_connectors():
session_token = config.get("session_token")
if not session_token:
return make_response(jsonify({
"success": False,
"error": f"Missing session_token in {data['source']} configuration"
}), 400)
# Process file_ids
file_ids = config.get("file_ids", [])
if isinstance(file_ids, str):
file_ids = [id.strip() for id in file_ids.split(',') if id.strip()]
elif not isinstance(file_ids, list):
file_ids = []
# Process folder_ids
folder_ids = config.get("folder_ids", [])
if isinstance(folder_ids, str):
folder_ids = [id.strip() for id in folder_ids.split(',') if id.strip()]
elif not isinstance(folder_ids, list):
folder_ids = []
# Ensure at least one file or folder is selected
if not file_ids and not folder_ids:
return make_response(jsonify({
"success": False,
"error": "No files or folders selected"
}), 400)
config["file_ids"] = file_ids
config["folder_ids"] = folder_ids
from application.api.user.tasks import ingest_connector_task
task = ingest_connector_task.delay(
job_name=data["name"],
user=decoded_token.get("sub"),
source_type="google_drive",
source_type=data["source"],
session_token=session_token,
file_ids=file_ids,
folder_ids=folder_ids,
@@ -1453,7 +1445,7 @@ class CreateAgent(Resource):
except json.JSONDecodeError:
data["json_schema"] = None
print(f"Received data: {data}")
# Validate JSON schema if provided
if data.get("json_schema"):
try:
@@ -1461,19 +1453,19 @@ class CreateAgent(Resource):
json_schema = data.get("json_schema")
if not isinstance(json_schema, dict):
return make_response(
jsonify({"success": False, "message": "JSON schema must be a valid JSON object"}),
jsonify({"success": False, "message": "JSON schema must be a valid JSON object"}),
400
)
# Validate that it has either a 'schema' property or is itself a schema
if "schema" not in json_schema and "type" not in json_schema:
return make_response(
jsonify({"success": False, "message": "JSON schema must contain either a 'schema' property or be a valid JSON schema with 'type' property"}),
jsonify({"success": False, "message": "JSON schema must contain either a 'schema' property or be a valid JSON schema with 'type' property"}),
400
)
except Exception as e:
return make_response(
jsonify({"success": False, "message": f"Invalid JSON schema: {str(e)}"}),
jsonify({"success": False, "message": f"Invalid JSON schema: {str(e)}"}),
400
)
@@ -3607,7 +3599,7 @@ class GetChunks(Resource):
try:
store = get_vector_store(doc_id)
chunks = store.get_chunks()
filtered_chunks = []
for chunk in chunks:
metadata = chunk.get("metadata", {})
@@ -3628,9 +3620,9 @@ class GetChunks(Resource):
continue
filtered_chunks.append(chunk)
chunks = filtered_chunks
total_chunks = len(chunks)
start = (page - 1) * per_page
end = start + per_page
@@ -3951,27 +3943,27 @@ class DirectoryStructure(Resource):
decoded_token = request.decoded_token
if not decoded_token:
return make_response(jsonify({"success": False}), 401)
user = decoded_token.get("sub")
doc_id = request.args.get("id")
if not doc_id:
return make_response(
jsonify({"error": "Document ID is required"}), 400
)
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid document ID"}), 400)
try:
doc = sources_collection.find_one({"_id": ObjectId(doc_id), "user": user})
if not doc:
return make_response(
jsonify({"error": "Document not found or access denied"}), 404
)
directory_structure = doc.get("directory_structure", {})
return make_response(
jsonify({
"success": True,
@@ -3979,7 +3971,7 @@ class DirectoryStructure(Resource):
"base_path": doc.get("file_path", "")
}), 200
)
except Exception as e:
current_app.logger.error(
f"Error retrieving directory structure: {e}", exc_info=True
@@ -3989,409 +3981,4 @@ class DirectoryStructure(Resource):
)
@user_ns.route("/api/google-drive/auth")
class GoogleDriveAuth(Resource):
    """Entry point of the Google Drive OAuth flow.

    ``GET`` asks the ``google_drive`` auth object for an authorization URL
    and returns it together with the ``state`` value that was embedded in it.
    """

    @api.doc(description="Get Google Drive OAuth authorization URL")
    def get(self):
        """Get Google Drive OAuth authorization URL"""
        try:
            import uuid
            from application.parser.connectors.connector_creator import ConnectorCreator

            drive_auth = ConnectorCreator.create_auth("google_drive")

            # Random per-request value meant for CSRF protection. This
            # handler only hands it back to the caller; it is not persisted
            # here.
            state = str(uuid.uuid4())
            authorization_url = drive_auth.get_authorization_url(state=state)
            current_app.logger.info(f"Generated authorization URL: {authorization_url}")

            payload = {
                "success": True,
                "authorization_url": authorization_url,
                "state": state,
            }
            return make_response(jsonify(payload), 200)
        except Exception as e:
            current_app.logger.error(f"Error generating Google Drive auth URL: {e}")
            failure = {"success": False, "error": str(e)}
            return make_response(jsonify(failure), 500)
@user_ns.route("/api/google-drive/callback")
class GoogleDriveCallback(Resource):
    """OAuth redirect target for Google Drive.

    ``GET`` reads ``code`` / ``error`` from the query string, exchanges the
    code for tokens, resolves the account e-mail, stores the token data in
    the ``drive_sessions`` collection under a freshly generated session
    token, and returns that session token plus the e-mail to the client.

    Responses: 200 on success, 400 for OAuth/validation/exchange failures,
    500 for any other unexpected error.
    """

    @staticmethod
    def _lookup_user_email(auth, token_info):
        """Return the Drive account e-mail, or ``'Connected User'`` if unknown.

        The Drive service built from the credentials is tried first; if that
        raises, a direct HTTP call with the access token is attempted.
        Failures are logged as warnings and never propagated.
        """
        fallback = 'Connected User'
        try:
            credentials = auth.create_credentials_from_token_info(token_info)
            service = auth.build_drive_service(credentials)
            user_info = service.about().get(fields="user").execute()
            return user_info.get('user', {}).get('emailAddress', fallback)
        except Exception as e:
            current_app.logger.warning(f"Could not get user info: {e}")

        if not token_info.get('access_token'):
            return fallback

        try:
            import requests
            headers = {'Authorization': f'Bearer {token_info["access_token"]}'}
            # Bounded wait: without a timeout a stalled connection would
            # block the OAuth callback indefinitely.
            response = requests.get(
                'https://www.googleapis.com/drive/v3/about?fields=user',
                headers=headers,
                timeout=10,
            )
            if response.status_code == 200:
                return response.json().get('user', {}).get('emailAddress', fallback)
        except Exception as request_error:
            current_app.logger.warning(f"Could not get user info via direct request: {request_error}")
        return fallback

    @api.doc(description="Handle Google Drive OAuth callback")
    def get(self):
        """Handle Google Drive OAuth callback"""
        try:
            from application.parser.connectors.connector_creator import ConnectorCreator
            from flask import request
            import uuid

            # NOTE(review): the 'state' query parameter is not validated
            # here, so the CSRF check implied by the auth endpoint is not
            # enforced by this handler.
            authorization_code = request.args.get('code')
            error = request.args.get('error')

            if error:
                return make_response(
                    jsonify({"success": False, "error": f"OAuth error: {error}. Please try again and make sure to grant all requested permissions, including offline access."}), 400
                )

            if not authorization_code:
                return make_response(
                    jsonify({"success": False, "error": "Authorization code not provided. Please complete the authorization process and make sure to grant offline access."}), 400
                )

            # Exchange code for tokens
            try:
                auth = ConnectorCreator.create_auth("google_drive")
                token_info = auth.exchange_code_for_tokens(authorization_code)

                # Guard first: the logging below calls token_info.get(), so
                # an empty/None result must be rejected before it is touched.
                if not token_info:
                    current_app.logger.error("exchange_code_for_tokens returned None or empty result")
                    return make_response(
                        jsonify({"success": False, "error": "Failed to exchange authorization code for tokens. Please try again and make sure to grant all requested permissions, including offline access."}), 400
                    )

                current_app.logger.info(f"Token info received from OAuth callback - has refresh_token: {bool(token_info.get('refresh_token'))}, "
                                        f"has access_token: {bool(token_info.get('access_token'))}, "
                                        f"expiry: {token_info.get('expiry')}")

                # Log the structure with the secret-bearing keys filtered out.
                safe_token_info = {k: v for k, v in token_info.items() if k not in ['access_token', 'refresh_token', 'client_secret']}
                current_app.logger.info(f"Full token info structure: {safe_token_info}")

                # Validate required fields in token_info
                required_fields = ['access_token', 'token_uri', 'client_id', 'client_secret']
                missing_fields = [field for field in required_fields if not token_info.get(field)]
                if missing_fields:
                    current_app.logger.error(f"Token info missing required fields: {missing_fields}")
                    return make_response(
                        jsonify({"success": False, "error": f"Token information incomplete. Missing fields: {missing_fields}. Please try again and make sure to grant all requested permissions."}), 400
                    )

                # A refresh token is required so the session can be renewed later.
                if not token_info.get('refresh_token'):
                    return make_response(
                        jsonify({
                            "success": False,
                            "error": "OAuth flow did not return a refresh token. This typically happens when offline access wasn't granted. "
                                     "Please reconnect your Google Drive account and ensure you grant offline access when prompted. "
                                     "Make sure to check 'Allow offline access' during the authorization process."
                        }), 400
                    )
            except Exception as e:
                current_app.logger.error(f"Error exchanging code for tokens: {e}", exc_info=True)
                return make_response(
                    jsonify({"success": False, "error": f"Failed to exchange authorization code for tokens: {str(e)}"}), 400
                )

            user_email = self._lookup_user_email(auth, token_info)

            # Opaque handle the client uses instead of the raw OAuth tokens.
            session_token = str(uuid.uuid4())

            # Store token_info in MongoDB
            from application.core.mongo_db import MongoDB
            mongo = MongoDB.get_client()
            db = mongo[settings.MONGO_DB_NAME]
            sessions_collection = db["drive_sessions"]

            # Persist only the token fields listed here; client_id and
            # client_secret from token_info are deliberately left out.
            sanitized_token_info = {
                "access_token": token_info.get("access_token"),
                "refresh_token": token_info.get("refresh_token"),
                "token_uri": token_info.get("token_uri"),
                "expiry": token_info.get("expiry"),
                "scopes": token_info.get("scopes"),
            }

            sessions_collection.insert_one({
                "session_token": session_token,
                "token_info": sanitized_token_info,
                "created_at": datetime.datetime.now(datetime.timezone.utc),
                "user_email": user_email,
            })

            # Return only the session token and user email to the client
            return make_response(
                jsonify({
                    "success": True,
                    "message": "Google Drive authentication successful",
                    "session_token": session_token,
                    "user_email": user_email
                }),
                200
            )

        except Exception as e:
            current_app.logger.error(f"Error handling Google Drive callback: {e}")
            return make_response(
                jsonify({
                    "success": False,
                    "error": f"Failed to complete Google Drive authentication: {str(e)}. Please try again and make sure to grant all requested permissions, including offline access."
                }), 500
            )
@user_ns.route("/api/google-drive/refresh")
class GoogleDriveRefresh(Resource):
    """Exchange a Google Drive refresh token for new token information.

    ``POST`` expects a JSON object with ``refresh_token``. It responds 400
    when the token is missing (including when the body is absent or is not a
    JSON object), 200 with ``token_info`` on success, and 500 when the
    refresh itself fails.
    """

    @api.expect(
        api.model(
            "GoogleDriveRefreshModel",
            {
                "refresh_token": fields.String(required=True, description="Refresh token")
            }
        )
    )
    @api.doc(description="Refresh Google Drive access token")
    def post(self):
        """Refresh Google Drive access token"""
        try:
            from application.parser.connectors.connector_creator import ConnectorCreator

            # silent=True yields None instead of raising on a missing or
            # malformed body; anything that is not a JSON object is treated
            # as empty so the request fails validation with 400, not 500.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = {}

            refresh_token = data.get('refresh_token')
            if not refresh_token:
                return make_response(
                    jsonify({"success": False, "error": "Refresh token not provided"}), 400
                )

            auth = ConnectorCreator.create_auth("google_drive")
            token_info = auth.refresh_access_token(refresh_token)

            return make_response(
                jsonify({
                    "success": True,
                    "message": "Token refreshed successfully",
                    "token_info": token_info
                }),
                200
            )
        except Exception as e:
            current_app.logger.error(f"Error refreshing Google Drive token: {e}", exc_info=True)
            return make_response(
                jsonify({
                    "success": False,
                    "error": f"Failed to refresh Google Drive token: {str(e)}. Please reconnect your Google Drive account and make sure to grant offline access."
                }), 500
            )
@user_ns.route("/api/google-drive/files")
class GoogleDriveFiles(Resource):
    """List file metadata from Google Drive for a stored session.

    ``POST`` expects a JSON object with ``session_token`` and optionally
    ``folder_id`` and ``limit`` (default 50). It responds 400 for a missing
    session token or a non-integer limit, 200 with the file list on success,
    and 500 when loading fails.
    """

    @api.expect(
        api.model(
            "GoogleDriveFilesModel",
            {
                "session_token": fields.String(required=True, description="Google Drive session token"),
                "folder_id": fields.String(description="Google Drive folder ID to fetch files from. If not provided, fetches from root", required=False),
                "limit": fields.Integer(description="Maximum number of files to return", default=50)
            }
        )
    )
    @api.doc(description="Get list of files from Google Drive")
    def post(self):
        """Get list of files from Google Drive"""
        try:
            from application.parser.connectors.connector_creator import ConnectorCreator

            # Tolerate a missing/malformed body or a non-object payload so
            # the request is rejected by validation rather than crashing.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = {}

            session_token = data.get('session_token')
            folder_id = data.get('folder_id')
            limit = data.get('limit', 50)

            if not session_token:
                return make_response(
                    jsonify({"success": False, "error": "Session token not provided"}), 400
                )

            # limit is later used as a slice bound, which only accepts
            # integers (or None); reject anything that cannot be coerced.
            if limit is not None:
                try:
                    limit = int(limit)
                except (TypeError, ValueError):
                    return make_response(
                        jsonify({"success": False, "error": "Invalid limit: expected an integer"}), 400
                    )

            # Create Google Drive loader with session token only
            loader = ConnectorCreator.create_connector("google_drive", session_token)

            # Request a metadata-only listing from the loader.
            files_config = {
                'limit': limit,
                'list_only': True,
                'session_token': session_token,
                'folder_id': folder_id
            }
            documents = loader.load_data(files_config)

            # Convert documents to file list format
            files = []
            for doc in documents[:limit]:
                # Metadata is read from extra_info on each document.
                metadata = doc.extra_info
                files.append({
                    'id': doc.doc_id,
                    'name': metadata.get('file_name', 'Unknown File'),
                    'type': metadata.get('mime_type', 'unknown'),
                    'size': metadata.get('size', 'Unknown'),
                    'modifiedTime': metadata.get('modified_time', 'Unknown'),
                    # "or ''" guards against an explicit None mime_type.
                    'iconUrl': get_file_icon(metadata.get('mime_type') or '')
                })

            return make_response(
                jsonify({
                    "success": True,
                    "files": files,
                    "total": len(files)
                }),
                200
            )

        except Exception as e:
            current_app.logger.error(f"Error loading Google Drive files: {e}", exc_info=True)
            return make_response(
                jsonify({
                    "success": False,
                    "error": f"Failed to load files: {str(e)}. Please make sure your Google Drive account is properly connected and you granted offline access during authorization."
                }), 500
            )
def get_file_icon(mime_type: str) -> str:
    """Return an emoji icon for a MIME type.

    Matching is by case-insensitive substring. Presentation and spreadsheet
    keywords are tested before the word-processor keywords because OOXML and
    OpenDocument types all contain "document" (``officedocument`` /
    ``opendocument``) and would otherwise be reported as text documents.

    Args:
        mime_type: MIME type string; ``None`` or ``""`` is accepted.

    Returns:
        A single emoji string; the generic page icon when nothing matches.
    """
    generic = '📄'
    if not mime_type:
        return generic

    kind = mime_type.lower()
    # Ordered rules: the first rule with any keyword contained in the type wins.
    rules = (
        (('pdf',), '📄'),
        (('presentation', 'powerpoint'), '📊'),
        (('spreadsheet', 'excel'), '📈'),
        (('word', 'document'), '📝'),
        (('text',), '📄'),
        (('image',), '🖼️'),
    )
    for keywords, icon in rules:
        if any(keyword in kind for keyword in keywords):
            return icon
    return generic
@user_ns.route("/api/google-drive/validate-session")
class GoogleDriveValidateSession(Resource):
    """Check that a stored Google Drive session is still usable.

    ``POST`` expects a JSON object with ``session_token``. The session is
    looked up in the ``drive_sessions`` collection; if its token is expired
    a refresh is attempted and the stored token info is replaced. It responds
    400 for a missing token, 401 for an unknown session or one that is
    expired and cannot be refreshed, 200 with the stored e-mail when valid,
    and 500 on unexpected errors.
    """

    @api.expect(
        api.model(
            "GoogleDriveValidateSessionModel",
            {
                "session_token": fields.String(required=True, description="Google Drive session token")
            }
        )
    )
    @api.doc(description="Validate Google Drive session token")
    def post(self):
        """Validate Google Drive session token and return user info"""
        try:
            from application.core.mongo_db import MongoDB
            from application.parser.connectors.connector_creator import ConnectorCreator

            # Tolerate a missing/malformed body or a non-object payload so
            # the request is rejected by validation rather than crashing.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = {}

            session_token = data.get('session_token')
            if not session_token:
                return make_response(
                    jsonify({"success": False, "error": "Session token not provided"}), 400
                )

            # Retrieve session from MongoDB using session token
            mongo = MongoDB.get_client()
            db = mongo[settings.MONGO_DB_NAME]
            sessions_collection = db["drive_sessions"]

            session = sessions_collection.find_one({"session_token": session_token})
            if not session or "token_info" not in session:
                return make_response(
                    jsonify({"success": False, "error": "Invalid or expired session"}), 401
                )

            token_info = session["token_info"]
            auth = ConnectorCreator.create_auth("google_drive")

            if auth.is_token_expired(token_info):
                refresh_token = token_info.get('refresh_token')
                # An expired token with nothing to refresh it must not be
                # reported as a valid session.
                if not refresh_token:
                    return make_response(
                        jsonify({"success": False, "error": "Session expired and could not be refreshed"}), 401
                    )
                try:
                    current_app.logger.info("Refreshing expired Google Drive token")
                    refreshed_token_info = auth.refresh_access_token(refresh_token)

                    # NOTE(review): the stored token_info is replaced
                    # wholesale; confirm refresh_access_token() returns the
                    # refresh_token as well, otherwise the next expiry
                    # cannot be refreshed.
                    sessions_collection.update_one(
                        {"session_token": session_token},
                        {"$set": {"token_info": refreshed_token_info}}
                    )
                except Exception as e:
                    current_app.logger.error(f"Error refreshing token: {e}", exc_info=True)
                    return make_response(
                        jsonify({"success": False, "error": "Session expired and could not be refreshed"}), 401
                    )

            # Return success with user email
            return make_response(
                jsonify({
                    "success": True,
                    "user_email": session.get("user_email", "Connected User"),
                    "message": "Session is valid"
                }),
                200
            )

        except Exception as e:
            current_app.logger.error(f"Error validating Google Drive session: {e}", exc_info=True)
            return make_response(
                jsonify({
                    "success": False,
                    "error": f"Failed to validate session: {str(e)}. Please reconnect your Google Drive account and make sure to grant offline access during authorization."
                }), 500
            )