From 377e33c148c664d21e9b3c580cce1909352aca5e Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Wed, 16 Apr 2025 03:36:45 +0530 Subject: [PATCH] (feat:file_abstract) process files method --- application/storage/base.py | 20 +++++++++++++++++- application/storage/local.py | 24 ++++++++++++++++++++-- application/storage/s3.py | 39 +++++++++++++++++++++++++++++++++--- 3 files changed, 77 insertions(+), 6 deletions(-) diff --git a/application/storage/base.py b/application/storage/base.py index c16eb600..cb205091 100644 --- a/application/storage/base.py +++ b/application/storage/base.py @@ -1,6 +1,6 @@ """Base storage class for file system abstraction.""" from abc import ABC, abstractmethod -from typing import BinaryIO, List +from typing import BinaryIO, List, Optional, Callable class BaseStorage(ABC): @@ -33,6 +33,24 @@ class BaseStorage(ABC): """ pass + @abstractmethod + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. + + This method handles the details of retrieving the file and providing + it to the processor function in an appropriate way based on the storage type. + + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + pass + @abstractmethod def delete_file(self, path: str) -> bool: """ diff --git a/application/storage/local.py b/application/storage/local.py index 82707007..91c5c264 100644 --- a/application/storage/local.py +++ b/application/storage/local.py @@ -1,9 +1,8 @@ """Local file system implementation.""" import os import shutil -from typing import BinaryIO, List +from typing import BinaryIO, List, Callable -from application.core.settings import settings from application.storage.base import BaseStorage @@ -83,3 +82,24 @@ class LocalStorage(BaseStorage): result.append(rel_path) return result + + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. + + For local storage, we can directly pass the full path to the processor. + + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + full_path = self._get_full_path(path) + + if not os.path.exists(full_path): + raise FileNotFoundError(f"File not found: {full_path}") + + return processor_func(file_path=full_path, **kwargs) diff --git a/application/storage/s3.py b/application/storage/s3.py index f9d38d09..cdec6887 100644 --- a/application/storage/s3.py +++ b/application/storage/s3.py @@ -1,6 +1,6 @@ """S3 storage implementation.""" import io -from typing import BinaryIO, List +from typing import BinaryIO, List, Callable import boto3 from botocore.exceptions import ClientError @@ -24,7 +24,6 @@ class S3Storage(BaseStorage): """ self.bucket_name = bucket_name - # Initialize S3 client self.s3 = boto3.client( 's3', aws_access_key_id=aws_access_key_id, @@ -78,4 +77,38 @@ class S3Storage(BaseStorage): for obj in page['Contents']: result.append(obj['Key']) - return result \ No newline at end of file + return result + + def process_file(self, path: str, processor_func: Callable, **kwargs): + """ + Process a file using the provided processor function. + + For S3 storage, we need to download the file to a temporary location first. + + Args: + path: Path to the file + processor_func: Function that processes the file + **kwargs: Additional arguments to pass to the processor function + + Returns: + The result of the processor function + """ + import tempfile + import os + + if not self.file_exists(path): + raise FileNotFoundError(f"File not found: {path}") + + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + self.s3.download_fileobj(self.bucket_name, path, temp_file) + temp_path = temp_file.name + + try: + result = processor_func(file_path=temp_path, **kwargs) + return result + finally: + try: + os.unlink(temp_path) + except Exception as e: + import logging + logging.warning(f"Failed to delete temporary file: {e}")