(feat:attachmentUpload) parse content before upload

ManishMadan2882
2025-04-17 16:23:01 +05:30
parent 68e4cf4d14
commit 0d3e6157cd


@@ -7,11 +7,12 @@ import io
import datetime
import mimetypes
import requests
+import tempfile
from collections import Counter
from urllib.parse import urljoin
from application.storage.storage_creator import StorageCreator
from application.utils import num_tokens_from_string
from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
@@ -209,7 +210,7 @@ def remote_worker(
sync_frequency="never",
operation_mode="upload",
doc_id=None,
):
full_path = os.path.join(directory, user, name_job)
if not os.path.exists(full_path):
os.makedirs(full_path)
@@ -324,58 +325,48 @@ def attachment_worker(self, file_info, user):
"""
Process and store a single attachment without vectorization.
"""
mongo = MongoDB.get_client()
db = mongo["docsgpt"]
attachments_collection = db["attachments"]
filename = file_info["filename"]
attachment_id = file_info["attachment_id"]
relative_path = file_info["path"]
file_content = file_info["file_content"]
try:
self.update_state(state="PROGRESS", meta={"current": 10})
storage_type = getattr(settings, "STORAGE_TYPE", "local")
storage = StorageCreator.create_storage(storage_type)
self.update_state(state="PROGRESS", meta={"current": 30, "status": "Saving file"})
file_obj = io.BytesIO(file_content)
storage.save_file(file_obj, relative_path)
def process_document(file_path, **kwargs):
self.update_state(state="PROGRESS", meta={"current": 50, "status": "Processing content"})
self.update_state(state="PROGRESS", meta={"current": 30, "status": "Processing content"})
with tempfile.NamedTemporaryFile(suffix=os.path.splitext(filename)[1]) as temp_file:
temp_file.write(file_content)
temp_file.flush()
reader = SimpleDirectoryReader(
-input_files=[file_path],
+input_files=[temp_file.name],
exclude_hidden=True,
errors="ignore"
)
documents = reader.load_data()
if not documents:
logging.warning(f"No content extracted from file: {filename}")
raise ValueError(f"Failed to extract content from file: {filename}")
content = documents[0].text
token_count = num_tokens_from_string(content)
self.update_state(state="PROGRESS", meta={"current": 60, "status": "Saving file"})
file_obj = io.BytesIO(file_content)
metadata = storage.save_file(file_obj, relative_path)
mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
-metadata = {
-"storage_type": storage_type,
-}
-if storage_type == "s3":
-metadata.update({
-"bucket_name": getattr(storage, "bucket_name", "docsgpt-test-bucket"),
-"uri": f"s3://{storage.bucket_name}/{relative_path}",
-"region": getattr(settings, "SAGEMAKER_REGION", "us-east-1")
-})
self.update_state(state="PROGRESS", meta={"current": 80, "status": "Storing in database"})
doc_id = ObjectId(attachment_id)
attachments_collection.insert_one({
"_id": doc_id,
@@ -387,12 +378,12 @@ def attachment_worker(self, file_info, user):
"date": datetime.datetime.now(),
"metadata": metadata
})
logging.info(f"Stored attachment with ID: {attachment_id}",
logging.info(f"Stored attachment with ID: {attachment_id}",
extra={"user": user})
self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"})
return {
"filename": filename,
"path": relative_path,
@@ -401,9 +392,7 @@ def attachment_worker(self, file_info, user):
"mime_type": mime_type,
"metadata": metadata
}
-return storage.process_file(relative_path, process_document)
except Exception as e:
logging.error(f"Error processing file {filename}: {e}", extra={"user": user}, exc_info=True)
raise
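
Taken together, the new attachment_worker parses the attachment first and only then uploads it. A minimal sketch of that flow, with the Celery progress updates and the MongoDB insert left out (the helper name parse_then_upload and its return shape are illustrative, not part of the commit):

import io
import os
import tempfile

from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.storage.storage_creator import StorageCreator
from application.utils import num_tokens_from_string


def parse_then_upload(filename, file_content, relative_path):
    # Materialize the raw bytes in a temp file that keeps the original
    # extension, so SimpleDirectoryReader can pick the right parser.
    with tempfile.NamedTemporaryFile(suffix=os.path.splitext(filename)[1]) as temp_file:
        temp_file.write(file_content)
        temp_file.flush()
        documents = SimpleDirectoryReader(
            input_files=[temp_file.name],
            exclude_hidden=True,
            errors="ignore",
        ).load_data()

    if not documents:
        raise ValueError(f"Failed to extract content from file: {filename}")

    content = documents[0].text
    token_count = num_tokens_from_string(content)

    # Upload only after parsing succeeded; save_file returns the storage
    # metadata that the worker persists with the attachment record.
    storage = StorageCreator.create_storage(getattr(settings, "STORAGE_TYPE", "local"))
    metadata = storage.save_file(io.BytesIO(file_content), relative_path)
    return content, token_count, metadata

Because parsing now happens before storage.save_file, an attachment that cannot be parsed raises before anything is written to storage, and the token count is already known when the attachment record is created.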