(feat:connector) raw sync flow

ManishMadan2882
2025-09-02 13:34:31 +05:30
parent 8c986aaa7f
commit 384ad3e0ac
4 changed files with 336 additions and 34 deletions


@@ -650,8 +650,11 @@ def remote_worker(
"id": str(id),
"type": loader,
"remote_data": source_data,
"sync_frequency": sync_frequency,
"sync_frequency": sync_frequency
}
if operation_mode == "sync":
file_data["last_sync"] = datetime.datetime.now()
upload_index(full_path, file_data)
except Exception as e:
logging.error("Error in remote_worker task: %s", str(e), exc_info=True)
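Note that last_sync is stamped with a naive datetime.datetime.now(), while the Google Drive API reports modifiedTime as RFC 3339 strings, so the modified_after cutoff used later in this diff mixes types. A minimal normalization sketch; the helper and the mixed datetime/string handling are illustrative assumptions, not part of the diff:

    import datetime

    def last_sync_cutoff(source: dict) -> str:
        """Normalize a stored source's last sync marker to an RFC 3339 string.

        Hypothetical helper: falls back to the source's creation date,
        mirroring the fallback in this commit's sync path.
        """
        raw = source.get("last_sync") or source.get("date")
        if isinstance(raw, datetime.datetime):
            # Assumption: naive datetimes from MongoDB are treated as UTC.
            if raw.tzinfo is None:
                raw = raw.replace(tzinfo=datetime.timezone.utc)
            return raw.isoformat().replace("+00:00", "Z")
        return str(raw)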
@@ -708,7 +711,7 @@ def sync_worker(self, frequency):
            self, source_data, name, user, source_type, frequency, retriever, doc_id
        )
        sync_counts["total_sync_count"] += 1
        sync_counts[
            "sync_success" if resp["status"] == "success" else "sync_failure"
        ] += 1
    return {
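sync_worker already takes a frequency bucket; how those buckets get triggered is outside this diff. A hypothetical Celery beat schedule that fires one run per sync_frequency value (task names and times are illustrative, not taken from the codebase):

    from celery.schedules import crontab

    # Illustrative schedule: one sync_worker run per frequency bucket.
    CELERYBEAT_SCHEDULE = {
        "sync-daily": {
            "task": "sync_worker",  # hypothetical registered task name
            "schedule": crontab(minute=0, hour=3),
            "args": ("daily",),
        },
        "sync-weekly": {
            "task": "sync_worker",
            "schedule": crontab(minute=0, hour=3, day_of_week=1),
            "args": ("weekly",),
        },
        "sync-monthly": {
            "task": "sync_worker",
            "schedule": crontab(minute=0, hour=3, day_of_month=1),
            "args": ("monthly",),
        },
    }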
@@ -745,7 +748,7 @@ def attachment_worker(self, file_info, user):
                input_files=[local_path], exclude_hidden=True, errors="ignore"
            )
            .load_data()[0]
            .text,
        )
@@ -839,15 +842,18 @@ def agent_webhook_worker(self, agent_id, payload):
def ingest_connector(
    self,
    job_name: str,
    user: str,
    source_type: str,
    session_token=None,
    file_ids=None,
    folder_ids=None,
    recursive=True,
    retriever: str = "classic",
    operation_mode: str = "upload",
    doc_id=None,
    sync_frequency: str = "never",
) -> Dict[str, Any]:
    """
    Ingestion for internal knowledge bases (GoogleDrive, etc.).
@@ -861,6 +867,9 @@ def ingest_connector(
        folder_ids: List of folder IDs to download
        recursive: Whether to recursively download folders
        retriever: Type of retriever to use
        operation_mode: "upload" for initial ingestion, "sync" for incremental sync
        doc_id: Document ID for sync operations (required when operation_mode="sync")
        sync_frequency: How often to sync ("never", "daily", "weekly", "monthly")
    """
    logging.info(f"Starting remote ingestion from {source_type} for user: {user}, job: {job_name}")
    self.update_state(state="PROGRESS", meta={"current": 1})
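For orientation, a sketch of how a caller might exercise the two modes, assuming ingest_connector is wired up as a Celery task; the .delay() invocation, token, and IDs below are illustrative, not taken from this diff:

    # Hypothetical initial ingestion: creates a new source under a fresh ObjectId.
    ingest_connector.delay(
        job_name="drive-import",
        user="user@example.com",
        source_type="google_drive",
        session_token=token,
        folder_ids=["<drive-folder-id>"],
        operation_mode="upload",
        sync_frequency="daily",
    )

    # Hypothetical incremental sync: doc_id must point at the existing source,
    # so the rebuilt index replaces it under the same ObjectId.
    ingest_connector.delay(
        job_name="drive-import",
        user="user@example.com",
        source_type="google_drive",
        session_token=token,
        operation_mode="sync",
        doc_id="<existing-source-id>",
        sync_frequency="daily",
    )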
@@ -869,25 +878,73 @@ def ingest_connector(
    try:
        # Step 1: Initialize the appropriate loader
        self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing connector"})

        # Handle incremental sync using Google Drive API directly
        # (timestamp is built in UTC so the trailing "Z" suffix is accurate)
        current_sync_time = datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
        if operation_mode == "sync":
            if source_type == "google_drive":
                from application.parser.connectors.connector_creator import ConnectorCreator

                remote_loader = ConnectorCreator.create_connector("google_drive", session_token)
                source = sources_collection.find_one({"_id": ObjectId(doc_id)})
                last_sync_time = source.get("last_sync")
                if not last_sync_time:
                    last_sync_time = source.get("date")
                scan_results = remote_loader.scan_drive_contents(
                    file_ids or [],
                    folder_ids or [],
                    modified_after=last_sync_time
                )
                modified_files = scan_results.get('modified_files', [])
                modified_folders = scan_results.get('modified_folders', [])

                # Log atomic changes detected via Google Drive API
                if modified_files:
                    logging.info(f"Files modified since last sync: {len(modified_files)} files")
                    for f in modified_files:
                        logging.info(f"  - {f['name']} (ID: {f['id']}, Modified: {f['modifiedTime']})")
                if modified_folders:
                    logging.info(f"Folders modified since last sync: {len(modified_folders)} folders")
                    for f in modified_folders:
                        logging.info(f"  - {f['name']} (ID: {f['id']}, Modified: {f['modifiedTime']})")
                if not modified_files and not modified_folders:
                    logging.info("No changes detected via Google Drive API")
                    return {
                        "user": user,
                        "name": job_name,
                        "tokens": 0,
                        "type": source_type,
                        "status": "no_changes"
                    }

                # Restrict the follow-up download to only the changed items
                file_ids = [f['id'] for f in modified_files]
                folder_ids = [f['id'] for f in modified_folders]

        if source_type == "google_drive":
            if not session_token:
                raise ValueError("Google Drive connector requires session_token")
            from application.parser.connectors.connector_creator import ConnectorCreator

            remote_loader = ConnectorCreator.create_connector("google_drive", session_token)
            # Create a clean config for storage that excludes the session token
            api_source_config = {
                "file_ids": file_ids or [],
                "folder_ids": folder_ids or [],
                "recursive": recursive
            }

        # Step 2: Download files to temp directory
        self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"})
        download_info = remote_loader.download_to_directory(
            temp_dir,
            {
                "file_ids": file_ids or [],
                "folder_ids": folder_ids or [],
@@ -942,6 +999,8 @@ def ingest_connector(
        )
        raw_docs = reader.load_data()
        directory_structure = getattr(reader, 'directory_structure', {})

        # Step 4: Process documents (chunking, embedding, etc.)
        self.update_state(state="PROGRESS", meta={"current": 60, "status": "Processing documents"})
@@ -964,8 +1023,16 @@ def ingest_connector(
        docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]

        # Step 5: Store in vector database
        if operation_mode == "upload":
            id = ObjectId()
        elif operation_mode == "sync":
            if not doc_id or not ObjectId.is_valid(doc_id):
                logging.error("Invalid doc_id provided for sync operation: %s", doc_id)
                raise ValueError("doc_id must be provided for sync operation.")
            id = ObjectId(doc_id)
        else:
            raise ValueError(f"Invalid operation_mode: {operation_mode}")

        vector_store_path = os.path.join(temp_dir, "vector_store")
        os.makedirs(vector_store_path, exist_ok=True)
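Reusing the stored ObjectId in sync mode is what lets the re-ingested index land on top of the existing source record. upload_index's internals are not shown here, but the effect it needs is an upsert keyed on _id; a minimal pymongo sketch of that idea (the helper name is illustrative):

    from bson.objectid import ObjectId

    # Illustrative helper; upload_index's actual behavior is not shown in this diff.
    def upsert_source(sources_collection, id: ObjectId, file_data: dict) -> None:
        # Stable _id: "upload" inserts a new record, "sync" overwrites in place.
        sources_collection.replace_one({"_id": id}, file_data, upsert=True)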
@@ -986,16 +1053,22 @@ def ingest_connector(
"provider": source_type,
**api_source_config
}),
"directory_structure": json.dumps(directory_structure)
"directory_structure": json.dumps(directory_structure),
"sync_frequency": sync_frequency
}
if operation_mode == "sync":
file_data["last_sync"] = datetime.datetime.now()
else:
file_data["last_sync"] = datetime.datetime.now()
upload_index(vector_store_path, file_data)
        # Ensure we mark the task as complete
        self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"})
        logging.info(f"Remote ingestion completed: {job_name}")

        return {
            "user": user,
            "name": job_name,