mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-11-29 08:33:20 +00:00
(feat:connector) sync, simply re-ingest
This commit is contained in:
@@ -146,115 +146,7 @@ class GoogleDriveLoader(BaseConnectorLoader):
|
||||
logging.error(f"Error loading data from Google Drive: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def scan_drive_contents(self, file_ids: List[str], folder_ids: List[str],
                        modified_after: str = "2024-01-01T00:00:00Z") -> Dict[str, Any]:
    """
    Scan Google Drive contents and report files/folders modified after a date.

    Args:
        file_ids: List of specific file IDs to check individually.
        folder_ids: List of folder IDs whose metadata and direct children are
            checked for modifications (children are filtered server-side; the
            scan does not recurse into sub-folders).
        modified_after: ISO 8601 / RFC 3339 date string
            (default: "2024-01-01T00:00:00Z").

    Returns:
        Dictionary containing:
        - 'modified_files': list of {'id','name','modifiedTime','mimeType'}
          dicts for files modified after the given date
        - 'modified_folders': same structure for folders
        - 'scan_summary': counts of modified items plus the cutoff date used

    Raises:
        Exception: re-raised if the overall scan fails. Per-item API errors
            are logged as warnings and the item is skipped (best-effort scan).
    """
    from datetime import datetime, timezone

    self._ensure_service()

    def _parse_ts(value: str):
        """Parse an RFC 3339 timestamp; return an aware datetime or None."""
        if not value:
            return None
        try:
            # fromisoformat on older Pythons rejects a trailing 'Z'.
            parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
        except ValueError:
            return None
        # Treat naive timestamps as UTC so aware/naive comparison can't raise.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    cutoff = _parse_ts(modified_after)

    def _is_after(ts: str) -> bool:
        """True if ts is strictly later than modified_after.

        Uses real datetime comparison instead of lexicographic string
        comparison, which mis-orders timestamps that differ only in
        fractional seconds (e.g. '...:00.500Z' < '...:00Z' as strings).
        Falls back to the original string comparison when either side
        cannot be parsed, preserving prior behavior for odd inputs.
        """
        candidate = _parse_ts(ts)
        if cutoff is not None and candidate is not None:
            return candidate > cutoff
        return ts > modified_after

    modified_files = []
    modified_folders = []

    try:
        # Check each explicitly requested file by its own metadata.
        for file_id in file_ids:
            try:
                file_metadata = self.service.files().get(
                    fileId=file_id,
                    fields='id,name,modifiedTime,mimeType'
                ).execute()

                modified_time = file_metadata.get('modifiedTime', '')
                if _is_after(modified_time):
                    modified_files.append({
                        'id': file_id,
                        'name': file_metadata.get('name', 'Unknown'),
                        'modifiedTime': modified_time,
                        'mimeType': file_metadata.get('mimeType', '')
                    })

            except Exception as e:
                # Best-effort: skip files we cannot read and keep scanning.
                logging.warning(f"Error checking file {file_id}: {e}")
                continue

        for folder_id in folder_ids:
            try:
                folder_metadata = self.service.files().get(
                    fileId=folder_id,
                    fields='id,name,modifiedTime,mimeType'
                ).execute()

                folder_modified_time = folder_metadata.get('modifiedTime', '')
                if _is_after(folder_modified_time):
                    modified_folders.append({
                        'id': folder_id,
                        'name': folder_metadata.get('name', 'Unknown'),
                        'modifiedTime': folder_modified_time,
                        'mimeType': folder_metadata.get('mimeType', '')
                    })

                # Server-side filter on the folder's direct children.
                # NOTE(review): assumes modified_after is a well-formed
                # RFC 3339 string suitable for the Drive 'q' parameter —
                # verify upstream callers (e.g. values read from Mongo).
                query = f"'{folder_id}' in parents and modifiedTime > '{modified_after}'"

                page_token = None
                while True:
                    results = self.service.files().list(
                        q=query,
                        spaces='drive',
                        fields='nextPageToken, files(id, name, modifiedTime, mimeType)',
                        pageToken=page_token
                    ).execute()

                    for item in results.get('files', []):
                        item_info = {
                            'id': item['id'],
                            'name': item['name'],
                            'modifiedTime': item['modifiedTime'],
                            'mimeType': item['mimeType']
                        }
                        # Sub-folders are reported as folders, everything
                        # else as files.
                        if item['mimeType'] == 'application/vnd.google-apps.folder':
                            modified_folders.append(item_info)
                        else:
                            modified_files.append(item_info)

                    page_token = results.get('nextPageToken')
                    if not page_token:
                        break

            except Exception as e:
                # Best-effort: skip folders we cannot scan.
                logging.warning(f"Error scanning folder {folder_id}: {e}")
                continue

        summary = {
            'total_modified_files': len(modified_files),
            'total_modified_folders': len(modified_folders),
            'scan_date': modified_after
        }

        logging.info(f"Drive scan completed: {summary['total_modified_files']} files and {summary['total_modified_folders']} folders modified after {modified_after}")

        return {
            'modified_files': modified_files,
            'modified_folders': modified_folders,
            'scan_summary': summary
        }

    except Exception as e:
        logging.error(f"Error scanning drive contents: {e}", exc_info=True)
        raise
