diff --git a/application/worker.py b/application/worker.py index d561a53f..b5caa23e 100755 --- a/application/worker.py +++ b/application/worker.py @@ -7,11 +7,12 @@ import io import datetime import mimetypes import requests +import tempfile from collections import Counter from urllib.parse import urljoin -from application.storage.storage_creator import StorageCreator +from application.storage.storage_creator import StorageCreator from application.utils import num_tokens_from_string from application.core.settings import settings from application.parser.file.bulk import SimpleDirectoryReader @@ -209,7 +210,7 @@ def remote_worker( sync_frequency="never", operation_mode="upload", doc_id=None, -): +): full_path = os.path.join(directory, user, name_job) if not os.path.exists(full_path): os.makedirs(full_path) @@ -324,58 +325,48 @@ def attachment_worker(self, file_info, user): """ Process and store a single attachment without vectorization. """ - + mongo = MongoDB.get_client() db = mongo["docsgpt"] attachments_collection = db["attachments"] - + filename = file_info["filename"] attachment_id = file_info["attachment_id"] relative_path = file_info["path"] file_content = file_info["file_content"] - + try: self.update_state(state="PROGRESS", meta={"current": 10}) - storage_type = getattr(settings, "STORAGE_TYPE", "local") storage = StorageCreator.create_storage(storage_type) - - self.update_state(state="PROGRESS", meta={"current": 30, "status": "Saving file"}) - file_obj = io.BytesIO(file_content) - storage.save_file(file_obj, relative_path) - - def process_document(file_path, **kwargs): - self.update_state(state="PROGRESS", meta={"current": 50, "status": "Processing content"}) - + self.update_state(state="PROGRESS", meta={"current": 30, "status": "Processing content"}) + + with tempfile.NamedTemporaryFile(suffix=os.path.splitext(filename)[1]) as temp_file: + temp_file.write(file_content) + temp_file.flush() reader = SimpleDirectoryReader( - input_files=[file_path], + input_files=[temp_file.name], exclude_hidden=True, errors="ignore" ) documents = reader.load_data() - + if not documents: logging.warning(f"No content extracted from file: {filename}") raise ValueError(f"Failed to extract content from file: {filename}") - + content = documents[0].text token_count = num_tokens_from_string(content) - + + self.update_state(state="PROGRESS", meta={"current": 60, "status": "Saving file"}) + file_obj = io.BytesIO(file_content) + + metadata = storage.save_file(file_obj, relative_path) + mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream' - - metadata = { - "storage_type": storage_type, - } - - if storage_type == "s3": - metadata.update({ - "bucket_name": getattr(storage, "bucket_name", "docsgpt-test-bucket"), - "uri": f"s3://{storage.bucket_name}/{relative_path}", - "region": getattr(settings, "SAGEMAKER_REGION", "us-east-1") - }) - + self.update_state(state="PROGRESS", meta={"current": 80, "status": "Storing in database"}) - + doc_id = ObjectId(attachment_id) attachments_collection.insert_one({ "_id": doc_id, @@ -387,12 +378,12 @@ def attachment_worker(self, file_info, user): "date": datetime.datetime.now(), "metadata": metadata }) - - logging.info(f"Stored attachment with ID: {attachment_id}", + + logging.info(f"Stored attachment with ID: {attachment_id}", extra={"user": user}) - + self.update_state(state="PROGRESS", meta={"current": 100, "status": "Complete"}) - + return { "filename": filename, "path": relative_path, @@ -401,9 +392,7 @@ def attachment_worker(self, file_info, user): "mime_type": mime_type, "metadata": metadata } - - return storage.process_file(relative_path, process_document) - + except Exception as e: logging.error(f"Error processing file {filename}: {e}", extra={"user": user}, exc_info=True) raise