mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-11-29 00:23:17 +00:00
254 lines
9.8 KiB
Python
254 lines
9.8 KiB
Python
"""Simple reader that reads files of different formats from a directory."""
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Callable, Dict, List, Optional, Union
|
|
|
|
from application.parser.file.base import BaseReader
|
|
from application.parser.file.base_parser import BaseParser
|
|
from application.parser.file.docs_parser import DocxParser, PDFParser
|
|
from application.parser.file.epub_parser import EpubParser
|
|
from application.parser.file.html_parser import HTMLParser
|
|
from application.parser.file.markdown_parser import MarkdownParser
|
|
from application.parser.file.rst_parser import RstParser
|
|
from application.parser.file.tabular_parser import PandasCSVParser,ExcelParser
|
|
from application.parser.file.json_parser import JSONParser
|
|
from application.parser.file.pptx_parser import PPTXParser
|
|
from application.parser.file.image_parser import ImageParser
|
|
from application.parser.schema.base import Document
|
|
from application.utils import num_tokens_from_string
|
|
|
|
# Default mapping from lowercase file extension to the parser instance that
# converts files of that type to text. Used when no custom `file_extractor`
# is passed to SimpleDirectoryReader.
DEFAULT_FILE_EXTRACTOR: Dict[str, BaseParser] = {
    ".pdf": PDFParser(),
    ".docx": DocxParser(),
    ".csv": PandasCSVParser(),
    ".xlsx": ExcelParser(),
    ".epub": EpubParser(),
    ".md": MarkdownParser(),
    ".rst": RstParser(),
    ".html": HTMLParser(),
    ".mdx": MarkdownParser(),
    ".json": JSONParser(),
    ".pptx": PPTXParser(),
    ".png": ImageParser(),
    ".jpg": ImageParser(),
    ".jpeg": ImageParser(),
}
|
|
|
|
|
|
class SimpleDirectoryReader(BaseReader):
    """Simple directory reader.

    Can read files into separate documents, or concatenate
    files into one document text.

    Args:
        input_dir (str): Path to the directory.
        input_files (List): List of file paths to read (Optional; overrides input_dir)
        exclude_hidden (bool): Whether to exclude hidden files (dotfiles).
        errors (str): how encoding and decoding errors are to be handled,
            see https://docs.python.org/3/library/functions.html#open
        recursive (bool): Whether to recursively search in subdirectories.
            True by default.
        required_exts (Optional[List[str]]): List of required extensions
            (e.g. ``[".md"]``). Default is None (all extensions accepted).
        file_extractor (Optional[Dict[str, BaseParser]]): A mapping of file
            extension to a BaseParser instance that specifies how to convert
            that file to text. Defaults to DEFAULT_FILE_EXTRACTOR.
        num_files_limit (Optional[int]): Maximum number of files to read.
            Default is None (no limit).
        file_metadata (Optional[Callable[[str], Dict]]): A function that takes
            in a filename and returns a Dict of metadata for the Document.
            Default is None.
    """

    def __init__(
        self,
        input_dir: Optional[str] = None,
        input_files: Optional[List] = None,
        exclude_hidden: bool = True,
        errors: str = "ignore",
        recursive: bool = True,
        required_exts: Optional[List[str]] = None,
        file_extractor: Optional[Dict[str, BaseParser]] = None,
        num_files_limit: Optional[int] = None,
        file_metadata: Optional[Callable[[str], Dict]] = None,
    ) -> None:
        """Initialize with parameters."""
        super().__init__()

        if not input_dir and not input_files:
            raise ValueError("Must provide either `input_dir` or `input_files`.")

        self.errors = errors
        self.recursive = recursive
        self.exclude_hidden = exclude_hidden
        self.required_exts = required_exts
        self.num_files_limit = num_files_limit

        if input_files:
            # An explicit file list takes precedence over input_dir; note that
            # self.input_dir is NOT set in this branch (load_data checks for it
            # with hasattr).
            self.input_files = [Path(path) for path in input_files]
        elif input_dir:
            self.input_dir = Path(input_dir)
            self.input_files = self._add_files(self.input_dir)

        self.file_extractor = file_extractor or DEFAULT_FILE_EXTRACTOR
        self.file_metadata = file_metadata

    def _add_files(self, input_dir: Path) -> List[Path]:
        """Collect file paths under ``input_dir``, honoring the reader's filters.

        Files are sorted within each directory; when ``self.recursive`` is
        True, files from subdirectories are appended after the parent
        directory's own files. ``num_files_limit`` (when positive) is applied
        per directory level.
        """
        input_files = sorted(input_dir.iterdir())
        new_input_files = []
        dirs_to_explore = []
        for input_file in input_files:
            if input_file.is_dir():
                if self.recursive:
                    dirs_to_explore.append(input_file)
                # NOTE(review): hidden *directories* are still explored here,
                # while build_directory_structure skips them — confirm this
                # asymmetry is intended before changing it.
            elif self.exclude_hidden and input_file.name.startswith("."):
                continue
            elif (
                self.required_exts is not None
                and input_file.suffix not in self.required_exts
            ):
                continue
            else:
                new_input_files.append(input_file)

        for dir_to_explore in dirs_to_explore:
            new_input_files.extend(self._add_files(dir_to_explore))

        if self.num_files_limit is not None and self.num_files_limit > 0:
            new_input_files = new_input_files[: self.num_files_limit]

        logging.debug(
            f"> [SimpleDirectoryReader] Total files added: {len(new_input_files)}"
        )

        return new_input_files

    def load_data(self, concatenate: bool = False) -> List[Document]:
        """Load data from the input directory.

        Also populates ``self.file_token_counts`` (absolute path -> token
        count) and ``self.directory_structure`` as side effects.

        Args:
            concatenate (bool): whether to concatenate all files into one document.
                If set to True, file metadata is ignored.
                False by default.

        Returns:
            List[Document]: A list of documents.
        """
        data: Union[str, List[str]] = ""
        data_list: List[str] = []
        metadata_list = []
        # Reused by build_directory_structure to annotate the tree with
        # token counts.
        self.file_token_counts = {}

        for input_file in self.input_files:
            if input_file.suffix in self.file_extractor:
                parser = self.file_extractor[input_file.suffix]
                if not parser.parser_config_set:
                    parser.init_parser()
                data = parser.parse_file(input_file, errors=self.errors)
            else:
                # No registered parser for this extension: fall back to a
                # plain text read.
                with open(input_file, "r", errors=self.errors) as f:
                    data = f.read()

            # Token count for the whole file (summed over chunks when the
            # parser returned a list of segments).
            # Fixed: isinstance against the typing.List alias is deprecated;
            # check against the builtin `list`.
            if isinstance(data, list):
                file_tokens = sum(num_tokens_from_string(str(d)) for d in data)
            else:
                file_tokens = num_tokens_from_string(str(data))

            full_path = str(input_file.resolve())
            self.file_token_counts[full_path] = file_tokens

            base_metadata = {
                'title': input_file.name,
                'token_count': file_tokens,
            }

            # Prefer a path relative to input_dir as the source; fall back to
            # the raw path when the file lies outside input_dir, or when an
            # explicit file list was given and input_dir was never set.
            if hasattr(self, 'input_dir'):
                try:
                    base_metadata['source'] = str(input_file.relative_to(self.input_dir))
                except ValueError:
                    base_metadata['source'] = str(input_file)
            else:
                base_metadata['source'] = str(input_file)

            if self.file_metadata is not None:
                base_metadata.update(self.file_metadata(input_file.name))

            if isinstance(data, list):
                data_list.extend(str(d) for d in data)
                # Fixed: give each chunk its own copy of the metadata so that
                # mutating one document's metadata cannot leak into siblings
                # (previously all chunks shared one dict object).
                metadata_list.extend(dict(base_metadata) for _ in data)
            else:
                data_list.append(str(data))
                metadata_list.append(base_metadata)

        # Build directory structure if input_dir is provided
        if hasattr(self, 'input_dir'):
            self.directory_structure = self.build_directory_structure(self.input_dir)
            logging.info("Directory structure built successfully")
        else:
            self.directory_structure = {}

        if concatenate:
            return [Document("\n".join(data_list))]
        elif self.file_metadata is not None:
            # NOTE(review): the base metadata (title/token_count/source) is
            # only attached when a custom file_metadata callable was supplied;
            # otherwise it is computed and discarded — confirm intended.
            return [Document(d, extra_info=m) for d, m in zip(data_list, metadata_list)]
        else:
            return [Document(d) for d in data_list]

    def build_directory_structure(self, base_path):
        """Build a dictionary representing the directory structure.

        Args:
            base_path: The base path to start building the structure from.

        Returns:
            dict: A nested dictionary: directory names map to sub-dicts, file
            names map to ``{"type", "size_bytes"}`` plus ``"token_count"``
            when load_data has already counted that file's tokens.
        """
        import mimetypes

        def build_tree(path):
            """Helper function to recursively build the directory tree."""
            result = {}

            for item in path.iterdir():
                if self.exclude_hidden and item.name.startswith('.'):
                    continue

                if item.is_dir():
                    subtree = build_tree(item)
                    # Empty directories are omitted from the tree.
                    if subtree:
                        result[item.name] = subtree
                else:
                    if self.required_exts is not None and item.suffix not in self.required_exts:
                        continue

                    full_path = str(item.resolve())
                    file_size_bytes = item.stat().st_size
                    mime_type = mimetypes.guess_type(item.name)[0] or "application/octet-stream"

                    file_info = {
                        "type": mime_type,
                        "size_bytes": file_size_bytes,
                    }

                    # Token counts exist only for files that went through
                    # load_data.
                    if hasattr(self, 'file_token_counts') and full_path in self.file_token_counts:
                        file_info["token_count"] = self.file_token_counts[full_path]

                    result[item.name] = file_info

            return result

        return build_tree(Path(base_path))