From f09f1433a98b94e633bdfdb1298161f76777d2c1 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Tue, 26 Aug 2025 01:38:36 +0530 Subject: [PATCH] (feat:connectors) separate layer --- application/api/user/routes.py | 10 ++-- application/parser/connectors/__init__.py | 11 ++++ .../parser/connectors/connector_creator.py | 57 +++++++++++++++++++ .../connectors/google_drive/__init__.py | 10 ++++ .../google_drive/auth.py} | 0 .../google_drive/loader.py} | 22 +++---- application/parser/remote/remote_creator.py | 12 +++- application/worker.py | 36 +++++++----- 8 files changed, 125 insertions(+), 33 deletions(-) create mode 100644 application/parser/connectors/__init__.py create mode 100644 application/parser/connectors/connector_creator.py create mode 100644 application/parser/connectors/google_drive/__init__.py rename application/parser/{remote/google_auth.py => connectors/google_drive/auth.py} (100%) rename application/parser/{remote/google_drive_loader.py => connectors/google_drive/loader.py} (98%) diff --git a/application/api/user/routes.py b/application/api/user/routes.py index ae696952..db371d0c 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -3995,7 +3995,7 @@ class GoogleDriveAuth(Resource): def get(self): """Get Google Drive OAuth authorization URL""" try: - from application.parser.remote.google_auth import GoogleDriveAuth + from application.parser.connectors.google_drive.auth import GoogleDriveAuth auth = GoogleDriveAuth() @@ -4029,7 +4029,7 @@ class GoogleDriveCallback(Resource): def get(self): """Handle Google Drive OAuth callback""" try: - from application.parser.remote.google_auth import GoogleDriveAuth + from application.parser.connectors.google_drive.auth import GoogleDriveAuth from flask import request import uuid @@ -4193,7 +4193,7 @@ class GoogleDriveRefresh(Resource): def post(self): """Refresh Google Drive access token""" try: - from application.parser.remote.google_auth import GoogleDriveAuth + from application.parser.connectors.google_drive.auth import GoogleDriveAuth data = request.get_json() refresh_token = data.get('refresh_token') @@ -4241,7 +4241,7 @@ class GoogleDriveFiles(Resource): def post(self): """Get list of files from Google Drive""" try: - from application.parser.remote.google_drive_loader import GoogleDriveLoader + from application.parser.connectors.google_drive.loader import GoogleDriveLoader data = request.get_json() session_token = data.get('session_token') @@ -4329,7 +4329,7 @@ class GoogleDriveValidateSession(Resource): """Validate Google Drive session token and return user info""" try: from application.core.mongo_db import MongoDB - from application.parser.remote.google_auth import GoogleDriveAuth + from application.parser.connectors.google_drive.auth import GoogleDriveAuth data = request.get_json() session_token = data.get('session_token') diff --git a/application/parser/connectors/__init__.py b/application/parser/connectors/__init__.py new file mode 100644 index 00000000..ee1af121 --- /dev/null +++ b/application/parser/connectors/__init__.py @@ -0,0 +1,11 @@ +""" +External knowledge base connectors for DocsGPT. + +This module contains connectors for external knowledge bases and document storage systems +that require authentication and specialized handling, separate from simple web scrapers. +""" + +from .connector_creator import ConnectorCreator +from .google_drive import GoogleDriveAuth, GoogleDriveLoader + +__all__ = ['ConnectorCreator', 'GoogleDriveAuth', 'GoogleDriveLoader'] diff --git a/application/parser/connectors/connector_creator.py b/application/parser/connectors/connector_creator.py new file mode 100644 index 00000000..cefba7b4 --- /dev/null +++ b/application/parser/connectors/connector_creator.py @@ -0,0 +1,57 @@ +from application.parser.connectors.google_drive.loader import GoogleDriveLoader + + +class ConnectorCreator: + """ + Factory class for creating external knowledge base connectors. + + These are different from remote loaders as they typically require + authentication and connect to external document storage systems. + """ + + connectors = { + "google_drive": GoogleDriveLoader, + } + + @classmethod + def create_connector(cls, connector_type, *args, **kwargs): + """ + Create a connector instance for the specified type. + + Args: + connector_type: Type of connector to create (e.g., 'google_drive') + *args, **kwargs: Arguments to pass to the connector constructor + + Returns: + Connector instance + + Raises: + ValueError: If connector type is not supported + """ + connector_class = cls.connectors.get(connector_type.lower()) + if not connector_class: + raise ValueError(f"No connector class found for type {connector_type}") + return connector_class(*args, **kwargs) + + @classmethod + def get_supported_connectors(cls): + """ + Get list of supported connector types. + + Returns: + List of supported connector type strings + """ + return list(cls.connectors.keys()) + + @classmethod + def is_supported(cls, connector_type): + """ + Check if a connector type is supported. + + Args: + connector_type: Type of connector to check + + Returns: + True if supported, False otherwise + """ + return connector_type.lower() in cls.connectors diff --git a/application/parser/connectors/google_drive/__init__.py b/application/parser/connectors/google_drive/__init__.py new file mode 100644 index 00000000..18abeec1 --- /dev/null +++ b/application/parser/connectors/google_drive/__init__.py @@ -0,0 +1,10 @@ +""" +Google Drive connector for DocsGPT. + +This module provides authentication and document loading capabilities for Google Drive. +""" + +from .auth import GoogleDriveAuth +from .loader import GoogleDriveLoader + +__all__ = ['GoogleDriveAuth', 'GoogleDriveLoader'] diff --git a/application/parser/remote/google_auth.py b/application/parser/connectors/google_drive/auth.py similarity index 100% rename from application/parser/remote/google_auth.py rename to application/parser/connectors/google_drive/auth.py diff --git a/application/parser/remote/google_drive_loader.py b/application/parser/connectors/google_drive/loader.py similarity index 98% rename from application/parser/remote/google_drive_loader.py rename to application/parser/connectors/google_drive/loader.py index a5d5cc9f..d782649c 100644 --- a/application/parser/remote/google_drive_loader.py +++ b/application/parser/connectors/google_drive/loader.py @@ -12,7 +12,7 @@ from googleapiclient.http import MediaIoBaseDownload from googleapiclient.errors import HttpError from application.parser.remote.base import BaseRemote -from application.parser.remote.google_auth import GoogleDriveAuth +from application.parser.connectors.google_drive.auth import GoogleDriveAuth from application.parser.schema.base import Document @@ -329,7 +329,7 @@ class GoogleDriveLoader(BaseRemote): if e.resp.status in [401, 403]: logging.error(f"Authentication error downloading file {file_id}") - + if hasattr(self.credentials, 'refresh_token') and self.credentials.refresh_token: logging.info(f"Attempting to refresh credentials for file {file_id}") try: @@ -406,10 +406,10 @@ class GoogleDriveLoader(BaseRemote): files_downloaded = 0 try: os.makedirs(local_dir, exist_ok=True) - + query = f"'{folder_id}' in parents and trashed=false" page_token = None - + while True: results = self.service.files().list( q=query, @@ -417,15 +417,15 @@ class GoogleDriveLoader(BaseRemote): pageToken=page_token, pageSize=1000 ).execute() - + items = results.get('files', []) logging.info(f"Found {len(items)} items in folder {folder_id}") - + for item in items: item_name = item['name'] item_id = item['id'] mime_type = item['mimeType'] - + if mime_type == 'application/vnd.google-apps.folder': if recursive: # Create subfolder and recurse @@ -446,13 +446,13 @@ class GoogleDriveLoader(BaseRemote): logging.info(f"Downloaded file: {item_name}") else: logging.warning(f"Failed to download file: {item_name}") - + page_token = results.get('nextPageToken') if not page_token: break - + return files_downloaded - + except Exception as e: logging.error(f"Error in _download_folder_recursive for folder {folder_id}: {e}", exc_info=True) return files_downloaded @@ -513,7 +513,7 @@ class GoogleDriveLoader(BaseRemote): folder_name = folder_metadata.get('name', '') folder_path = os.path.join(local_dir, folder_name) os.makedirs(folder_path, exist_ok=True) - + folder_files = self._download_folder_recursive( folder_id, folder_path, diff --git a/application/parser/remote/remote_creator.py b/application/parser/remote/remote_creator.py index 4d1f34a2..a47b186a 100644 --- a/application/parser/remote/remote_creator.py +++ b/application/parser/remote/remote_creator.py @@ -3,17 +3,25 @@ from application.parser.remote.crawler_loader import CrawlerLoader from application.parser.remote.web_loader import WebLoader from application.parser.remote.reddit_loader import RedditPostsLoaderRemote from application.parser.remote.github_loader import GitHubLoader -from application.parser.remote.google_drive_loader import GoogleDriveLoader class RemoteCreator: + """ + Factory class for creating remote content loaders. + + These loaders fetch content from remote web sources like URLs, + sitemaps, web crawlers, social media platforms, etc. + + For external knowledge base connectors (like Google Drive), + use ConnectorCreator instead. + """ + loaders = { "url": WebLoader, "sitemap": SitemapLoader, "crawler": CrawlerLoader, "reddit": RedditPostsLoaderRemote, "github": GitHubLoader, - "google_drive": GoogleDriveLoader, } @classmethod diff --git a/application/worker.py b/application/worker.py index fe386a2d..719ebccc 100755 --- a/application/worker.py +++ b/application/worker.py @@ -874,8 +874,8 @@ def ingest_connector( if not session_token: raise ValueError("Google Drive connector requires session_token") - from application.parser.remote.google_drive_loader import GoogleDriveLoader - remote_loader = GoogleDriveLoader(session_token) + from application.parser.connectors.connector_creator import ConnectorCreator + remote_loader = ConnectorCreator.create_connector("google_drive", session_token) # Create a clean config for storage that excludes the session token api_source_config = { @@ -895,19 +895,25 @@ def ingest_connector( } ) else: - # For other connectors, maintain backward compatibility - source_config = { - "session_token": session_token + # For other external knowledge base connectors (future: dropbox, onedrive, etc.) + from application.parser.connectors.connector_creator import ConnectorCreator + + if not ConnectorCreator.is_supported(source_type): + raise ValueError(f"Unsupported connector type: {source_type}. Supported types: {ConnectorCreator.get_supported_connectors()}") + + # Create connector with session token and other parameters + remote_loader = ConnectorCreator.create_connector(source_type, session_token) + + api_source_config = { + "file_ids": file_ids or [], + "folder_ids": folder_ids or [], + "recursive": recursive } - if file_ids: - source_config["file_ids"] = file_ids - if folder_ids: - source_config["folder_ids"] = folder_ids - source_config["recursive"] = recursive - - remote_loader = RemoteCreator.create_loader(source_type, source_config) - api_source_config = source_config - download_info = remote_loader.download_to_directory(temp_dir) + + download_info = remote_loader.download_to_directory( + temp_dir, + api_source_config + ) if download_info.get("empty_result", False) or not download_info.get("files_downloaded", 0): logging.warning(f"No files were downloaded from {source_type}") @@ -917,7 +923,7 @@ def ingest_connector( "user": user, "tokens": 0, "type": source_type, - "source_config": source_config, + "source_config": api_source_config, "directory_structure": "{}", }