diff --git a/application/api/user/routes.py b/application/api/user/routes.py
index d00b0623..9e7b8a6e 100644
--- a/application/api/user/routes.py
+++ b/application/api/user/routes.py
@@ -3438,21 +3438,7 @@ class GetChunks(Resource):
             filtered_chunks = []
             for chunk in chunks:
                 metadata = chunk.get("metadata", {})
-
-                if path:
-                    source = metadata.get("source", "")
-                    path_match = False
-
-                    if isinstance(source, str) and source.endswith(path):
-                        path_match = True
-                    elif isinstance(source, list):
-                        for src in source:
-                            if isinstance(src, str) and src.endswith(path):
-                                path_match = True
-                                break
-
-                    if not path_match:
-                        continue
+
                 if search_term:
                     text_match = search_term in chunk.get("text", "").lower()
diff --git a/application/worker.py b/application/worker.py
index 5e19a202..12357c72 100755
--- a/application/worker.py
+++ b/application/worker.py
@@ -338,8 +338,9 @@ def ingest_worker(
 def reingest_source_worker(self, source_id, user):
     """
-    Re-ingestion worker that scans the source and determines what needs to be re-ingested.
-    This is decoupled from the file management operations.
+    Re-ingestion worker that handles incremental updates by:
+    1. Adding chunks from newly added files
+    2. Removing chunks from deleted files
 
     Args:
         self: Task instance
@@ -351,18 +352,15 @@ def reingest_source_worker(self, source_id, user):
     """
     try:
         from application.vectorstore.vector_creator import VectorCreator
-        from application.utils import num_tokens_from_string
 
         self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing re-ingestion scan"})
-
         source = sources_collection.find_one({"_id": ObjectId(source_id), "user": user})
         if not source:
             raise ValueError(f"Source {source_id} not found or access denied")
 
         storage = StorageCreator.get_storage()
         source_file_path = source.get("file_path", "")
-        job_name = source.get("name", "")
 
         self.update_state(state="PROGRESS", meta={"current": 20, "status": "Scanning current files"})
 
@@ -370,17 +368,17 @@ def reingest_source_worker(self, source_id, user):
             # Download all files from storage to temp directory, preserving directory structure
             if storage.is_directory(source_file_path):
                 files_list = storage.list_files(source_file_path)
-
+
                 for storage_file_path in files_list:
                     if storage.is_directory(storage_file_path):
                         continue
-
-
+
+
                     rel_path = os.path.relpath(storage_file_path, source_file_path)
                     local_file_path = os.path.join(temp_dir, rel_path)
-
+
                     os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
-
+
                     # Download file
                     try:
                         file_data = storage.get_file(storage_file_path)
@@ -389,7 +387,7 @@ def reingest_source_worker(self, source_id, user):
                     except Exception as e:
                         logging.error(f"Error downloading file {storage_file_path}: {e}")
                         continue
-
+
             reader = SimpleDirectoryReader(
                 input_dir=temp_dir,
                 recursive=True,
@@ -401,60 +399,185 @@
                 exclude_hidden=True,
                 file_metadata=metadata_from_filename,
             )
-
-            raw_docs = reader.load_data()
-
-            directory_structure = getattr(reader, 'directory_structure', {})
-            logging.info(f"Directory structure from reader: {directory_structure}")
-
-            total_tokens = 0
-            for doc in raw_docs:
-                if hasattr(doc, 'extra_info') and 'token_count' in doc.extra_info:
-                    total_tokens += doc.extra_info['token_count']
+            reader.load_data()
+            directory_structure = reader.directory_structure
+            logging.info(f"Directory structure built with token counts: {directory_structure}")
+
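+            # Shape assumed by the comparison below (inferred from this diff):
+            # nested dicts are directories, and a leaf file is any dict carrying
+            # both "type" and "size_bytes" keys, plus "token_count" once counted.
+            # Illustrative example (names and values invented):
+            #   {"docs": {"intro.md": {"type": "file", "size_bytes": 1024, "token_count": 210}}}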
+            try:
+                old_directory_structure = source.get("directory_structure") or {}
+                if isinstance(old_directory_structure, str):
+                    try:
+                        old_directory_structure = json.loads(old_directory_structure)
+                    except Exception:
+                        old_directory_structure = {}
+
+                def _flatten_directory_structure(struct, prefix=""):
+                    files = set()
+                    if isinstance(struct, dict):
+                        for name, meta in struct.items():
+                            current_path = os.path.join(prefix, name) if prefix else name
+                            if isinstance(meta, dict) and ("type" in meta and "size_bytes" in meta):
+                                files.add(current_path)
+                            elif isinstance(meta, dict):
+                                files |= _flatten_directory_structure(meta, current_path)
+                    return files
+
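+                # Illustrative call (paths join with os.sep, so "docs/intro.md" on POSIX):
+                #   _flatten_directory_structure({"docs": {"intro.md": {"type": "file", "size_bytes": 1024}}})
+                #   -> {"docs/intro.md"}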
+                old_files = _flatten_directory_structure(old_directory_structure)
+                new_files = _flatten_directory_structure(directory_structure)
+
+                added_files = sorted(new_files - old_files)
+                removed_files = sorted(old_files - new_files)
+
+                if added_files:
+                    logging.info(f"Files added since last ingest: {added_files}")
                 else:
-                    doc_text = str(doc.text) if hasattr(doc, 'text') else str(doc)
-                    total_tokens += num_tokens_from_string(doc_text)
-
-            logging.info(f"Total tokens calculated: {total_tokens}")
-
-            self.update_state(state="PROGRESS", meta={"current": 50, "status": "Processing documents"})
-
-            temp_vector_dir = os.path.join(temp_dir, "vector_store")
-            os.makedirs(temp_vector_dir, exist_ok=True)
-
-            chunker = Chunker(
-                chunking_strategy="classic_chunk",
-                max_tokens=MAX_TOKENS,
-                min_tokens=MIN_TOKENS,
-                duplicate_headers=False,
-            )
-            chunked_docs = chunker.chunk(documents=raw_docs)
-
-            docs = [Document.to_langchain_format(doc) for doc in chunked_docs]
-
-            embed_and_store_documents(docs, temp_vector_dir, ObjectId(source_id), self)
-
-            file_data = {
-                "name": source.get("name", ""),
-                "file": source.get("name", ""),
-                "user": user,
-                "tokens": total_tokens,
-                "retriever": source.get("retriever", "classic"),
-                "id": source_id,
-                "type": source.get("type", "local"),
-                "file_path": source_file_path,
-                "directory_structure": json.dumps(directory_structure),
-            }
-
-            upload_index(temp_vector_dir, file_data)
-
-            return {
-                "source_id": source_id,
-                "user": user,
-                "tokens": total_tokens,
-                "status": "completed"
-            }
-
+                    logging.info("No files added since last ingest.")
+
+                if removed_files:
+                    logging.info(f"Files removed since last ingest: {removed_files}")
+                else:
+                    logging.info("No files removed since last ingest.")
+
+            except Exception as e:
+                logging.error(f"Error comparing directory structures: {e}", exc_info=True)
+                added_files = []
+                removed_files = []
+            try:
+                if not added_files and not removed_files:
+                    logging.info("No changes detected.")
+                    return {
+                        "source_id": source_id,
+                        "user": user,
+                        "status": "no_changes",
+                        "added_files": [],
+                        "removed_files": [],
+                    }
+
+                vector_store = VectorCreator.create_vectorstore(
+                    settings.VECTOR_STORE,
+                    source_id,
+                    settings.EMBEDDINGS_KEY,
+                )
+
+                self.update_state(state="PROGRESS", meta={"current": 40, "status": "Processing file changes"})
+
+                # 1) Delete chunks from removed files
+                deleted = 0
+                if removed_files:
+                    try:
+                        for ch in vector_store.get_chunks() or []:
+                            metadata = ch.get("metadata", {}) if isinstance(ch, dict) else getattr(ch, "metadata", {})
+                            raw_source = metadata.get("source")
+
+                            source_file = str(raw_source) if raw_source else ""
+
+                            if source_file in removed_files:
+                                # Mirror the dict/object handling used for metadata above
+                                cid = ch.get("doc_id") if isinstance(ch, dict) else getattr(ch, "doc_id", None)
+                                if cid:
+                                    try:
+                                        vector_store.delete_chunk(cid)
+                                        deleted += 1
+                                    except Exception as de:
+                                        logging.error(f"Failed deleting chunk {cid}: {de}")
+                        logging.info(f"Deleted {deleted} chunks from {len(removed_files)} removed files")
+                    except Exception as e:
+                        logging.error(f"Error during deletion of removed file chunks: {e}", exc_info=True)
+
+                # 2) Add chunks from new files
+                added = 0
+                if added_files:
+                    try:
+                        # Build list of local files for added files only
+                        added_local_files = []
+                        for rel_path in added_files:
+                            local_path = os.path.join(temp_dir, rel_path)
+                            if os.path.isfile(local_path):
+                                added_local_files.append(local_path)
+
+                        if added_local_files:
+                            reader_new = SimpleDirectoryReader(
+                                input_files=added_local_files,
+                                exclude_hidden=True,
+                                errors="ignore",
+                                file_metadata=metadata_from_filename,
+                            )
+                            raw_docs_new = reader_new.load_data()
+                            chunker_new = Chunker(
+                                chunking_strategy="classic_chunk",
+                                max_tokens=MAX_TOKENS,
+                                min_tokens=MIN_TOKENS,
+                                duplicate_headers=False,
+                            )
+                            chunked_new = chunker_new.chunk(documents=raw_docs_new)
+
+                            for file_path, token_count in reader_new.file_token_counts.items():
+                                try:
+                                    rel_path = os.path.relpath(file_path, start=temp_dir)
+                                    path_parts = rel_path.split(os.sep)
+                                    current_dir = directory_structure
+
+                                    for part in path_parts[:-1]:
+                                        if part in current_dir and isinstance(current_dir[part], dict):
+                                            current_dir = current_dir[part]
+                                        else:
+                                            break
+
+                                    filename = path_parts[-1]
+                                    if filename in current_dir and isinstance(current_dir[filename], dict):
+                                        current_dir[filename]["token_count"] = token_count
+                                        logging.info(f"Updated token count for {rel_path}: {token_count}")
+                                except Exception as e:
+                                    logging.warning(f"Could not update token count for {file_path}: {e}")
+
+                            for d in chunked_new:
+                                meta = dict(d.extra_info or {})
+                                # Store sources relative to the temp dir so they can be
+                                # matched against directory-structure paths on later scans
+                                try:
+                                    raw_src = meta.get("source")
+                                    if isinstance(raw_src, str) and os.path.isabs(raw_src):
+                                        meta["source"] = os.path.relpath(raw_src, start=temp_dir)
+                                except Exception:
+                                    pass
+
+                                vector_store.add_chunk(d.text, metadata=meta)
+                                added += 1
+                        logging.info(f"Added {added} chunks from {len(added_files)} new files")
+                    except Exception as e:
+                        logging.error(f"Error during ingestion of new files: {e}", exc_info=True)
+
+                # 3) Update the source record: directory structure, token total, and timestamp
+                try:
+                    total_tokens = sum(reader.file_token_counts.values())
+
+                    sources_collection.update_one(
+                        {"_id": ObjectId(source_id)},
+                        {
+                            "$set": {
+                                "directory_structure": json.dumps(directory_structure),
+                                "date": datetime.datetime.now(),
+                                "tokens": total_tokens,
+                            }
+                        },
+                    )
+                except Exception as e:
+                    logging.error(f"Error updating directory_structure in DB: {e}", exc_info=True)
+
+                self.update_state(state="PROGRESS", meta={"current": 100, "status": "Re-ingestion completed"})
+
+                return {
+                    "source_id": source_id,
+                    "user": user,
+                    "status": "completed",
+                    "added_files": added_files,
+                    "removed_files": removed_files,
+                    "chunks_added": added,
+                    "chunks_deleted": deleted,
+                }
+            except Exception as e:
+                logging.error(f"Error while processing file changes: {e}", exc_info=True)
+                raise
+
+
     except Exception as e:
         logging.error(f"Error in reingest_source_worker: {e}", exc_info=True)
         raise
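
Illustrative result payloads (values invented; the keys mirror the two return
statements added above). A scan that finds changes resolves to:

    {
        "source_id": "<source_id>",
        "user": "<user>",
        "status": "completed",
        "added_files": ["docs/new-page.md"],
        "removed_files": ["docs/old-page.md"],
        "chunks_added": 12,
        "chunks_deleted": 7,
    }

while a scan that detects no added or removed files short-circuits with
"status": "no_changes" and empty added_files/removed_files lists.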