import re
from math import ceil
from typing import List

import tiktoken

from application.parser.schema.base import Document


def separate_header_and_body(text):
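    """Split ``text`` into a three-line header and the remaining body.

    Returns a ``(header, body)`` tuple; if the text has fewer than three
    newline-terminated lines, the header is empty and the whole text is
    returned as the body.
    """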
    header_pattern = r"^(.*?\n){3}"
    match = re.match(header_pattern, text)
    if match is None:
        # No three-line header present; treat the whole text as the body.
        return "", text
    header = match.group(0)
    body = text[len(header):]
    return header, body


def group_documents(documents: List[Document], min_tokens: int, max_tokens: int) -> List[Document]:
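    """Merge consecutive small documents into larger groups.

    A document shorter than ``min_tokens`` is appended to the current group
    as long as the combined group stays below ``max_tokens``; otherwise the
    current group is flushed and a new one is started.
    """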
    docs = []
    current_group = None
    encoding = tiktoken.get_encoding("cl100k_base")

    for doc in documents:
        doc_len = len(encoding.encode(doc.text))

        if current_group is None:
            current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
                                     extra_info=doc.extra_info)
        elif len(encoding.encode(current_group.text)) + doc_len < max_tokens and doc_len < min_tokens:
            current_group.text += " " + doc.text
        else:
            docs.append(current_group)
            current_group = Document(text=doc.text, doc_id=doc.doc_id, embedding=doc.embedding,
                                     extra_info=doc.extra_info)

    if current_group is not None:
        docs.append(current_group)

    return docs


def split_documents(documents: List[Document], max_tokens: int) -> List[Document]:
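    """Split documents longer than ``max_tokens`` into smaller parts.

    Oversized documents are cut into roughly equal character chunks, and the
    three-line header is prepended to each chunk unless the header itself
    exceeds ``max_tokens``. Each part receives a doc_id of ``"<doc_id>-<i>"``.
    """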
    docs = []
    encoding = tiktoken.get_encoding("cl100k_base")
    for doc in documents:
        token_length = len(encoding.encode(doc.text))
        if token_length <= max_tokens:
            docs.append(doc)
        else:
            header, body = separate_header_and_body(doc.text)
            if len(encoding.encode(header)) > max_tokens:
                body = doc.text
                header = ""
            num_body_parts = ceil(token_length / max_tokens)
            part_length = ceil(len(body) / num_body_parts)
            body_parts = [body[i:i + part_length] for i in range(0, len(body), part_length)]
            for i, body_part in enumerate(body_parts):
                new_doc = Document(text=header + body_part.strip(),
                                   doc_id=f"{doc.doc_id}-{i}",
                                   embedding=doc.embedding,
                                   extra_info=doc.extra_info)
                docs.append(new_doc)
    return docs


def group_split(documents: List[Document], max_tokens: int = 2000, min_tokens: int = 150, token_check: bool = True):
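    """Group small documents and split large ones so chunks land between
    ``min_tokens`` and ``max_tokens``. Set ``token_check=False`` to skip both
    passes and return the documents unchanged; failures in either pass are
    reported and the documents from the previous step are passed through.
    """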
    if not token_check:
        return documents
    print("Grouping small documents")
    try:
        documents = group_documents(documents=documents, min_tokens=min_tokens, max_tokens=max_tokens)
    except Exception:
        print("Grouping failed, try running without token_check")
    print("Separating large documents")
    try:
        documents = split_documents(documents=documents, max_tokens=max_tokens)
    except Exception:
        print("Splitting failed, try running without token_check")
    return documents