|
||||
|
||||
def _load_file_by_id(self, file_id: str, load_content: bool = True) -> Optional[Document]:
|
||||
self._ensure_service()
|
||||
|
||||
@@ -22,6 +22,7 @@ from application.api.answer.services.stream_processor import get_prompt
|
||||
from application.core.mongo_db import MongoDB
|
||||
from application.core.settings import settings
|
||||
from application.parser.chunking import Chunker
|
||||
from application.parser.connectors.connector_creator import ConnectorCreator
|
||||
from application.parser.embedding_pipeline import embed_and_store_documents
|
||||
from application.parser.file.bulk import SimpleDirectoryReader
|
||||
from application.parser.remote.remote_creator import RemoteCreator
|
||||
@@ -879,98 +880,27 @@ def ingest_connector(
|
||||
# Step 1: Initialize the appropriate loader
|
||||
self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing connector"})
|
||||
|
||||
# Handle incremental sync using Google Drive API directly
|
||||
current_sync_time = datetime.datetime.now().isoformat() + 'Z'
|
||||
if not session_token:
|
||||
raise ValueError(f"{source_type} connector requires session_token")
|
||||
|
||||
if operation_mode == "sync":
|
||||
if source_type == "google_drive":
|
||||
from application.parser.connectors.connector_creator import ConnectorCreator
|
||||
remote_loader = ConnectorCreator.create_connector("google_drive", session_token)
|
||||
if not ConnectorCreator.is_supported(source_type):
|
||||
raise ValueError(f"Unsupported connector type: {source_type}. Supported types: {ConnectorCreator.get_supported_connectors()}")
|
||||
|
||||
source = sources_collection.find_one({"_id": ObjectId(doc_id)})
|
||||
|
||||
last_sync_time = source.get("last_sync")
|
||||
if not last_sync_time:
|
||||
last_sync_time = source.get("date")
|
||||
|
||||
remote_loader = ConnectorCreator.create_connector(source_type, session_token)
|
||||
|
||||
scan_results = remote_loader.scan_drive_contents(
|
||||
file_ids or [],
|
||||
folder_ids or [],
|
||||
modified_after=last_sync_time
|
||||
)
|
||||
# Create a clean config for storage
|
||||
api_source_config = {
|
||||
"file_ids": file_ids or [],
|
||||
"folder_ids": folder_ids or [],
|
||||
"recursive": recursive
|
||||
}
|
||||
|
||||
modified_files = scan_results.get('modified_files', [])
|
||||
modified_folders = scan_results.get('modified_folders', [])
|
||||
|
||||
# Log atomic changes detected via Google Drive API
|
||||
if modified_files:
|
||||
logging.info(f"Files modified since last sync: {len(modified_files)} files")
|
||||
for f in modified_files:
|
||||
logging.info(f" - {f['name']} (ID: {f['id']}, Modified: {f['modifiedTime']})")
|
||||
|
||||
if modified_folders:
|
||||
logging.info(f"Folders modified since last sync: {len(modified_folders)} folders")
|
||||
for f in modified_folders:
|
||||
logging.info(f" - {f['name']} (ID: {f['id']}, Modified: {f['modifiedTime']})")
|
||||
|
||||
if not modified_files and not modified_folders:
|
||||
logging.info("No changes detected via Google Drive API")
|
||||
return {
|
||||
"user": user,
|
||||
"name": job_name,
|
||||
"tokens": 0,
|
||||
"type": source_type,
|
||||
"status": "no_changes"
|
||||
}
|
||||
|
||||
file_ids = [f['id'] for f in modified_files]
|
||||
folder_ids = [f['id'] for f in modified_folders]
|
||||
|
||||
if source_type == "google_drive":
|
||||
if not session_token:
|
||||
raise ValueError("Google Drive connector requires session_token")
|
||||
|
||||
from application.parser.connectors.connector_creator import ConnectorCreator
|
||||
remote_loader = ConnectorCreator.create_connector("google_drive", session_token)
|
||||
|
||||
# Create a clean config for storage that excludes the session token
|
||||
api_source_config = {
|
||||
"file_ids": file_ids or [],
|
||||
"folder_ids": folder_ids or [],
|
||||
"recursive": recursive
|
||||
}
|
||||
|
||||
# Step 2: Download files to temp directory
|
||||
self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"})
|
||||
download_info = remote_loader.download_to_directory(
|
||||
temp_dir,
|
||||
{
|
||||
"file_ids": file_ids or [],
|
||||
"folder_ids": folder_ids or [],
|
||||
"recursive": recursive
|
||||
}
|
||||
)
|
||||
else:
|
||||
# For other external knowledge base connectors (future: dropbox, onedrive, etc.)
|
||||
from application.parser.connectors.connector_creator import ConnectorCreator
|
||||
|
||||
if not ConnectorCreator.is_supported(source_type):
|
||||
raise ValueError(f"Unsupported connector type: {source_type}. Supported types: {ConnectorCreator.get_supported_connectors()}")
|
||||
|
||||
# Create connector with session token and other parameters
|
||||
remote_loader = ConnectorCreator.create_connector(source_type, session_token)
|
||||
|
||||
api_source_config = {
|
||||
"file_ids": file_ids or [],
|
||||
"folder_ids": folder_ids or [],
|
||||
"recursive": recursive
|
||||
}
|
||||
|
||||
download_info = remote_loader.download_to_directory(
|
||||
temp_dir,
|
||||
api_source_config
|
||||
)
|
||||
# Step 2: Download files to temp directory
|
||||
self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"})
|
||||
download_info = remote_loader.download_to_directory(
|
||||
temp_dir,
|
||||
api_source_config
|
||||
)
|
||||
|
||||
if download_info.get("empty_result", False) or not download_info.get("files_downloaded", 0):
|
||||
logging.warning(f"No files were downloaded from {source_type}")
|
||||
|
||||
@@ -10,7 +10,7 @@ import FolderIcon from '../assets/folder.svg';
|
||||
import ArrowLeft from '../assets/arrow-left.svg';
|
||||
import ThreeDots from '../assets/three-dots.svg';
|
||||
import EyeView from '../assets/eye-view.svg';
|
||||
import SearchIcon from '../assets/search.svg';
|
||||
import SyncIcon from '../assets/sync.svg';
|
||||
import { useOutsideAlerter } from '../hooks';
|
||||
|
||||
interface FileNode {
|
||||
@@ -59,6 +59,8 @@ const ConnectorTreeComponent: React.FC<ConnectorTreeComponentProps> = ({
|
||||
const [searchQuery, setSearchQuery] = useState('');
|
||||
const [searchResults, setSearchResults] = useState<SearchResult[]>([]);
|
||||
const searchDropdownRef = useRef<HTMLDivElement>(null);
|
||||
const [isSyncing, setIsSyncing] = useState<boolean>(false);
|
||||
const [syncProgress, setSyncProgress] = useState<number>(0);
|
||||
|
||||
useOutsideAlerter(
|
||||
searchDropdownRef,
|
||||
@@ -78,6 +80,71 @@ const ConnectorTreeComponent: React.FC<ConnectorTreeComponentProps> = ({
|
||||
});
|
||||
};
|
||||
|
||||
const handleSync = async () => {
|
||||
if (isSyncing) return;
|
||||
|
||||
setIsSyncing(true);
|
||||
setSyncProgress(0);
|
||||
|
||||
try {
|
||||
const response = await userService.syncConnector(docId, token);
|
||||
const data = await response.json();
|
||||
|
||||
if (data.success) {
|
||||
console.log('Sync started successfully:', data.task_id);
|
||||
setSyncProgress(10);
|
||||
|
||||
// Poll task status using userService
|
||||
const maxAttempts = 30;
|
||||
const pollInterval = 2000;
|
||||
|
||||
for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
||||
try {
|
||||
const statusResponse = await userService.getTaskStatus(data.task_id, token);
|
||||
const statusData = await statusResponse.json();
|
||||
|
||||
console.log(`Task status (attempt ${attempt + 1}):`, statusData.status);
|
||||
|
||||
if (statusData.status === 'SUCCESS') {
|
||||
setSyncProgress(100);
|
||||
console.log('Sync completed successfully');
|
||||
|
||||
// Refresh directory structure
|
||||
try {
|
||||
const refreshResponse = await userService.getDirectoryStructure(docId, token);
|
||||
const refreshData = await refreshResponse.json();
|
||||
if (refreshData && refreshData.directory_structure) {
|
||||
setDirectoryStructure(refreshData.directory_structure);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Error refreshing directory structure:', err);
|
||||
}
|
||||
break;
|
||||
} else if (statusData.status === 'FAILURE') {
|
||||
console.error('Sync task failed:', statusData.result);
|
||||
break;
|
||||
} else if (statusData.status === 'PROGRESS') {
|
||||
const progress = statusData.meta?.current || 0;
|
||||
setSyncProgress(Math.max(10, progress)); // Ensure minimum 10% after start
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, pollInterval));
|
||||
} catch (error) {
|
||||
console.error('Error polling task status:', error);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
console.error('Sync failed:', data.error);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Error syncing connector:', err);
|
||||
} finally {
|
||||
setIsSyncing(false);
|
||||
setSyncProgress(0);
|
||||
}
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
const fetchDirectoryStructure = async () => {
|
||||
try {
|
||||
@@ -247,7 +314,27 @@ const ConnectorTreeComponent: React.FC<ConnectorTreeComponentProps> = ({
|
||||
</div>
|
||||
|
||||
<div className="flex relative flex-row flex-nowrap items-center gap-2 w-full sm:w-auto justify-end mt-2 sm:mt-0">
|
||||
|
||||
{renderFileSearch()}
|
||||
|
||||
{/* Sync button */}
|
||||
<button
|
||||
onClick={handleSync}
|
||||
disabled={isSyncing}
|
||||
className={`flex h-[38px] min-w-[108px] items-center justify-center rounded-full px-4 text-[14px] whitespace-nowrap font-medium transition-colors ${
|
||||
isSyncing
|
||||
? 'bg-gray-300 text-gray-600 cursor-not-allowed dark:bg-gray-600 dark:text-gray-400'
|
||||
: 'bg-purple-30 hover:bg-violets-are-blue text-white'
|
||||
}`}
|
||||
title={isSyncing ? `${t('settings.sources.syncing')} ${syncProgress}%` : t('settings.sources.sync')}
|
||||
>
|
||||
<img
|
||||
src={SyncIcon}
|
||||
alt={t('settings.sources.sync')}
|
||||
className={`h-4 w-4 mr-2 ${isSyncing ? 'animate-spin' : ''} ${!isSyncing ? 'filter invert' : ''}`}
|
||||
/>
|
||||
{isSyncing ? `${syncProgress}%` : t('settings.sources.sync')}
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
|
||||
@@ -67,6 +67,7 @@
|
||||
"preLoaded": "Pre-loaded",
|
||||
"private": "Private",
|
||||
"sync": "Sync",
|
||||
"syncing": "Syncing...",
|
||||
"syncFrequency": {
|
||||
"never": "Never",
|
||||
"daily": "Daily",
|
||||
|
||||
@@ -67,6 +67,7 @@
|
||||
"preLoaded": "Precargado",
|
||||
"private": "Privado",
|
||||
"sync": "Sincronizar",
|
||||
"syncing": "Sincronizando...",
|
||||
"syncFrequency": {
|
||||
"never": "Nunca",
|
||||
"daily": "Diario",
|
||||
|
||||
@@ -67,6 +67,7 @@
|
||||
"preLoaded": "プリロード済み",
|
||||
"private": "プライベート",
|
||||
"sync": "同期",
|
||||
"syncing": "同期中...",
|
||||
"syncFrequency": {
|
||||
"never": "なし",
|
||||
"daily": "毎日",
|
||||
|
||||
@@ -67,6 +67,7 @@
|
||||
"preLoaded": "Предзагруженный",
|
||||
"private": "Частный",
|
||||
"sync": "Синхронизация",
|
||||
"syncing": "Синхронизация...",
|
||||
"syncFrequency": {
|
||||
"never": "Никогда",
|
||||
"daily": "Ежедневно",
|
||||
|
||||
@@ -67,6 +67,7 @@
|
||||
"preLoaded": "預載入",
|
||||
"private": "私人",
|
||||
"sync": "同步",
|
||||
"syncing": "同步中...",
|
||||
"syncFrequency": {
|
||||
"never": "從不",
|
||||
"daily": "每天",
|
||||
|
||||
@@ -67,6 +67,7 @@
|
||||
"preLoaded": "预加载",
|
||||
"private": "私有",
|
||||
"sync": "同步",
|
||||
"syncing": "同步中...",
|
||||
"syncFrequency": {
|
||||
"never": "从不",
|
||||
"daily": "每天",
|
||||
|
||||
Reference in New Issue
Block a user