From 9454150f7d125d179bcfb97629f0cb16ebdf6932 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Thu, 17 Apr 2025 02:36:55 +0530 Subject: [PATCH] (fix:s3) processor func --- application/storage/base.py | 2 +- application/storage/s3.py | 42 ++++++++++++++++++------------------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/application/storage/base.py b/application/storage/base.py index cb205091..88fed0c6 100644 --- a/application/storage/base.py +++ b/application/storage/base.py @@ -1,6 +1,6 @@ """Base storage class for file system abstraction.""" from abc import ABC, abstractmethod -from typing import BinaryIO, List, Optional, Callable +from typing import BinaryIO, List, Callable class BaseStorage(ABC): diff --git a/application/storage/s3.py b/application/storage/s3.py index cdec6887..e02a2a5a 100644 --- a/application/storage/s3.py +++ b/application/storage/s3.py @@ -1,28 +1,31 @@ """S3 storage implementation.""" import io from typing import BinaryIO, List, Callable +import os import boto3 from botocore.exceptions import ClientError from application.storage.base import BaseStorage +from application.core.settings import settings class S3Storage(BaseStorage): """AWS S3 storage implementation.""" - def __init__(self, bucket_name: str, aws_access_key_id=None, - aws_secret_access_key=None, region_name=None): + def __init__(self, bucket_name=None): """ Initialize S3 storage. Args: - bucket_name: S3 bucket name - aws_access_key_id: AWS access key ID (optional if using IAM roles) - aws_secret_access_key: AWS secret access key (optional if using IAM roles) - region_name: AWS region name (optional) + bucket_name: S3 bucket name (optional, defaults to settings) """ - self.bucket_name = bucket_name + self.bucket_name = bucket_name or getattr(settings, "S3_BUCKET_NAME", "docsgpt-test-bucket") + + # Get credentials from settings + aws_access_key_id = getattr(settings, "SAGEMAKER_ACCESS_KEY", None) + aws_secret_access_key = getattr(settings, "SAGEMAKER_SECRET_KEY", None) + region_name = getattr(settings, "SAGEMAKER_REGION", None) self.s3 = boto3.client( 's3', @@ -83,8 +86,6 @@ class S3Storage(BaseStorage): """ Process a file using the provided processor function. - For S3 storage, we need to download the file to a temporary location first. - Args: path: Path to the file processor_func: Function that processes the file @@ -94,21 +95,18 @@ class S3Storage(BaseStorage): The result of the processor function """ import tempfile - import os + import logging if not self.file_exists(path): - raise FileNotFoundError(f"File not found: {path}") + raise FileNotFoundError(f"File not found in S3: {path}") - with tempfile.NamedTemporaryFile(delete=False) as temp_file: - self.s3.download_fileobj(self.bucket_name, path, temp_file) - temp_path = temp_file.name - - try: - result = processor_func(file_path=temp_path, **kwargs) - return result - finally: + with tempfile.NamedTemporaryFile(suffix=os.path.splitext(path)[1], delete=True) as temp_file: try: - os.unlink(temp_path) + # Download the file from S3 to the temporary file + self.s3.download_fileobj(self.bucket_name, path, temp_file) + temp_file.flush() + result = processor_func(file_path=temp_file.name, **kwargs) + return result except Exception as e: - import logging - logging.warning(f"Failed to delete temporary file: {e}") + logging.error(f"Error processing S3 file {path}: {e}", exc_info=True) + raise