Files
DocsGPT/application/parser/connectors/google_drive/auth.py
2025-08-26 02:46:36 +05:30

269 lines
10 KiB
Python

import logging
import datetime
from typing import Optional, Dict, Any
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from application.core.settings import settings
from application.parser.connectors.base import BaseConnectorAuth
class GoogleDriveAuth(BaseConnectorAuth):
"""
Handles Google OAuth 2.0 authentication for Google Drive access.
"""
SCOPES = [
'https://www.googleapis.com/auth/drive.readonly',
'https://www.googleapis.com/auth/drive.metadata.readonly'
]
def __init__(self):
self.client_id = settings.GOOGLE_CLIENT_ID
self.client_secret = settings.GOOGLE_CLIENT_SECRET
self.redirect_uri = settings.GOOGLE_REDIRECT_URI or "http://localhost:7091/api/google-drive/callback"
if not self.client_id or not self.client_secret:
raise ValueError("Google OAuth credentials not configured. Please set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET in settings.")
def get_authorization_url(self, state: Optional[str] = None) -> str:
try:
flow = Flow.from_client_config(
{
"web": {
"client_id": self.client_id,
"client_secret": self.client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [self.redirect_uri]
}
},
scopes=self.SCOPES
)
flow.redirect_uri = self.redirect_uri
authorization_url, _ = flow.authorization_url(
access_type='offline',
prompt='consent',
include_granted_scopes='true',
state=state
)
return authorization_url
except Exception as e:
logging.error(f"Error generating authorization URL: {e}")
raise
def exchange_code_for_tokens(self, authorization_code: str) -> Dict[str, Any]:
try:
if not authorization_code:
raise ValueError("Authorization code is required")
flow = Flow.from_client_config(
{
"web": {
"client_id": self.client_id,
"client_secret": self.client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": [self.redirect_uri]
}
},
scopes=self.SCOPES
)
flow.redirect_uri = self.redirect_uri
flow.fetch_token(code=authorization_code)
credentials = flow.credentials
if not credentials.refresh_token:
logging.warning("OAuth flow did not return a refresh_token.")
if not credentials.token:
raise ValueError("OAuth flow did not return an access token")
if not credentials.token_uri:
credentials.token_uri = "https://oauth2.googleapis.com/token"
if not credentials.client_id:
credentials.client_id = self.client_id
if not credentials.client_secret:
credentials.client_secret = self.client_secret
if not credentials.refresh_token:
raise ValueError(
"No refresh token received. This typically happens when offline access wasn't granted. "
)
return {
'access_token': credentials.token,
'refresh_token': credentials.refresh_token,
'token_uri': credentials.token_uri,
'client_id': credentials.client_id,
'client_secret': credentials.client_secret,
'scopes': credentials.scopes,
'expiry': credentials.expiry.isoformat() if credentials.expiry else None
}
except Exception as e:
logging.error(f"Error exchanging code for tokens: {e}")
raise
def refresh_access_token(self, refresh_token: str) -> Dict[str, Any]:
try:
if not refresh_token:
raise ValueError("Refresh token is required")
credentials = Credentials(
token=None,
refresh_token=refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=self.client_id,
client_secret=self.client_secret
)
from google.auth.transport.requests import Request
credentials.refresh(Request())
return {
'access_token': credentials.token,
'refresh_token': refresh_token,
'token_uri': credentials.token_uri,
'client_id': credentials.client_id,
'client_secret': credentials.client_secret,
'scopes': credentials.scopes,
'expiry': credentials.expiry.isoformat() if credentials.expiry else None
}
except Exception as e:
logging.error(f"Error refreshing access token: {e}", exc_info=True)
raise
def create_credentials_from_token_info(self, token_info: Dict[str, Any]) -> Credentials:
from application.core.settings import settings
access_token = token_info.get('access_token')
if not access_token:
raise ValueError("No access token found in token_info")
credentials = Credentials(
token=access_token,
refresh_token=token_info.get('refresh_token'),
token_uri= 'https://oauth2.googleapis.com/token',
client_id=settings.GOOGLE_CLIENT_ID,
client_secret=settings.GOOGLE_CLIENT_SECRET,
scopes=token_info.get('scopes', ['https://www.googleapis.com/auth/drive.readonly'])
)
if not credentials.token:
raise ValueError("Credentials created without valid access token")
return credentials
def build_drive_service(self, credentials: Credentials):
try:
if not credentials:
raise ValueError("No credentials provided")
if not credentials.token and not credentials.refresh_token:
raise ValueError("No access token or refresh token available. User must re-authorize with offline access.")
needs_refresh = credentials.expired or not credentials.token
if needs_refresh:
if credentials.refresh_token:
try:
from google.auth.transport.requests import Request
credentials.refresh(Request())
except Exception as refresh_error:
raise ValueError(f"Failed to refresh credentials: {refresh_error}")
else:
raise ValueError("No access token or refresh token available. User must re-authorize with offline access.")
return build('drive', 'v3', credentials=credentials)
except HttpError as e:
raise ValueError(f"Failed to build Google Drive service: HTTP {e.resp.status}")
except Exception as e:
raise ValueError(f"Failed to build Google Drive service: {str(e)}")
def is_token_expired(self, token_info):
if 'expiry' in token_info and token_info['expiry']:
try:
from dateutil import parser
# Google Drive provides timezone-aware ISO8601 dates
expiry_dt = parser.parse(token_info['expiry'])
current_time = datetime.datetime.now(datetime.timezone.utc)
return current_time >= expiry_dt - datetime.timedelta(seconds=60)
except Exception:
return True
if 'access_token' in token_info and token_info['access_token']:
return False
return True
def get_token_info_from_session(self, session_token: str) -> Dict[str, Any]:
try:
from application.core.mongo_db import MongoDB
from application.core.settings import settings
mongo = MongoDB.get_client()
db = mongo[settings.MONGO_DB_NAME]
sessions_collection = db["drive_sessions"]
session = sessions_collection.find_one({"session_token": session_token})
if not session:
raise ValueError(f"Invalid session token: {session_token}")
if "token_info" not in session:
raise ValueError("Session missing token information")
token_info = session["token_info"]
if not token_info:
raise ValueError("Invalid token information")
required_fields = ["access_token", "refresh_token"]
missing_fields = [field for field in required_fields if field not in token_info or not token_info.get(field)]
if missing_fields:
raise ValueError(f"Missing required token fields: {missing_fields}")
if 'client_id' not in token_info:
token_info['client_id'] = settings.GOOGLE_CLIENT_ID
if 'client_secret' not in token_info:
token_info['client_secret'] = settings.GOOGLE_CLIENT_SECRET
if 'token_uri' not in token_info:
token_info['token_uri'] = 'https://oauth2.googleapis.com/token'
return token_info
except Exception as e:
raise ValueError(f"Failed to retrieve Google Drive token information: {str(e)}")
def validate_credentials(self, credentials: Credentials) -> bool:
"""
Validate Google Drive credentials by making a test API call.
Args:
credentials: Google credentials object
Returns:
True if credentials are valid, False otherwise
"""
try:
service = self.build_drive_service(credentials)
service.about().get(fields="user").execute()
return True
except HttpError as e:
logging.error(f"HTTP error validating credentials: {e}")
return False
except Exception as e:
logging.error(f"Error validating credentials: {e}")
return False