(feat:sources) management, simple re-ingest

This commit is contained in:
ManishMadan2882
2025-07-30 01:57:40 +05:30
parent a24652f901
commit 266d256a07
3 changed files with 269 additions and 0 deletions

View File

@@ -336,6 +336,129 @@ def ingest_worker(
}
def reingest_source_worker(self, source_id, user):
"""
Re-ingestion worker that scans the source and determines what needs to be re-ingested.
This is decoupled from the file management operations.
Args:
self: Task instance
source_id: ID of the source to re-ingest
user: User identifier
Returns:
dict: Information about the re-ingestion task
"""
try:
from application.vectorstore.vector_creator import VectorCreator
from application.utils import num_tokens_from_string
self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing re-ingestion scan"})
source = sources_collection.find_one({"_id": ObjectId(source_id), "user": user})
if not source:
raise ValueError(f"Source {source_id} not found or access denied")
storage = StorageCreator.get_storage()
source_file_path = source.get("file_path", "")
job_name = source.get("name", "")
self.update_state(state="PROGRESS", meta={"current": 20, "status": "Scanning current files"})
with tempfile.TemporaryDirectory() as temp_dir:
# Download all files from storage to temp directory, preserving directory structure
if storage.is_directory(source_file_path):
files_list = storage.list_files(source_file_path)
for storage_file_path in files_list:
if storage.is_directory(storage_file_path):
continue
rel_path = os.path.relpath(storage_file_path, source_file_path)
local_file_path = os.path.join(temp_dir, rel_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
# Download file
try:
file_data = storage.get_file(storage_file_path)
with open(local_file_path, "wb") as f:
f.write(file_data.read())
except Exception as e:
logging.error(f"Error downloading file {storage_file_path}: {e}")
continue
reader = SimpleDirectoryReader(
input_dir=temp_dir,
recursive=True,
required_exts=[
".rst", ".md", ".pdf", ".txt", ".docx", ".csv", ".epub",
".html", ".mdx", ".json", ".xlsx", ".pptx", ".png",
".jpg", ".jpeg",
],
exclude_hidden=True,
file_metadata=metadata_from_filename,
)
raw_docs = reader.load_data()
directory_structure = getattr(reader, 'directory_structure', {})
logging.info(f"Directory structure from reader: {directory_structure}")
total_tokens = 0
for doc in raw_docs:
if hasattr(doc, 'extra_info') and 'token_count' in doc.extra_info:
total_tokens += doc.extra_info['token_count']
else:
doc_text = str(doc.text) if hasattr(doc, 'text') else str(doc)
total_tokens += num_tokens_from_string(doc_text)
logging.info(f"Total tokens calculated: {total_tokens}")
self.update_state(state="PROGRESS", meta={"current": 50, "status": "Processing documents"})
temp_vector_dir = os.path.join(temp_dir, "vector_store")
os.makedirs(temp_vector_dir, exist_ok=True)
chunker = Chunker(
chunking_strategy="classic_chunk",
max_tokens=MAX_TOKENS,
min_tokens=MIN_TOKENS,
duplicate_headers=False,
)
chunked_docs = chunker.chunk(documents=raw_docs)
docs = [Document.to_langchain_format(doc) for doc in chunked_docs]
embed_and_store_documents(docs, temp_vector_dir, ObjectId(source_id), self)
file_data = {
"name": source.get("name", ""),
"file": source.get("name", ""),
"user": user,
"tokens": total_tokens,
"retriever": source.get("retriever", "classic"),
"id": source_id,
"type": source.get("type", "local"),
"file_path": source_file_path,
"directory_structure": json.dumps(directory_structure),
}
upload_index(temp_vector_dir, file_data)
return {
"source_id": source_id,
"user": user,
"tokens": total_tokens,
"status": "completed"
}
except Exception as e:
logging.error(f"Error in reingest_source_worker: {e}", exc_info=True)
raise
def remote_worker(
self,
source_data,