mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-11-29 08:33:20 +00:00
Compare commits
1 Commits
dependabot
...
late-chunk
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a891647bf |
118
application/parser/chunking.py
Normal file
118
application/parser/chunking.py
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
import re
|
||||||
|
from typing import List, Tuple, Union
|
||||||
|
import logging
|
||||||
|
from application.parser.schema.base import Document
|
||||||
|
from application.utils import get_encoding
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class Chunker:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
chunking_strategy: str = "classic_chunk",
|
||||||
|
max_tokens: int = 2000,
|
||||||
|
min_tokens: int = 150,
|
||||||
|
duplicate_headers: bool = False,
|
||||||
|
):
|
||||||
|
if chunking_strategy not in ["classic_chunk"]:
|
||||||
|
raise ValueError(f"Unsupported chunking strategy: {chunking_strategy}")
|
||||||
|
self.chunking_strategy = chunking_strategy
|
||||||
|
self.max_tokens = max_tokens
|
||||||
|
self.min_tokens = min_tokens
|
||||||
|
self.duplicate_headers = duplicate_headers
|
||||||
|
self.encoding = get_encoding()
|
||||||
|
|
||||||
|
def separate_header_and_body(self, text: str) -> Tuple[str, str]:
|
||||||
|
header_pattern = r"^(.*?\n){3}"
|
||||||
|
match = re.match(header_pattern, text)
|
||||||
|
if match:
|
||||||
|
header = match.group(0)
|
||||||
|
body = text[len(header):]
|
||||||
|
else:
|
||||||
|
header, body = "", text # No header, treat entire text as body
|
||||||
|
return header, body
|
||||||
|
|
||||||
|
def combine_documents(self, doc: Document, next_doc: Document) -> Document:
|
||||||
|
combined_text = doc.text + " " + next_doc.text
|
||||||
|
combined_token_count = len(self.encoding.encode(combined_text))
|
||||||
|
new_doc = Document(
|
||||||
|
text=combined_text,
|
||||||
|
doc_id=doc.doc_id,
|
||||||
|
embedding=doc.embedding,
|
||||||
|
extra_info={**(doc.extra_info or {}), "token_count": combined_token_count}
|
||||||
|
)
|
||||||
|
return new_doc
|
||||||
|
|
||||||
|
def split_document(self, doc: Document) -> List[Document]:
|
||||||
|
split_docs = []
|
||||||
|
header, body = self.separate_header_and_body(doc.text)
|
||||||
|
header_tokens = self.encoding.encode(header) if header else []
|
||||||
|
body_tokens = self.encoding.encode(body)
|
||||||
|
|
||||||
|
current_position = 0
|
||||||
|
part_index = 0
|
||||||
|
while current_position < len(body_tokens):
|
||||||
|
end_position = current_position + self.max_tokens - len(header_tokens)
|
||||||
|
chunk_tokens = (header_tokens + body_tokens[current_position:end_position]
|
||||||
|
if self.duplicate_headers or part_index == 0 else body_tokens[current_position:end_position])
|
||||||
|
chunk_text = self.encoding.decode(chunk_tokens)
|
||||||
|
new_doc = Document(
|
||||||
|
text=chunk_text,
|
||||||
|
doc_id=f"{doc.doc_id}-{part_index}",
|
||||||
|
embedding=doc.embedding,
|
||||||
|
extra_info={**(doc.extra_info or {}), "token_count": len(chunk_tokens)}
|
||||||
|
)
|
||||||
|
split_docs.append(new_doc)
|
||||||
|
current_position = end_position
|
||||||
|
part_index += 1
|
||||||
|
header_tokens = []
|
||||||
|
return split_docs
|
||||||
|
|
||||||
|
def classic_chunk(self, documents: List[Document]) -> List[Document]:
|
||||||
|
processed_docs = []
|
||||||
|
i = 0
|
||||||
|
while i < len(documents):
|
||||||
|
doc = documents[i]
|
||||||
|
tokens = self.encoding.encode(doc.text)
|
||||||
|
token_count = len(tokens)
|
||||||
|
|
||||||
|
if self.min_tokens <= token_count <= self.max_tokens:
|
||||||
|
doc.extra_info = doc.extra_info or {}
|
||||||
|
doc.extra_info["token_count"] = token_count
|
||||||
|
processed_docs.append(doc)
|
||||||
|
i += 1
|
||||||
|
elif token_count < self.min_tokens:
|
||||||
|
if i + 1 < len(documents):
|
||||||
|
next_doc = documents[i + 1]
|
||||||
|
next_tokens = self.encoding.encode(next_doc.text)
|
||||||
|
if token_count + len(next_tokens) <= self.max_tokens:
|
||||||
|
# Combine small documents
|
||||||
|
combined_doc = self.combine_documents(doc, next_doc)
|
||||||
|
processed_docs.append(combined_doc)
|
||||||
|
i += 2
|
||||||
|
else:
|
||||||
|
# Keep the small document as is if adding next_doc would exceed max_tokens
|
||||||
|
doc.extra_info = doc.extra_info or {}
|
||||||
|
doc.extra_info["token_count"] = token_count
|
||||||
|
processed_docs.append(doc)
|
||||||
|
i += 1
|
||||||
|
else:
|
||||||
|
# No next document to combine with; add the small document as is
|
||||||
|
doc.extra_info = doc.extra_info or {}
|
||||||
|
doc.extra_info["token_count"] = token_count
|
||||||
|
processed_docs.append(doc)
|
||||||
|
i += 1
|
||||||
|
else:
|
||||||
|
# Split large documents
|
||||||
|
processed_docs.extend(self.split_document(doc))
|
||||||
|
i += 1
|
||||||
|
return processed_docs
|
||||||
|
|
||||||
|
def chunk(
|
||||||
|
self,
|
||||||
|
documents: List[Document]
|
||||||
|
) -> List[Document]:
|
||||||
|
if self.chunking_strategy == "classic_chunk":
|
||||||
|
return self.classic_chunk(documents)
|
||||||
|
else:
|
||||||
|
raise ValueError("Unsupported chunking strategy")
|
||||||
86
application/parser/embedding_pipeline.py
Executable file
86
application/parser/embedding_pipeline.py
Executable file
@@ -0,0 +1,86 @@
|
|||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from retry import retry
|
||||||
|
from tqdm import tqdm
|
||||||
|
from application.core.settings import settings
|
||||||
|
from application.vectorstore.vector_creator import VectorCreator
|
||||||
|
|
||||||
|
|
||||||
|
@retry(tries=10, delay=60)
|
||||||
|
def add_text_to_store_with_retry(store, doc, source_id):
|
||||||
|
"""
|
||||||
|
Add a document's text and metadata to the vector store with retry logic.
|
||||||
|
Args:
|
||||||
|
store: The vector store object.
|
||||||
|
doc: The document to be added.
|
||||||
|
source_id: Unique identifier for the source.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
doc.metadata["source_id"] = str(source_id)
|
||||||
|
store.add_texts([doc.page_content], metadatas=[doc.metadata])
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Failed to add document with retry: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def embed_and_store_documents(docs, folder_name, source_id, task_status):
|
||||||
|
"""
|
||||||
|
Embeds documents and stores them in a vector store.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
docs (list): List of documents to be embedded and stored.
|
||||||
|
folder_name (str): Directory to save the vector store.
|
||||||
|
source_id (str): Unique identifier for the source.
|
||||||
|
task_status: Task state manager for progress updates.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None
|
||||||
|
"""
|
||||||
|
# Ensure the folder exists
|
||||||
|
if not os.path.exists(folder_name):
|
||||||
|
os.makedirs(folder_name)
|
||||||
|
|
||||||
|
# Initialize vector store
|
||||||
|
if settings.VECTOR_STORE == "faiss":
|
||||||
|
docs_init = [docs.pop(0)]
|
||||||
|
store = VectorCreator.create_vectorstore(
|
||||||
|
settings.VECTOR_STORE,
|
||||||
|
docs_init=docs_init,
|
||||||
|
source_id=folder_name,
|
||||||
|
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
store = VectorCreator.create_vectorstore(
|
||||||
|
settings.VECTOR_STORE,
|
||||||
|
source_id=source_id,
|
||||||
|
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
|
||||||
|
)
|
||||||
|
store.delete_index()
|
||||||
|
|
||||||
|
total_docs = len(docs)
|
||||||
|
|
||||||
|
# Process and embed documents
|
||||||
|
for idx, doc in tqdm(
|
||||||
|
docs,
|
||||||
|
desc="Embedding 🦖",
|
||||||
|
unit="docs",
|
||||||
|
total=total_docs,
|
||||||
|
bar_format="{l_bar}{bar}| Time Left: {remaining}",
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
# Update task status for progress tracking
|
||||||
|
progress = int((idx / total_docs) * 100)
|
||||||
|
task_status.update_state(state="PROGRESS", meta={"current": progress})
|
||||||
|
|
||||||
|
# Add document to vector store
|
||||||
|
add_text_to_store_with_retry(store, doc, source_id)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error embedding document {idx}: {e}")
|
||||||
|
logging.info(f"Saving progress at document {idx} out of {total_docs}")
|
||||||
|
store.save_local(folder_name)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Save the vector store
|
||||||
|
if settings.VECTOR_STORE == "faiss":
|
||||||
|
store.save_local(folder_name)
|
||||||
|
logging.info("Vector store saved successfully.")
|
||||||
94
application/parser/late_chunking.py
Executable file
94
application/parser/late_chunking.py
Executable file
@@ -0,0 +1,94 @@
|
|||||||
|
from typing import List, Tuple, Union, Optional
|
||||||
|
from transformers import AutoTokenizer, AutoModel
|
||||||
|
from sentence_transformers import SentenceTransformer
|
||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
from application.parser.schema.base import Document
|
||||||
|
|
||||||
|
|
||||||
|
class LateChunker:
|
||||||
|
def __init__(self, model_name: str, late_tokens: int = 1000, **model_kwargs):
|
||||||
|
"""
|
||||||
|
Initialize the LateChunker with a model, tokenizer, and late_tokens limit.
|
||||||
|
Supports both transformers and sentence-transformers models.
|
||||||
|
"""
|
||||||
|
self.late_tokens = late_tokens
|
||||||
|
self.model_name = model_name
|
||||||
|
|
||||||
|
# Load model based on type
|
||||||
|
if "sentence-transformers" in model_name:
|
||||||
|
self.model = SentenceTransformer(model_name, **model_kwargs)
|
||||||
|
self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
|
||||||
|
self.wrapper_type = "sentence_transformers"
|
||||||
|
else:
|
||||||
|
self.model = AutoModel.from_pretrained(model_name, trust_remote_code=True, **model_kwargs)
|
||||||
|
self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
|
||||||
|
self.wrapper_type = "transformers"
|
||||||
|
|
||||||
|
def tokenize_with_offsets(self, text: str):
|
||||||
|
"""Tokenize text and return tokens with character offsets."""
|
||||||
|
tokens = self.tokenizer.encode_plus(
|
||||||
|
text, return_offsets_mapping=True, add_special_tokens=False
|
||||||
|
)
|
||||||
|
return tokens["input_ids"], tokens["offset_mapping"]
|
||||||
|
|
||||||
|
def late_chunk_with_embeddings(
|
||||||
|
self, documents: List[Document]
|
||||||
|
) -> List[Tuple[str, List[Tuple[int, int]], List[float]]]:
|
||||||
|
"""
|
||||||
|
Combines documents into 'super chunks' that fit within `late_tokens` limit.
|
||||||
|
Outputs each super chunk with span annotations and embeddings.
|
||||||
|
"""
|
||||||
|
super_chunks = []
|
||||||
|
current_super_chunk_text = []
|
||||||
|
current_token_count = 0
|
||||||
|
span_annotations = []
|
||||||
|
|
||||||
|
for doc in documents:
|
||||||
|
doc_text = doc.text
|
||||||
|
input_ids, offsets = self.tokenize_with_offsets(doc_text)
|
||||||
|
doc_token_count = len(input_ids)
|
||||||
|
|
||||||
|
# Check if adding this document exceeds the late_tokens limit
|
||||||
|
if current_token_count + doc_token_count > self.late_tokens:
|
||||||
|
# Finalize the current super chunk
|
||||||
|
combined_text = " ".join(current_super_chunk_text)
|
||||||
|
embeddings = self.generate_embeddings(combined_text)
|
||||||
|
|
||||||
|
super_chunks.append((combined_text, span_annotations, embeddings))
|
||||||
|
|
||||||
|
# Reset for a new super chunk
|
||||||
|
current_super_chunk_text = []
|
||||||
|
span_annotations = []
|
||||||
|
current_token_count = 0
|
||||||
|
|
||||||
|
# Add document to the current super chunk
|
||||||
|
start_token = current_token_count
|
||||||
|
end_token = current_token_count + doc_token_count
|
||||||
|
span_annotations.append((start_token, end_token))
|
||||||
|
current_super_chunk_text.append(doc_text)
|
||||||
|
current_token_count = end_token
|
||||||
|
|
||||||
|
# Add the final super chunk if there are remaining documents
|
||||||
|
if current_super_chunk_text:
|
||||||
|
combined_text = " ".join(current_super_chunk_text)
|
||||||
|
embeddings = self.generate_embeddings(combined_text)
|
||||||
|
super_chunks.append((combined_text, span_annotations, embeddings))
|
||||||
|
|
||||||
|
return super_chunks
|
||||||
|
|
||||||
|
def generate_embeddings(self, text: str) -> List[float]:
|
||||||
|
"""Generate embeddings for a given text using the loaded model."""
|
||||||
|
if self.wrapper_type == "sentence_transformers":
|
||||||
|
# Sentence-Transformers
|
||||||
|
embeddings = self.model.encode([text])
|
||||||
|
return embeddings[0].tolist()
|
||||||
|
|
||||||
|
elif self.wrapper_type == "transformers":
|
||||||
|
# Transformers models
|
||||||
|
inputs = self.tokenizer(text, return_tensors="pt")
|
||||||
|
model_output = self.model(**inputs)
|
||||||
|
return model_output.last_hidden_state.mean(dim=1).squeeze().tolist()
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError("Unsupported model type for embedding generation.")
|
||||||
Reference in New Issue
Block a user