Merge remote-tracking branch 'upstream/main' into feat/agent-menu

Siddhant Rai
2025-04-28 17:01:46 +05:30
19 changed files with 703 additions and 342 deletions


@@ -1,9 +1,14 @@
+import datetime
+import io
 import json
 import logging
+import mimetypes
 import os
 import shutil
 import string
+import tempfile
 import zipfile
 from collections import Counter
 from urllib.parse import urljoin
@@ -22,10 +27,12 @@ from application.parser.file.bulk import SimpleDirectoryReader
 from application.parser.remote.remote_creator import RemoteCreator
 from application.parser.schema.base import Document
 from application.retriever.retriever_creator import RetrieverCreator
-from application.utils import count_tokens_docs
+from application.storage.storage_creator import StorageCreator
+from application.utils import count_tokens_docs, num_tokens_from_string

 mongo = MongoDB.get_client()
-db = mongo["docsgpt"]
+db = mongo[settings.MONGO_DB_NAME]
 sources_collection = db["sources"]

 # Constants
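
Note: the hunks below call storage.get_file() and storage.save_file() on objects produced by StorageCreator. A minimal sketch of the interface those call sites imply — the class names and method bodies here are assumptions for illustration, not the actual application/storage implementation:

# Sketch of the storage interface implied by the call sites in this diff
# (StorageCreator.get_storage()/create_storage(), storage.get_file(),
# storage.save_file()). Class names and bodies are illustrative assumptions.
import io
import os
from abc import ABC, abstractmethod

class BaseStorage(ABC):
    @abstractmethod
    def get_file(self, path: str) -> io.BytesIO:
        """Return the stored file as a readable binary stream."""

    @abstractmethod
    def save_file(self, file_obj, path: str) -> dict:
        """Persist a binary stream and return storage metadata."""

class LocalStorage(BaseStorage):
    def get_file(self, path: str) -> io.BytesIO:
        with open(path, "rb") as f:
            return io.BytesIO(f.read())

    def save_file(self, file_obj, path: str) -> dict:
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "wb") as f:
            f.write(file_obj.read())
        return {"storage_type": "local", "path": path}

Routing both workers through an abstraction like this is what lets the database name and storage backend come from settings instead of being hardcoded.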
@@ -210,68 +217,87 @@ def ingest_worker(
     limit = None
     exclude = True
     sample = False
+    storage = StorageCreator.get_storage()
     full_path = os.path.join(directory, user, name_job)
+    source_file_path = os.path.join(full_path, filename)
     logging.info(f"Ingest file: {full_path}", extra={"user": user, "job": name_job})
-    file_data = {"name": name_job, "file": filename, "user": user}
-    if not os.path.exists(full_path):
-        os.makedirs(full_path)
-    download_file(
-        urljoin(settings.API_URL, "/api/download"),
-        file_data,
-        os.path.join(full_path, filename),
-    )
+    # Create temporary working directory
+    with tempfile.TemporaryDirectory() as temp_dir:
+        try:
+            os.makedirs(temp_dir, exist_ok=True)
-    # check if file is .zip and extract it
-    if filename.endswith(".zip"):
-        extract_zip_recursive(
-            os.path.join(full_path, filename), full_path, 0, RECURSION_DEPTH
-        )
+            # Download file from storage to temp directory
+            temp_file_path = os.path.join(temp_dir, filename)
+            file_data = storage.get_file(source_file_path)
+            self.update_state(state="PROGRESS", meta={"current": 1})
+            with open(temp_file_path, "wb") as f:
+                f.write(file_data.read())
-    raw_docs = SimpleDirectoryReader(
-        input_dir=full_path,
-        input_files=input_files,
-        recursive=recursive,
-        required_exts=formats,
-        num_files_limit=limit,
-        exclude_hidden=exclude,
-        file_metadata=metadata_from_filename,
-    ).load_data()
-    self.update_state(state="PROGRESS", meta={"current": 1})
-    chunker = Chunker(
-        chunking_strategy="classic_chunk",
-        max_tokens=MAX_TOKENS,
-        min_tokens=MIN_TOKENS,
-        duplicate_headers=False,
-    )
-    raw_docs = chunker.chunk(documents=raw_docs)
+            # Handle zip files
+            if filename.endswith(".zip"):
+                logging.info(f"Extracting zip file: {filename}")
+                extract_zip_recursive(
+                    temp_file_path, temp_dir, current_depth=0, max_depth=RECURSION_DEPTH
+                )
-    docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
-    id = ObjectId()
-    if sample:
-        logging.info(f"Sample mode enabled. Using {limit} documents.")
-    embed_and_store_documents(docs, full_path, id, self)
-    tokens = count_tokens_docs(docs)
-    self.update_state(state="PROGRESS", meta={"current": 100})
+            reader = SimpleDirectoryReader(
+                input_dir=temp_dir,
+                input_files=input_files,
+                recursive=recursive,
+                required_exts=formats,
+                exclude_hidden=exclude,
+                file_metadata=metadata_from_filename,
+            )
+            raw_docs = reader.load_data()
+            if sample:
+                for i in range(min(5, len(raw_docs))):
+                    logging.info(f"Sample document {i}: {raw_docs[i]}")
+            chunker = Chunker(
+                chunking_strategy="classic_chunk",
+                max_tokens=MAX_TOKENS,
+                min_tokens=MIN_TOKENS,
+                duplicate_headers=False,
+            )
+            raw_docs = chunker.chunk(documents=raw_docs)
-    file_data.update(
-        {
-            "tokens": tokens,
-            "retriever": retriever,
-            "id": str(id),
-            "type": "local",
-        }
-    )
-    upload_index(full_path, file_data)
+            docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
-    # delete local
-    shutil.rmtree(full_path)
+            id = ObjectId()
+            vector_store_path = os.path.join(temp_dir, "vector_store")
+            os.makedirs(vector_store_path, exist_ok=True)
+            embed_and_store_documents(docs, vector_store_path, id, self)
+            tokens = count_tokens_docs(docs)
+            self.update_state(state="PROGRESS", meta={"current": 100})
+            if sample:
+                for i in range(min(5, len(raw_docs))):
+                    logging.info(f"Sample document {i}: {raw_docs[i]}")
+            file_data = {
+                "name": name_job,
+                "file": filename,
+                "user": user,
+                "tokens": tokens,
+                "retriever": retriever,
+                "id": str(id),
+                "type": "local",
+            }
+            upload_index(vector_store_path, file_data)
+        except Exception as e:
+            logging.error(f"Error in ingest_worker: {e}", exc_info=True)
+            raise
     return {
         "directory": directory,
@@ -407,68 +433,69 @@ def sync_worker(self, frequency):
 }
-def attachment_worker(self, directory, file_info, user):
+def attachment_worker(self, file_info, user):
     """
     Process and store a single attachment without vectorization.
     Args:
         self: Reference to the instance of the task.
-        directory (str): Base directory for storing files.
         file_info (dict): Dictionary with folder and filename info.
         user (str): User identifier.
     Returns:
         dict: Information about processed attachment.
     """
-    import datetime
-    import mimetypes
-    import os
-    from application.utils import num_tokens_from_string
     mongo = MongoDB.get_client()
-    db = mongo["docsgpt"]
+    db = mongo[settings.MONGO_DB_NAME]
     attachments_collection = db["attachments"]
     filename = file_info["filename"]
     attachment_id = file_info["attachment_id"]
     logging.info(
         f"Processing attachment: {attachment_id}/{filename}", extra={"user": user}
     )
-    self.update_state(state="PROGRESS", meta={"current": 10})
-    file_path = os.path.join(directory, filename)
-    if not os.path.exists(file_path):
-        logging.warning(f"File not found: {file_path}", extra={"user": user})
-        raise FileNotFoundError(f"File not found: {file_path}")
+    relative_path = file_info["path"]
+    file_content = file_info["file_content"]
     try:
-        reader = SimpleDirectoryReader(input_files=[file_path])
-        documents = reader.load_data()
+        self.update_state(state="PROGRESS", meta={"current": 10})
+        storage_type = getattr(settings, "STORAGE_TYPE", "local")
+        storage = StorageCreator.create_storage(storage_type)
+        self.update_state(
+            state="PROGRESS", meta={"current": 30, "status": "Processing content"}
+        )
-        self.update_state(state="PROGRESS", meta={"current": 50})
+        with tempfile.NamedTemporaryFile(
+            suffix=os.path.splitext(filename)[1]
+        ) as temp_file:
+            temp_file.write(file_content)
+            temp_file.flush()
+            reader = SimpleDirectoryReader(
+                input_files=[temp_file.name], exclude_hidden=True, errors="ignore"
+            )
+            documents = reader.load_data()
+            if not documents:
+                logging.warning(f"No content extracted from file: {filename}")
+                raise ValueError(f"Failed to extract content from file: {filename}")
-        if documents:
             content = documents[0].text
             token_count = num_tokens_from_string(content)
-            file_path_relative = f"{settings.UPLOAD_FOLDER}/{user}/attachments/{attachment_id}/{filename}"
+            self.update_state(
+                state="PROGRESS", meta={"current": 60, "status": "Saving file"}
+            )
+            file_obj = io.BytesIO(file_content)
-            mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
+            metadata = storage.save_file(file_obj, relative_path)
+            mime_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
+            self.update_state(
+                state="PROGRESS", meta={"current": 80, "status": "Storing in database"}
+            )
             doc_id = ObjectId(attachment_id)
             attachments_collection.insert_one(
                 {
                     "_id": doc_id,
                     "user": user,
-                    "path": file_path_relative,
+                    "path": relative_path,
                     "content": content,
                     "token_count": token_count,
                     "mime_type": mime_type,
                     "date": datetime.datetime.now(),
+                    "metadata": metadata,
                 }
             )
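
Note: attachment_worker now receives the raw bytes in file_info["file_content"] instead of reading from a shared directory parameter, so the task can run on any worker host. A hypothetical caller-side payload, with field names taken from the file_info accesses above and everything else illustrative:

from bson.objectid import ObjectId  # bson ships with pymongo

attachment_id = str(ObjectId())
with open("report.pdf", "rb") as f:  # "report.pdf" is an illustrative file
    file_info = {
        "filename": "report.pdf",
        "attachment_id": attachment_id,
        # assumed to mirror the old file_path_relative layout; the real
        # prefix comes from settings.UPLOAD_FOLDER
        "path": f"uploads/local/attachments/{attachment_id}/report.pdf",
        "file_content": f.read(),
    }
# attachment_worker(self, file_info, user) then parses, stores, and records
# the bytes itself, with no dependency on the caller's filesystem.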
@@ -476,20 +503,19 @@ def attachment_worker(self, directory, file_info, user):
f"Stored attachment with ID: {attachment_id}", extra={"user": user}
)
self.update_state(state="PROGRESS", meta={"current": 100})
self.update_state(
state="PROGRESS", meta={"current": 100, "status": "Complete"}
)
return {
"filename": filename,
"path": file_path_relative,
"path": relative_path,
"token_count": token_count,
"attachment_id": attachment_id,
"mime_type": mime_type,
"metadata": metadata,
}
else:
logging.warning(
"No content was extracted from the file", extra={"user": user}
)
raise ValueError("No content was extracted from the file")
except Exception as e:
logging.error(
f"Error processing file {filename}: {e}",