diff --git a/application/parser/chunking.py b/application/parser/chunking.py new file mode 100644 index 00000000..26f05dba --- /dev/null +++ b/application/parser/chunking.py @@ -0,0 +1,118 @@ +import re +from typing import List, Tuple, Union +import logging +from application.parser.schema.base import Document +from application.utils import get_encoding + +logger = logging.getLogger(__name__) + +class Chunker: + def __init__( + self, + chunking_strategy: str = "classic_chunk", + max_tokens: int = 2000, + min_tokens: int = 150, + duplicate_headers: bool = False, + ): + if chunking_strategy not in ["classic_chunk"]: + raise ValueError(f"Unsupported chunking strategy: {chunking_strategy}") + self.chunking_strategy = chunking_strategy + self.max_tokens = max_tokens + self.min_tokens = min_tokens + self.duplicate_headers = duplicate_headers + self.encoding = get_encoding() + + def separate_header_and_body(self, text: str) -> Tuple[str, str]: + header_pattern = r"^(.*?\n){3}" + match = re.match(header_pattern, text) + if match: + header = match.group(0) + body = text[len(header):] + else: + header, body = "", text # No header, treat entire text as body + return header, body + + def combine_documents(self, doc: Document, next_doc: Document) -> Document: + combined_text = doc.text + " " + next_doc.text + combined_token_count = len(self.encoding.encode(combined_text)) + new_doc = Document( + text=combined_text, + doc_id=doc.doc_id, + embedding=doc.embedding, + extra_info={**(doc.extra_info or {}), "token_count": combined_token_count} + ) + return new_doc + + def split_document(self, doc: Document) -> List[Document]: + split_docs = [] + header, body = self.separate_header_and_body(doc.text) + header_tokens = self.encoding.encode(header) if header else [] + body_tokens = self.encoding.encode(body) + + current_position = 0 + part_index = 0 + while current_position < len(body_tokens): + end_position = current_position + self.max_tokens - len(header_tokens) + chunk_tokens = (header_tokens + body_tokens[current_position:end_position] + if self.duplicate_headers or part_index == 0 else body_tokens[current_position:end_position]) + chunk_text = self.encoding.decode(chunk_tokens) + new_doc = Document( + text=chunk_text, + doc_id=f"{doc.doc_id}-{part_index}", + embedding=doc.embedding, + extra_info={**(doc.extra_info or {}), "token_count": len(chunk_tokens)} + ) + split_docs.append(new_doc) + current_position = end_position + part_index += 1 + header_tokens = [] + return split_docs + + def classic_chunk(self, documents: List[Document]) -> List[Document]: + processed_docs = [] + i = 0 + while i < len(documents): + doc = documents[i] + tokens = self.encoding.encode(doc.text) + token_count = len(tokens) + + if self.min_tokens <= token_count <= self.max_tokens: + doc.extra_info = doc.extra_info or {} + doc.extra_info["token_count"] = token_count + processed_docs.append(doc) + i += 1 + elif token_count < self.min_tokens: + if i + 1 < len(documents): + next_doc = documents[i + 1] + next_tokens = self.encoding.encode(next_doc.text) + if token_count + len(next_tokens) <= self.max_tokens: + # Combine small documents + combined_doc = self.combine_documents(doc, next_doc) + processed_docs.append(combined_doc) + i += 2 + else: + # Keep the small document as is if adding next_doc would exceed max_tokens + doc.extra_info = doc.extra_info or {} + doc.extra_info["token_count"] = token_count + processed_docs.append(doc) + i += 1 + else: + # No next document to combine with; add the small document as is + doc.extra_info = doc.extra_info or {} + doc.extra_info["token_count"] = token_count + processed_docs.append(doc) + i += 1 + else: + # Split large documents + processed_docs.extend(self.split_document(doc)) + i += 1 + return processed_docs + + def chunk( + self, + documents: List[Document] + ) -> List[Document]: + if self.chunking_strategy == "classic_chunk": + return self.classic_chunk(documents) + else: + raise ValueError("Unsupported chunking strategy") diff --git a/application/parser/embedding_pipeline.py b/application/parser/embedding_pipeline.py new file mode 100755 index 00000000..6cf40048 --- /dev/null +++ b/application/parser/embedding_pipeline.py @@ -0,0 +1,86 @@ +import os +import logging +from retry import retry +from tqdm import tqdm +from application.core.settings import settings +from application.vectorstore.vector_creator import VectorCreator + + +@retry(tries=10, delay=60) +def add_text_to_store_with_retry(store, doc, source_id): + """ + Add a document's text and metadata to the vector store with retry logic. + Args: + store: The vector store object. + doc: The document to be added. + source_id: Unique identifier for the source. + """ + try: + doc.metadata["source_id"] = str(source_id) + store.add_texts([doc.page_content], metadatas=[doc.metadata]) + except Exception as e: + logging.error(f"Failed to add document with retry: {e}") + raise + + +def embed_and_store_documents(docs, folder_name, source_id, task_status): + """ + Embeds documents and stores them in a vector store. + + Args: + docs (list): List of documents to be embedded and stored. + folder_name (str): Directory to save the vector store. + source_id (str): Unique identifier for the source. + task_status: Task state manager for progress updates. + + Returns: + None + """ + # Ensure the folder exists + if not os.path.exists(folder_name): + os.makedirs(folder_name) + + # Initialize vector store + if settings.VECTOR_STORE == "faiss": + docs_init = [docs.pop(0)] + store = VectorCreator.create_vectorstore( + settings.VECTOR_STORE, + docs_init=docs_init, + source_id=folder_name, + embeddings_key=os.getenv("EMBEDDINGS_KEY"), + ) + else: + store = VectorCreator.create_vectorstore( + settings.VECTOR_STORE, + source_id=source_id, + embeddings_key=os.getenv("EMBEDDINGS_KEY"), + ) + store.delete_index() + + total_docs = len(docs) + + # Process and embed documents + for idx, doc in tqdm( + docs, + desc="Embedding 🦖", + unit="docs", + total=total_docs, + bar_format="{l_bar}{bar}| Time Left: {remaining}", + ): + try: + # Update task status for progress tracking + progress = int((idx / total_docs) * 100) + task_status.update_state(state="PROGRESS", meta={"current": progress}) + + # Add document to vector store + add_text_to_store_with_retry(store, doc, source_id) + except Exception as e: + logging.error(f"Error embedding document {idx}: {e}") + logging.info(f"Saving progress at document {idx} out of {total_docs}") + store.save_local(folder_name) + break + + # Save the vector store + if settings.VECTOR_STORE == "faiss": + store.save_local(folder_name) + logging.info("Vector store saved successfully.") diff --git a/application/parser/late_chunking.py b/application/parser/late_chunking.py new file mode 100755 index 00000000..ca3caa0c --- /dev/null +++ b/application/parser/late_chunking.py @@ -0,0 +1,94 @@ +from typing import List, Tuple, Union, Optional +from transformers import AutoTokenizer, AutoModel +from sentence_transformers import SentenceTransformer +import torch +import torch.nn as nn +from application.parser.schema.base import Document + + +class LateChunker: + def __init__(self, model_name: str, late_tokens: int = 1000, **model_kwargs): + """ + Initialize the LateChunker with a model, tokenizer, and late_tokens limit. + Supports both transformers and sentence-transformers models. + """ + self.late_tokens = late_tokens + self.model_name = model_name + + # Load model based on type + if "sentence-transformers" in model_name: + self.model = SentenceTransformer(model_name, **model_kwargs) + self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) + self.wrapper_type = "sentence_transformers" + else: + self.model = AutoModel.from_pretrained(model_name, trust_remote_code=True, **model_kwargs) + self.tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) + self.wrapper_type = "transformers" + + def tokenize_with_offsets(self, text: str): + """Tokenize text and return tokens with character offsets.""" + tokens = self.tokenizer.encode_plus( + text, return_offsets_mapping=True, add_special_tokens=False + ) + return tokens["input_ids"], tokens["offset_mapping"] + + def late_chunk_with_embeddings( + self, documents: List[Document] + ) -> List[Tuple[str, List[Tuple[int, int]], List[float]]]: + """ + Combines documents into 'super chunks' that fit within `late_tokens` limit. + Outputs each super chunk with span annotations and embeddings. + """ + super_chunks = [] + current_super_chunk_text = [] + current_token_count = 0 + span_annotations = [] + + for doc in documents: + doc_text = doc.text + input_ids, offsets = self.tokenize_with_offsets(doc_text) + doc_token_count = len(input_ids) + + # Check if adding this document exceeds the late_tokens limit + if current_token_count + doc_token_count > self.late_tokens: + # Finalize the current super chunk + combined_text = " ".join(current_super_chunk_text) + embeddings = self.generate_embeddings(combined_text) + + super_chunks.append((combined_text, span_annotations, embeddings)) + + # Reset for a new super chunk + current_super_chunk_text = [] + span_annotations = [] + current_token_count = 0 + + # Add document to the current super chunk + start_token = current_token_count + end_token = current_token_count + doc_token_count + span_annotations.append((start_token, end_token)) + current_super_chunk_text.append(doc_text) + current_token_count = end_token + + # Add the final super chunk if there are remaining documents + if current_super_chunk_text: + combined_text = " ".join(current_super_chunk_text) + embeddings = self.generate_embeddings(combined_text) + super_chunks.append((combined_text, span_annotations, embeddings)) + + return super_chunks + + def generate_embeddings(self, text: str) -> List[float]: + """Generate embeddings for a given text using the loaded model.""" + if self.wrapper_type == "sentence_transformers": + # Sentence-Transformers + embeddings = self.model.encode([text]) + return embeddings[0].tolist() + + elif self.wrapper_type == "transformers": + # Transformers models + inputs = self.tokenizer(text, return_tensors="pt") + model_output = self.model(**inputs) + return model_output.last_hidden_state.mean(dim=1).squeeze().tolist() + + else: + raise ValueError("Unsupported model type for embedding generation.")