mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-11-29 16:43:16 +00:00
Compare commits
1 Commits
fix/eslint
...
late-chunk
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a891647bf |
118
application/parser/chunking.py
Normal file
118
application/parser/chunking.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import re
|
||||
from typing import List, Tuple, Union
|
||||
import logging
|
||||
from application.parser.schema.base import Document
|
||||
from application.utils import get_encoding
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class Chunker:
|
||||
def __init__(
|
||||
self,
|
||||
chunking_strategy: str = "classic_chunk",
|
||||
max_tokens: int = 2000,
|
||||
min_tokens: int = 150,
|
||||
duplicate_headers: bool = False,
|
||||
):
|
||||
if chunking_strategy not in ["classic_chunk"]:
|
||||
raise ValueError(f"Unsupported chunking strategy: {chunking_strategy}")
|
||||
self.chunking_strategy = chunking_strategy
|
||||
self.max_tokens = max_tokens
|
||||
self.min_tokens = min_tokens
|
||||
self.duplicate_headers = duplicate_headers
|
||||
self.encoding = get_encoding()
|
||||
|
||||
def separate_header_and_body(self, text: str) -> Tuple[str, str]:
|
||||
header_pattern = r"^(.*?\n){3}"
|
||||
match = re.match(header_pattern, text)
|
||||
if match:
|
||||
header = match.group(0)
|
||||
body = text[len(header):]
|
||||
else:
|
||||
header, body = "", text # No header, treat entire text as body
|
||||
return header, body
|
||||
|
||||
def combine_documents(self, doc: Document, next_doc: Document) -> Document:
|
||||
combined_text = doc.text + " " + next_doc.text
|
||||
combined_token_count = len(self.encoding.encode(combined_text))
|
||||
new_doc = Document(
|
||||
text=combined_text,
|
||||
doc_id=doc.doc_id,
|
||||
embedding=doc.embedding,
|
||||
extra_info={**(doc.extra_info or {}), "token_count": combined_token_count}
|
||||
)
|
||||
return new_doc
|
||||
|
||||
def split_document(self, doc: Document) -> List[Document]:
|
||||
split_docs = []
|
||||
header, body = self.separate_header_and_body(doc.text)
|
||||
header_tokens = self.encoding.encode(header) if header else []
|
||||
body_tokens = self.encoding.encode(body)
|
||||
|
||||
current_position = 0
|
||||
part_index = 0
|
||||
while current_position < len(body_tokens):
|
||||
end_position = current_position + self.max_tokens - len(header_tokens)
|
||||
chunk_tokens = (header_tokens + body_tokens[current_position:end_position]
|
||||
if self.duplicate_headers or part_index == 0 else body_tokens[current_position:end_position])
|
||||
chunk_text = self.encoding.decode(chunk_tokens)
|
||||
new_doc = Document(
|
||||
text=chunk_text,
|
||||
doc_id=f"{doc.doc_id}-{part_index}",
|
||||
embedding=doc.embedding,
|
||||
extra_info={**(doc.extra_info or {}), "token_count": len(chunk_tokens)}
|
||||
)
|
||||
split_docs.append(new_doc)
|
||||
current_position = end_position
|
||||
part_index += 1
|
||||
header_tokens = []
|
||||
return split_docs
|
||||
|
||||
def classic_chunk(self, documents: List[Document]) -> List[Document]:
|
||||
processed_docs = []
|
||||
i = 0
|
||||
while i < len(documents):
|
||||
doc = documents[i]
|
||||
tokens = self.encoding.encode(doc.text)
|
||||
token_count = len(tokens)
|
||||
|
||||
if self.min_tokens <= token_count <= self.max_tokens:
|
||||
doc.extra_info = doc.extra_info or {}
|
||||
doc.extra_info["token_count"] = token_count
|
||||
processed_docs.append(doc)
|
||||
i += 1
|
||||
elif token_count < self.min_tokens:
|
||||
if i + 1 < len(documents):
|
||||
next_doc = documents[i + 1]
|
||||
next_tokens = self.encoding.encode(next_doc.text)
|
||||
if token_count + len(next_tokens) <= self.max_tokens:
|
||||
# Combine small documents
|
||||
combined_doc = self.combine_documents(doc, next_doc)
|
||||
processed_docs.append(combined_doc)
|
||||
i += 2
|
||||
else:
|
||||
# Keep the small document as is if adding next_doc would exceed max_tokens
|
||||
doc.extra_info = doc.extra_info or {}
|
||||
doc.extra_info["token_count"] = token_count
|
||||
processed_docs.append(doc)
|
||||
i += 1
|
||||
else:
|
||||
# No next document to combine with; add the small document as is
|
||||
doc.extra_info = doc.extra_info or {}
|
||||
doc.extra_info["token_count"] = token_count
|
||||
processed_docs.append(doc)
|
||||
i += 1
|
||||
else:
|
||||
# Split large documents
|
||||
processed_docs.extend(self.split_document(doc))
|
||||
i += 1
|
||||
return processed_docs
|
||||
|
||||
def chunk(
|
||||
self,
|
||||
documents: List[Document]
|
||||
) -> List[Document]:
|
||||
if self.chunking_strategy == "classic_chunk":
|
||||
return self.classic_chunk(documents)
|
||||
else:
|
||||
raise ValueError("Unsupported chunking strategy")
|
||||
86
application/parser/embedding_pipeline.py
Executable file
86
application/parser/embedding_pipeline.py
Executable file
@@ -0,0 +1,86 @@
|
||||
import os
|
||||
import logging
|
||||
from retry import retry
|
||||
from tqdm import tqdm
|
||||
from application.core.settings import settings
|
||||
from application.vectorstore.vector_creator import VectorCreator
|
||||
|
||||
|
||||
@retry(tries=10, delay=60)
|
||||
def add_text_to_store_with_retry(store, doc, source_id):
|
||||
"""
|
||||
Add a document's text and metadata to the vector store with retry logic.
|
||||
Args:
|
||||
store: The vector store object.
|
||||
doc: The document to be added.
|
||||
source_id: Unique identifier for the source.
|
||||
"""
|
||||
try:
|
||||
doc.metadata["source_id"] = str(source_id)
|
||||
store.add_texts([doc.page_content], metadatas=[doc.metadata])
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to add document with retry: {e}")
|
||||
raise
|
||||
|
||||
|
||||
def embed_and_store_documents(docs, folder_name, source_id, task_status):
|
||||
"""
|
||||
Embeds documents and stores them in a vector store.
|
||||
|
||||
Args:
|
||||
docs (list): List of documents to be embedded and stored.
|
||||
folder_name (str): Directory to save the vector store.
|
||||
source_id (str): Unique identifier for the source.
|
||||
task_status: Task state manager for progress updates.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
# Ensure the folder exists
|
||||
if not os.path.exists(folder_name):
|
||||
os.makedirs(folder_name)
|
||||
|
||||
# Initialize vector store
|
||||
if settings.VECTOR_STORE == "faiss":
|
||||
docs_init = [docs.pop(0)]
|
||||
store = VectorCreator.create_vectorstore(
|
||||
settings.VECTOR_STORE,
|
||||
docs_init=docs_init,
|
||||
source_id=folder_name,
|
||||
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
|
||||
)
|
||||
else:
|
||||
store = VectorCreator.create_vectorstore(
|
||||
settings.VECTOR_STORE,
|
||||
source_id=source_id,
|
||||
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
|
||||
)
|
||||
store.delete_index()
|
||||
|
||||
total_docs = len(docs)
|
||||
|
||||
# Process and embed documents
|
||||
for idx, doc in tqdm(
|
||||
docs,
|
||||
desc="Embedding 🦖",
|
||||
unit="docs",
|
||||
total=total_docs,
|
||||
bar_format="{l_bar}{bar}| Time Left: {remaining}",
|
||||
):
|
||||
try:
|
||||
# Update task status for progress tracking
|
||||
progress = int((idx / total_docs) * 100)
|
||||
task_status.update_state(state="PROGRESS", meta={"current": progress})
|
||||
|
||||
# Add document to vector store
|
||||
add_text_to_store_with_retry(store, doc, source_id)
|
||||
except Exception as e:
|
||||
logging.error(f"Error embedding document {idx}: {e}")
|
||||
logging.info(f"Saving progress at document {idx} out of {total_docs}")
|
||||
store.save_local(folder_name)
|
||||
break
|
||||
|
||||
# Save the vector store
|
||||
if settings.VECTOR_STORE == "faiss":
|
||||
store.save_local(folder_name)
|
||||
logging.info("Vector store saved successfully.")
|
||||
94
application/parser/late_chunking.py
Executable file
94
application/parser/late_chunking.py
Executable file
@@ -0,0 +1,94 @@
|
||||
from typing import List, Tuple, Union, Optional
|
||||
from transformers import AutoTokenizer, AutoModel
|
||||
from sentence_transformers import SentenceTransformer
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
from application.parser.schema.base import Document
|
||||
|
||||
|
||||
class LateChunker:
|
||||
def __init__(self, model_name: str, late_tokens: int = 1000, **model_kwargs):
|
||||
"""
|
||||
Initialize the LateChunker with a model, tokenizer, and late_tokens limit.
|
||||
Supports both transformers and sentence-transformers models.
|
||||
"""
|
||||
self.late_tokens = late_tokens
|
||||
self.model_name = model_name
|
||||
|
||||
# Load model based on type
|
||||
if "sentence-transformers" in model_name:
|
||||
self.model = SentenceTransformer(model_name, **model_kwargs)
|
||||
self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
|
||||
self.wrapper_type = "sentence_transformers"
|
||||
else:
|
||||
self.model = AutoModel.from_pretrained(model_name, trust_remote_code=True, **model_kwargs)
|
||||
self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
|
||||
self.wrapper_type = "transformers"
|
||||
|
||||
def tokenize_with_offsets(self, text: str):
|
||||
"""Tokenize text and return tokens with character offsets."""
|
||||
tokens = self.tokenizer.encode_plus(
|
||||
text, return_offsets_mapping=True, add_special_tokens=False
|
||||
)
|
||||
return tokens["input_ids"], tokens["offset_mapping"]
|
||||
|
||||
def late_chunk_with_embeddings(
|
||||
self, documents: List[Document]
|
||||
) -> List[Tuple[str, List[Tuple[int, int]], List[float]]]:
|
||||
"""
|
||||
Combines documents into 'super chunks' that fit within `late_tokens` limit.
|
||||
Outputs each super chunk with span annotations and embeddings.
|
||||
"""
|
||||
super_chunks = []
|
||||
current_super_chunk_text = []
|
||||
current_token_count = 0
|
||||
span_annotations = []
|
||||
|
||||
for doc in documents:
|
||||
doc_text = doc.text
|
||||
input_ids, offsets = self.tokenize_with_offsets(doc_text)
|
||||
doc_token_count = len(input_ids)
|
||||
|
||||
# Check if adding this document exceeds the late_tokens limit
|
||||
if current_token_count + doc_token_count > self.late_tokens:
|
||||
# Finalize the current super chunk
|
||||
combined_text = " ".join(current_super_chunk_text)
|
||||
embeddings = self.generate_embeddings(combined_text)
|
||||
|
||||
super_chunks.append((combined_text, span_annotations, embeddings))
|
||||
|
||||
# Reset for a new super chunk
|
||||
current_super_chunk_text = []
|
||||
span_annotations = []
|
||||
current_token_count = 0
|
||||
|
||||
# Add document to the current super chunk
|
||||
start_token = current_token_count
|
||||
end_token = current_token_count + doc_token_count
|
||||
span_annotations.append((start_token, end_token))
|
||||
current_super_chunk_text.append(doc_text)
|
||||
current_token_count = end_token
|
||||
|
||||
# Add the final super chunk if there are remaining documents
|
||||
if current_super_chunk_text:
|
||||
combined_text = " ".join(current_super_chunk_text)
|
||||
embeddings = self.generate_embeddings(combined_text)
|
||||
super_chunks.append((combined_text, span_annotations, embeddings))
|
||||
|
||||
return super_chunks
|
||||
|
||||
def generate_embeddings(self, text: str) -> List[float]:
|
||||
"""Generate embeddings for a given text using the loaded model."""
|
||||
if self.wrapper_type == "sentence_transformers":
|
||||
# Sentence-Transformers
|
||||
embeddings = self.model.encode([text])
|
||||
return embeddings[0].tolist()
|
||||
|
||||
elif self.wrapper_type == "transformers":
|
||||
# Transformers models
|
||||
inputs = self.tokenizer(text, return_tensors="pt")
|
||||
model_output = self.model(**inputs)
|
||||
return model_output.last_hidden_state.mean(dim=1).squeeze().tolist()
|
||||
|
||||
else:
|
||||
raise ValueError("Unsupported model type for embedding generation.")
|
||||
Reference in New Issue
Block a user