(feat:ingestion) external drive connect

ManishMadan2882
2025-08-22 13:33:21 +05:30
parent 8c3f75e3e2
commit f82be23ca9
3 changed files with 596 additions and 0 deletions

@@ -6,6 +6,7 @@ import os
import shutil
import string
import tempfile
from typing import Any, Dict
import zipfile
from collections import Counter
@@ -835,3 +836,146 @@ def agent_webhook_worker(self, agent_id, payload):
f"Webhook processed for agent {agent_id}", extra={"agent_id": agent_id}
)
return {"status": "success", "result": result}


def ingest_connector(
    self, job_name: str, user: str, source_type: str,
    source_config: Dict[str, Any], retriever: str = "classic"
) -> Dict[str, Any]:
    """
    Ingest documents from an external knowledge source (e.g. Google Drive).

    Args:
        job_name: Name of the ingestion job.
        user: User identifier.
        source_type: Type of remote source ("google_drive", "dropbox", etc.).
        source_config: Configuration specific to the source type.
        retriever: Type of retriever to use.
    """
logging.info(f"Starting remote ingestion from {source_type} for user: {user}, job: {job_name}")
self.update_state(state="PROGRESS", meta={"current": 1})
with tempfile.TemporaryDirectory() as temp_dir:
try:
# Step 1: Get the appropriate remote loader
logging.info(f"source_config {source_config}")
if source_type == "google_drive":
session_token = source_config.get("session_token")
if not session_token:
raise ValueError("Google Drive connector requires session_token in source_config")
from application.parser.remote.google_drive_loader import GoogleDriveLoader
remote_loader = GoogleDriveLoader(session_token)
# Create a clean config for storage that excludes the session token
api_source_config = {
"file_ids": source_config.get("file_ids", []),
"folder_id": source_config.get("folder_id", ""),
}
if source_config.get("recursive") is not None:
api_source_config["recursive"] = source_config.get("recursive")
else:
remote_loader = RemoteCreator.create_loader(source_type, source_config)
api_source_config = source_config
            # Step 2: Download files to the temp directory
            self.update_state(state="PROGRESS", meta={"current": 20, "status": "Downloading files"})
            # For Google Drive, pass the source_config to download_to_directory
            if source_type == "google_drive":
                download_info = remote_loader.download_to_directory(temp_dir, source_config)
            else:
                download_info = remote_loader.download_to_directory(temp_dir)

            if download_info.get("empty_result", False) or not download_info.get("files_downloaded", 0):
                logging.warning(f"No files were downloaded from {source_type}")
                # Return an empty result directly instead of calling a separate
                # method; store the sanitized config, never the raw one.
                return {
                    "name": job_name,
                    "user": user,
                    "tokens": 0,
                    "type": source_type,
                    "source_config": api_source_config,
                    "directory_structure": "{}",
                }
            # Step 3: Use SimpleDirectoryReader to process the downloaded files
            self.update_state(state="PROGRESS", meta={"current": 40, "status": "Processing files"})
            reader = SimpleDirectoryReader(
                input_dir=temp_dir,
                recursive=True,
                required_exts=[
                    ".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
                    ".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
                    ".jpg", ".jpeg",
                ],
                exclude_hidden=True,
                file_metadata=metadata_from_filename,
            )
            raw_docs = reader.load_data()
            directory_structure = getattr(reader, "directory_structure", {})
            # Step 4: Process documents (chunking, embedding, etc.)
            self.update_state(state="PROGRESS", meta={"current": 60, "status": "Processing documents"})
            chunker = Chunker(
                chunking_strategy="classic_chunk",
                max_tokens=MAX_TOKENS,
                min_tokens=MIN_TOKENS,
                duplicate_headers=False,
            )
            raw_docs = chunker.chunk(documents=raw_docs)

            # Preserve source information in document metadata
            for doc in raw_docs:
                if hasattr(doc, "extra_info") and doc.extra_info:
                    source = doc.extra_info.get("source")
                    if source and os.path.isabs(source):
                        # Convert the absolute temp path to a relative one
                        doc.extra_info["source"] = os.path.relpath(source, start=temp_dir)

            docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
            # Step 5: Store in the vector database
            doc_id = ObjectId()  # renamed from `id` to avoid shadowing the built-in
            vector_store_path = os.path.join(temp_dir, "vector_store")
            os.makedirs(vector_store_path, exist_ok=True)
            self.update_state(state="PROGRESS", meta={"current": 80, "status": "Storing documents"})
            embed_and_store_documents(docs, vector_store_path, doc_id, self)
            tokens = count_tokens_docs(docs)
            # Step 6: Upload index files
            file_data = {
                "user": user,
                "name": job_name,
                "tokens": tokens,
                "retriever": retriever,
                "id": str(doc_id),
                "type": source_type,
                "remote_data": json.dumps(api_source_config),
                "directory_structure": json.dumps(directory_structure),
            }
            upload_index(vector_store_path, file_data)

            # Mark the task as complete
            self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"})
            logging.info(f"Remote ingestion completed: {job_name}")
            return {
                "user": user,
                "name": job_name,
                "tokens": tokens,
                "type": source_type,
                "id": str(doc_id),
                "status": "complete",
            }
        except Exception as e:
            logging.error(f"Error during remote ingestion: {e}", exc_info=True)
            raise
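
For reference, a minimal dispatch sketch, assuming ingest_connector is registered as a bound Celery task in this worker module (the .delay() call and the example values below are illustrative, not part of this diff):

    # Hypothetical caller; assumes ingest_connector is a registered Celery task.
    result = ingest_connector.delay(
        job_name="drive-sync",
        user="user-123",
        source_type="google_drive",
        source_config={
            "session_token": "<oauth-session-token>",  # stripped before storage
            "folder_id": "<drive-folder-id>",
            "recursive": True,
        },
        retriever="classic",
    )
    print(result.get(timeout=600))  # e.g. {"name": "drive-sync", "status": "complete", ...}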