|
26 | 26 | import tempfile
|
27 | 27 | from abc import ABC, abstractmethod
|
28 | 28 | from typing import List, Union, Dict, Optional, Any
|
29 |
| - |
| 29 | +from enum import Enum |
30 | 30 | from schema import Schema, And, Use, Or, Optional as SchemaOptional, Regex
|
31 | 31 |
|
32 | 32 | from sagemaker import image_uris, s3, utils
|
|
302 | 302 | )
|
303 | 303 |
|
304 | 304 |
|
class DatasetType(Enum):
    """Dataset types supported in the analysis config file.

    Each member's ``value`` is the content-type string for that dataset type.
    """

    # Member order is kept stable; iteration over the enum follows it.
    TEXTCSV = "text/csv"
    JSONLINES = "application/jsonlines"
    JSON = "application/json"
    PARQUET = "application/x-parquet"
    IMAGE = "application/x-image"
| 313 | + |
| 314 | + |
305 | 315 | class DataConfig:
|
306 | 316 | """Config object related to configurations of the input and output dataset."""
|
307 | 317 |
|
@@ -1363,7 +1373,7 @@ def _run(
|
1363 | 1373 | source=self._CLARIFY_OUTPUT,
|
1364 | 1374 | destination=data_config.s3_output_path,
|
1365 | 1375 | output_name="analysis_result",
|
1366 |
| - s3_upload_mode="EndOfJob", |
| 1376 | + s3_upload_mode=ProcessingOutputHandler.get_s3_upload_mode(analysis_config), |
1367 | 1377 | )
|
1368 | 1378 |
|
1369 | 1379 | return super().run(
|
@@ -2083,6 +2093,30 @@ def _upload_analysis_config(analysis_config_file, s3_output_path, sagemaker_sess
|
2083 | 2093 | )
|
2084 | 2094 |
|
2085 | 2095 |
|
class ProcessingOutputHandler:
    """Picks ``ProcessingOutput`` parameters based on the dataset type in ``analysis_config``."""

    class S3UploadMode(Enum):
        """Enum values for the different upload modes to the S3 bucket."""

        CONTINUOUS = "Continuous"
        ENDOFJOB = "EndOfJob"

    @classmethod
    def get_s3_upload_mode(cls, analysis_config: Dict[str, Any]) -> str:
        """Return the S3 upload mode for the dataset type in ``analysis_config``.

        Args:
            analysis_config (dict): Analysis config; its ``"dataset_type"`` entry is read.
                The entry is normally the content-type string (for example
                ``"application/x-image"``); a ``DatasetType`` member is accepted as well.

        Returns:
            str: ``"Continuous"`` when the dataset type is the image type,
            ``"EndOfJob"`` for every other dataset type.

        Raises:
            KeyError: If ``analysis_config`` has no ``"dataset_type"`` entry.
        """
        dataset_type = analysis_config["dataset_type"]
        # The config stores the raw string, and a plain Enum member never compares
        # equal to its value, so match against ``.value``. The member itself is still
        # accepted so callers that pass ``DatasetType.IMAGE`` keep working.
        if dataset_type in (DatasetType.IMAGE, DatasetType.IMAGE.value):
            return cls.S3UploadMode.CONTINUOUS.value
        return cls.S3UploadMode.ENDOFJOB.value
| 2118 | + |
| 2119 | + |
2086 | 2120 | def _set(value, key, dictionary):
|
2087 | 2121 | """Sets dictionary[key] = value if value is not None."""
|
2088 | 2122 | if value is not None:
|
|
0 commit comments