Mirror of https://github.com/arc53/DocsGPT.git, synced 2025-11-29 08:33:20 +00:00
(feat:reingest) eat and spit specific chunks
@@ -3439,20 +3439,6 @@ class GetChunks(Resource):
         for chunk in chunks:
             metadata = chunk.get("metadata", {})
 
-            if path:
-                source = metadata.get("source", "")
-                path_match = False
-
-                if isinstance(source, str) and source.endswith(path):
-                    path_match = True
-                elif isinstance(source, list):
-                    for src in source:
-                        if isinstance(src, str) and src.endswith(path):
-                            path_match = True
-                            break
-
-                if not path_match:
-                    continue
 
             if search_term:
                 text_match = search_term in chunk.get("text", "").lower()
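
For reference, the suffix-based path filter that this hunk removes from GetChunks can be expressed as a small standalone predicate. A minimal sketch; the helper name and sample chunks are hypothetical, not part of the codebase:

# Standalone sketch of the path filter dropped above (illustrative only).
def matches_path(metadata, path):
    source = metadata.get("source", "")
    if isinstance(source, str):
        return source.endswith(path)
    if isinstance(source, list):
        return any(isinstance(src, str) and src.endswith(path) for src in source)
    return False

chunks = [
    {"metadata": {"source": "docs/guide/setup.md"}, "text": "..."},
    {"metadata": {"source": ["notes.txt", "docs/other.md"]}, "text": "..."},
]
filtered = [c for c in chunks if matches_path(c.get("metadata", {}), "setup.md")]
print(len(filtered))  # 1
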
@@ -338,8 +338,9 @@ def ingest_worker(
 
 def reingest_source_worker(self, source_id, user):
     """
-    Re-ingestion worker that scans the source and determines what needs to be re-ingested.
-    This is decoupled from the file management operations.
+    Re-ingestion worker that handles incremental updates by:
+    1. Adding chunks from newly added files
+    2. Removing chunks from deleted files
 
     Args:
        self: Task instance
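
The incremental behaviour the new docstring describes comes down to a set difference between the file paths recorded in the stored directory structure and those found in a fresh scan. A minimal sketch of that decision, assuming both structures have already been flattened into sets of relative paths (sample values are illustrative):

# Sketch of the incremental-update decision; old_files/new_files stand in
# for the flattened stored and freshly scanned directory structures.
old_files = {"docs/intro.md", "docs/api.md"}
new_files = {"docs/intro.md", "docs/faq.md"}

added_files = sorted(new_files - old_files)    # files whose chunks get embedded and added
removed_files = sorted(old_files - new_files)  # files whose chunks get deleted

print(added_files)    # ['docs/faq.md']
print(removed_files)  # ['docs/api.md']
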
@@ -351,18 +352,15 @@ def reingest_source_worker(self, source_id, user):
     """
    try:
        from application.vectorstore.vector_creator import VectorCreator
-        from application.utils import num_tokens_from_string
 
        self.update_state(state="PROGRESS", meta={"current": 10, "status": "Initializing re-ingestion scan"})
-
 
        source = sources_collection.find_one({"_id": ObjectId(source_id), "user": user})
        if not source:
            raise ValueError(f"Source {source_id} not found or access denied")
 
        storage = StorageCreator.get_storage()
        source_file_path = source.get("file_path", "")
-        job_name = source.get("name", "")
 
        self.update_state(state="PROGRESS", meta={"current": 20, "status": "Scanning current files"})
 
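
The num_tokens_from_string import is dropped here because per-file token counts now come from the reader. For context, such a helper is typically a thin wrapper around a tokenizer; a minimal sketch assuming tiktoken's cl100k_base encoding, which is an assumption and not necessarily how application.utils implements it:

# Minimal sketch of a num_tokens_from_string-style helper; the cl100k_base
# encoding is an assumption, not necessarily what application.utils uses.
import tiktoken

def num_tokens_from_string(text: str, encoding_name: str = "cl100k_base") -> int:
    encoding = tiktoken.get_encoding(encoding_name)
    return len(encoding.encode(text))

print(num_tokens_from_string("Re-ingestion only touches changed files."))
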
@@ -401,59 +399,184 @@ def reingest_source_worker(self, source_id, user):
            exclude_hidden=True,
            file_metadata=metadata_from_filename,
        )
-        raw_docs = reader.load_data()
-
-        directory_structure = getattr(reader, 'directory_structure', {})
-        logging.info(f"Directory structure from reader: {directory_structure}")
-
-        total_tokens = 0
-        for doc in raw_docs:
-            if hasattr(doc, 'extra_info') and 'token_count' in doc.extra_info:
-                total_tokens += doc.extra_info['token_count']
-            else:
-                doc_text = str(doc.text) if hasattr(doc, 'text') else str(doc)
-                total_tokens += num_tokens_from_string(doc_text)
-
-        logging.info(f"Total tokens calculated: {total_tokens}")
-
-        self.update_state(state="PROGRESS", meta={"current": 50, "status": "Processing documents"})
-
-        temp_vector_dir = os.path.join(temp_dir, "vector_store")
-        os.makedirs(temp_vector_dir, exist_ok=True)
-
-        chunker = Chunker(
-            chunking_strategy="classic_chunk",
-            max_tokens=MAX_TOKENS,
-            min_tokens=MIN_TOKENS,
-            duplicate_headers=False,
-        )
-        chunked_docs = chunker.chunk(documents=raw_docs)
-        docs = [Document.to_langchain_format(doc) for doc in chunked_docs]
-
-        embed_and_store_documents(docs, temp_vector_dir, ObjectId(source_id), self)
-
-        file_data = {
-            "name": source.get("name", ""),
-            "file": source.get("name", ""),
-            "user": user,
-            "tokens": total_tokens,
-            "retriever": source.get("retriever", "classic"),
-            "id": source_id,
-            "type": source.get("type", "local"),
-            "file_path": source_file_path,
-        }
-        upload_index(temp_vector_dir, file_data)
-
-        return {
-            "source_id": source_id,
-            "user": user,
-            "tokens": total_tokens,
-            "status": "completed"
-        }
+        reader.load_data()
+        directory_structure = reader.directory_structure
+        logging.info(f"Directory structure built with token counts: {directory_structure}")
+
+        try:
+            old_directory_structure = source.get("directory_structure") or {}
+            if isinstance(old_directory_structure, str):
+                try:
+                    old_directory_structure = json.loads(old_directory_structure)
+                except Exception:
+                    old_directory_structure = {}
+
+            def _flatten_directory_structure(struct, prefix=""):
+                files = set()
+                if isinstance(struct, dict):
+                    for name, meta in struct.items():
+                        current_path = os.path.join(prefix, name) if prefix else name
+                        if isinstance(meta, dict) and ("type" in meta and "size_bytes" in meta):
+                            files.add(current_path)
+                        elif isinstance(meta, dict):
+                            files |= _flatten_directory_structure(meta, current_path)
+                return files
+
+            old_files = _flatten_directory_structure(old_directory_structure)
+            new_files = _flatten_directory_structure(directory_structure)
+
+            added_files = sorted(new_files - old_files)
+            removed_files = sorted(old_files - new_files)
+
+            if added_files:
+                logging.info(f"Files added since last ingest: {added_files}")
+            else:
+                logging.info("No files added since last ingest.")
+
+            if removed_files:
+                logging.info(f"Files removed since last ingest: {removed_files}")
+            else:
+                logging.info("No files removed since last ingest.")
+        except Exception as e:
+            logging.error(f"Error comparing directory structures: {e}", exc_info=True)
+            added_files = []
+            removed_files = []
+
+        try:
+            if not added_files and not removed_files:
+                logging.info("No changes detected.")
+                return {
+                    "source_id": source_id,
+                    "user": user,
+                    "status": "no_changes",
+                    "added_files": [],
+                    "removed_files": [],
+                }
+
+            vector_store = VectorCreator.create_vectorstore(
+                settings.VECTOR_STORE,
+                source_id,
+                settings.EMBEDDINGS_KEY,
+            )
+
+            self.update_state(state="PROGRESS", meta={"current": 40, "status": "Processing file changes"})
+
+            # 1) Delete chunks from removed files
+            deleted = 0
+            if removed_files:
+                try:
+                    for ch in vector_store.get_chunks() or []:
+                        metadata = ch.get("metadata", {}) if isinstance(ch, dict) else getattr(ch, "metadata", {})
+                        raw_source = metadata.get("source")
+
+                        source_file = str(raw_source) if raw_source else ""
+
+                        if source_file in removed_files:
+                            cid = ch.get("doc_id")
+                            if cid:
+                                try:
+                                    vector_store.delete_chunk(cid)
+                                    deleted += 1
+                                except Exception as de:
+                                    logging.error(f"Failed deleting chunk {cid}: {de}")
+                    logging.info(f"Deleted {deleted} chunks from {len(removed_files)} removed files")
+                except Exception as e:
+                    logging.error(f"Error during deletion of removed file chunks: {e}", exc_info=True)
+
+            # 2) Add chunks from new files
+            added = 0
+            if added_files:
+                try:
+                    # Build list of local files for added files only
+                    added_local_files = []
+                    for rel_path in added_files:
+                        local_path = os.path.join(temp_dir, rel_path)
+                        if os.path.isfile(local_path):
+                            added_local_files.append(local_path)
+
+                    if added_local_files:
+                        reader_new = SimpleDirectoryReader(
+                            input_files=added_local_files,
+                            exclude_hidden=True,
+                            errors="ignore",
+                            file_metadata=metadata_from_filename,
+                        )
+                        raw_docs_new = reader_new.load_data()
+                        chunker_new = Chunker(
+                            chunking_strategy="classic_chunk",
+                            max_tokens=MAX_TOKENS,
+                            min_tokens=MIN_TOKENS,
+                            duplicate_headers=False,
+                        )
+                        chunked_new = chunker_new.chunk(documents=raw_docs_new)
+
+                        for file_path, token_count in reader_new.file_token_counts.items():
+                            try:
+                                rel_path = os.path.relpath(file_path, start=temp_dir)
+                                path_parts = rel_path.split(os.sep)
+                                current_dir = directory_structure
+
+                                for part in path_parts[:-1]:
+                                    if part in current_dir and isinstance(current_dir[part], dict):
+                                        current_dir = current_dir[part]
+                                    else:
+                                        break
+
+                                filename = path_parts[-1]
+                                if filename in current_dir and isinstance(current_dir[filename], dict):
+                                    current_dir[filename]["token_count"] = token_count
+                                    logging.info(f"Updated token count for {rel_path}: {token_count}")
+                            except Exception as e:
+                                logging.warning(f"Could not update token count for {file_path}: {e}")
+
+                        for d in chunked_new:
+                            meta = dict(d.extra_info or {})
+                            try:
+                                raw_src = meta.get("source")
+                                if isinstance(raw_src, str) and os.path.isabs(raw_src):
+                                    meta["source"] = os.path.relpath(raw_src, start=temp_dir)
+                            except Exception:
+                                pass
+
+                            vector_store.add_chunk(d.text, metadata=meta)
+                            added += 1
+                        logging.info(f"Added {added} chunks from {len(added_files)} new files")
+                except Exception as e:
+                    logging.error(f"Error during ingestion of new files: {e}", exc_info=True)
+
+            # 3) Update source directory structure timestamp
+            try:
+                total_tokens = sum(reader.file_token_counts.values())
+
+                sources_collection.update_one(
+                    {"_id": ObjectId(source_id)},
+                    {
+                        "$set": {
+                            "directory_structure": json.dumps(directory_structure),
+                            "date": datetime.datetime.now(),
+                            "tokens": total_tokens
+                        }
+                    },
+                )
+            except Exception as e:
+                logging.error(f"Error updating directory_structure in DB: {e}", exc_info=True)
+
+            self.update_state(state="PROGRESS", meta={"current": 100, "status": "Re-ingestion completed"})
+
+            return {
+                "source_id": source_id,
+                "user": user,
+                "status": "completed",
+                "added_files": added_files,
+                "removed_files": removed_files,
+                "chunks_added": added,
+                "chunks_deleted": deleted,
+            }
+        except Exception as e:
+            logging.error(f"Error while processing file changes: {e}", exc_info=True)
+            raise
 
    except Exception as e:
        logging.error(f"Error in reingest_source_worker: {e}", exc_info=True)
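
A worked example of the _flatten_directory_structure helper introduced above: leaf entries, i.e. dicts carrying both "type" and "size_bytes", become relative paths, while any other nested dict is recursed into. The sample structure is illustrative, not taken from a real source record:

# Worked example of _flatten_directory_structure from the diff above.
import os

def _flatten_directory_structure(struct, prefix=""):
    files = set()
    if isinstance(struct, dict):
        for name, meta in struct.items():
            current_path = os.path.join(prefix, name) if prefix else name
            if isinstance(meta, dict) and ("type" in meta and "size_bytes" in meta):
                files.add(current_path)
            elif isinstance(meta, dict):
                files |= _flatten_directory_structure(meta, current_path)
    return files

structure = {
    "docs": {
        "intro.md": {"type": "file", "size_bytes": 1204, "token_count": 310},
        "api": {
            "endpoints.md": {"type": "file", "size_bytes": 5321},
        },
    },
    "README.md": {"type": "file", "size_bytes": 870},
}

print(sorted(_flatten_directory_structure(structure)))
# On POSIX: ['README.md', 'docs/api/endpoints.md', 'docs/intro.md']
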