|
26 | 26 | import tempfile
|
27 | 27 | from abc import ABC, abstractmethod
|
28 | 28 | from typing import List, Union, Dict, Optional, Any
|
29 |
| - |
| 29 | +from enum import Enum |
30 | 30 | from schema import Schema, And, Use, Or, Optional as SchemaOptional, Regex
|
31 | 31 |
|
32 | 32 | from sagemaker import image_uris, s3, utils
|
|
304 | 304 | )
|
305 | 305 |
|
306 | 306 |
|
class DatasetType(Enum):
    """Dataset types that may appear in the Analysis config file.

    Each member's value is the string form of the dataset type; compare a
    config entry against ``DatasetType.<MEMBER>.value``.
    """

    # Tabular / record-oriented dataset types.
    TEXTCSV = "text/csv"
    JSONLINES = "application/jsonlines"
    JSON = "application/json"
    PARQUET = "application/x-parquet"

    # Image dataset type.
    IMAGE = "application/x-image"
| 315 | + |
| 316 | + |
307 | 317 | class DataConfig:
|
308 | 318 | """Config object related to configurations of the input and output dataset."""
|
309 | 319 |
|
@@ -1451,7 +1461,7 @@ def _run(
|
1451 | 1461 | source=self._CLARIFY_OUTPUT,
|
1452 | 1462 | destination=data_config.s3_output_path,
|
1453 | 1463 | output_name="analysis_result",
|
1454 |
| - s3_upload_mode="EndOfJob", |
| 1464 | + s3_upload_mode=ProcessingOutputHandler.get_s3_upload_mode(analysis_config), |
1455 | 1465 | )
|
1456 | 1466 |
|
1457 | 1467 | return super().run(
|
@@ -2171,6 +2181,33 @@ def _upload_analysis_config(analysis_config_file, s3_output_path, sagemaker_sess
|
2171 | 2181 | )
|
2172 | 2182 |
|
2173 | 2183 |
|
class ProcessingOutputHandler:
    """Helper that derives parameters for a processing job's output."""

    class S3UploadMode(Enum):
        """Enum values for the different upload modes to an S3 bucket"""

        CONTINUOUS = "Continuous"
        ENDOFJOB = "EndOfJob"

    @classmethod
    def get_s3_upload_mode(cls, analysis_config: Dict[str, Any]) -> str:
        """Fetches the s3_upload_mode based on the config's ``dataset_type`` value

        Args:
            analysis_config (dict): Config following the analysis_config.json format

        Returns:
            The s3_upload_mode string for the processing output: ``"Continuous"``
            when ``dataset_type`` equals ``DatasetType.IMAGE.value``, otherwise
            ``"EndOfJob"`` (also used when ``dataset_type`` is absent).
        """
        # Use ``.get`` so a config without "dataset_type" falls back to
        # "EndOfJob" instead of raising KeyError.
        dataset_type = analysis_config.get("dataset_type")
        if dataset_type == DatasetType.IMAGE.value:
            return cls.S3UploadMode.CONTINUOUS.value
        return cls.S3UploadMode.ENDOFJOB.value
| 2209 | + |
| 2210 | + |
2174 | 2211 | def _set(value, key, dictionary):
|
2175 | 2212 | """Sets dictionary[key] = value if value is not None."""
|
2176 | 2213 | if value is not None:
|
|
0 commit comments