diff --git a/doc/amazon_sagemaker_model_building_pipeline.rst b/doc/amazon_sagemaker_model_building_pipeline.rst index 9dfdf01d08..b85a9d9251 100644 --- a/doc/amazon_sagemaker_model_building_pipeline.rst +++ b/doc/amazon_sagemaker_model_building_pipeline.rst @@ -741,6 +741,8 @@ There are a number of properties for a pipeline execution that can only be resol - :class:`sagemaker.workflow.execution_variables.ExecutionVariables.PIPELINE_EXECUTION_ARN`: The execution ARN for an execution. - :class:`sagemaker.workflow.execution_variables.ExecutionVariables.PIPELINE_NAME`: The name of the pipeline. - :class:`sagemaker.workflow.execution_variables.ExecutionVariables.PIPELINE_ARN`: The ARN of the pipeline. +- :class:`sagemaker.workflow.execution_variables.ExecutionVariables.TRAINING_JOB_NAME`: The name of the training job launched by the training step. +- :class:`sagemaker.workflow.execution_variables.ExecutionVariables.PROCESSING_JOB_NAME`: The name of the processing job launched by the processing step. You can use these execution variables as you see fit. The following example uses the :code:`START_DATETIME` execution variable to construct a processing output path: diff --git a/doc/workflows/pipelines/sagemaker.workflow.pipelines.rst b/doc/workflows/pipelines/sagemaker.workflow.pipelines.rst index c8b68c0e6f..47e14b5e85 100644 --- a/doc/workflows/pipelines/sagemaker.workflow.pipelines.rst +++ b/doc/workflows/pipelines/sagemaker.workflow.pipelines.rst @@ -52,7 +52,7 @@ Execution Variables .. autoclass:: sagemaker.workflow.execution_variables.ExecutionVariable .. autoclass:: sagemaker.workflow.execution_variables.ExecutionVariables - :members: START_DATETIME, CURRENT_DATETIME, PIPELINE_EXECUTION_ID, PIPELINE_EXECUTION_ARN, PIPELINE_NAME, PIPELINE_ARN + :members: START_DATETIME, CURRENT_DATETIME, PIPELINE_EXECUTION_ID, PIPELINE_EXECUTION_ARN, PIPELINE_NAME, PIPELINE_ARN, TRAINING_JOB_NAME, PROCESSING_JOB_NAME Functions --------- diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index 9d0c30ff27..715c329f47 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -1025,6 +1025,12 @@ def fit( * If both `ExperimentName` and `TrialName` are not supplied the trial component will be unassociated. * `TrialComponentDisplayName` is used for display in Studio. + * Both `ExperimentName` and `TrialName` will be ignored if the Estimator instance + is built with :class:`~sagemaker.workflow.pipeline_context.PipelineSession`. + However, the value of `TrialComponentDisplayName` is honored for display in Studio. + Returns: + None or pipeline step arguments in case the Estimator instance is built with + :class:`~sagemaker.workflow.pipeline_context.PipelineSession` """ self._prepare_for_training(job_name=job_name) diff --git a/src/sagemaker/processing.py b/src/sagemaker/processing.py index 8da6e04768..9a1d8bd431 100644 --- a/src/sagemaker/processing.py +++ b/src/sagemaker/processing.py @@ -173,9 +173,14 @@ def run( * If both `ExperimentName` and `TrialName` are not supplied the trial component will be unassociated. * `TrialComponentDisplayName` is used for display in Studio. + * Both `ExperimentName` and `TrialName` will be ignored if the Processor instance + is built with :class:`~sagemaker.workflow.pipeline_context.PipelineSession`. + However, the value of `TrialComponentDisplayName` is honored for display in Studio. kms_key (str): The ARN of the KMS key that is used to encrypt the user code file (default: None). - + Returns: + None or pipeline step arguments in case the Processor instance is built with + :class:`~sagemaker.workflow.pipeline_context.PipelineSession` Raises: ValueError: if ``logs`` is True but ``wait`` is False. """ @@ -543,8 +548,14 @@ def run( * If both `ExperimentName` and `TrialName` are not supplied the trial component will be unassociated. * `TrialComponentDisplayName` is used for display in Studio. + * Both `ExperimentName` and `TrialName` will be ignored if the Processor instance + is built with :class:`~sagemaker.workflow.pipeline_context.PipelineSession`. + However, the value of `TrialComponentDisplayName` is honored for display in Studio. kms_key (str): The ARN of the KMS key that is used to encrypt the user code file (default: None). + Returns: + None or pipeline step arguments in case the Processor instance is built with + :class:`~sagemaker.workflow.pipeline_context.PipelineSession` """ normalized_inputs, normalized_outputs = self._normalize_args( job_name=job_name, @@ -1601,8 +1612,14 @@ def run( # type: ignore[override] * If both `ExperimentName` and `TrialName` are not supplied the trial component will be unassociated. * `TrialComponentDisplayName` is used for display in Studio. + * Both `ExperimentName` and `TrialName` will be ignored if the Processor instance + is built with :class:`~sagemaker.workflow.pipeline_context.PipelineSession`. + However, the value of `TrialComponentDisplayName` is honored for display in Studio. kms_key (str): The ARN of the KMS key that is used to encrypt the user code file (default: None). + Returns: + None or pipeline step arguments in case the Processor instance is built with + :class:`~sagemaker.workflow.pipeline_context.PipelineSession` """ s3_runproc_sh, inputs, job_name = self._pack_and_upload_code( code, source_dir, dependencies, git_config, job_name, inputs diff --git a/src/sagemaker/transformer.py b/src/sagemaker/transformer.py index dbe54c8d57..6df56ad154 100644 --- a/src/sagemaker/transformer.py +++ b/src/sagemaker/transformer.py @@ -186,6 +186,9 @@ def transform( * If both `ExperimentName` and `TrialName` are not supplied the trial component will be unassociated. * `TrialComponentDisplayName` is used for display in Studio. + * Both `ExperimentName` and `TrialName` will be ignored if the Transformer instance + is built with :class:`~sagemaker.workflow.pipeline_context.PipelineSession`. + However, the value of `TrialComponentDisplayName` is honored for display in Studio. model_client_config (dict[str, str]): Model configuration. Dictionary contains two optional keys, 'InvocationsTimeoutInSeconds', and 'InvocationsMaxRetries'. @@ -194,6 +197,9 @@ def transform( (default: ``True``). logs (bool): Whether to show the logs produced by the job. Only meaningful when wait is ``True`` (default: ``True``). + Returns: + None or pipeline step arguments in case the Transformer instance is built with + :class:`~sagemaker.workflow.pipeline_context.PipelineSession` """ local_mode = self.sagemaker_session.local_mode if not local_mode and not is_pipeline_variable(data) and not data.startswith("s3://"): diff --git a/src/sagemaker/workflow/execution_variables.py b/src/sagemaker/workflow/execution_variables.py index 516efb784e..59ad1733ad 100644 --- a/src/sagemaker/workflow/execution_variables.py +++ b/src/sagemaker/workflow/execution_variables.py @@ -58,6 +58,8 @@ class ExecutionVariables: - ExecutionVariables.PIPELINE_ARN - ExecutionVariables.PIPELINE_EXECUTION_ID - ExecutionVariables.PIPELINE_EXECUTION_ARN + - ExecutionVariables.TRAINING_JOB_NAME + - ExecutionVariables.PROCESSING_JOB_NAME """ START_DATETIME = ExecutionVariable("StartDateTime") @@ -66,3 +68,5 @@ class ExecutionVariables: PIPELINE_ARN = ExecutionVariable("PipelineArn") PIPELINE_EXECUTION_ID = ExecutionVariable("PipelineExecutionId") PIPELINE_EXECUTION_ARN = ExecutionVariable("PipelineExecutionArn") + TRAINING_JOB_NAME = ExecutionVariable("TrainingJobName") + PROCESSING_JOB_NAME = ExecutionVariable("ProcessingJobName") diff --git a/src/sagemaker/workflow/steps.py b/src/sagemaker/workflow/steps.py index d73a899084..e979657bd4 100644 --- a/src/sagemaker/workflow/steps.py +++ b/src/sagemaker/workflow/steps.py @@ -223,6 +223,18 @@ def _get_step_name_from_str( return step_map[str_input].steps[-1].name return str_input + @staticmethod + def _trim_experiment_config(request_dict: Dict): + """For job steps, trim the experiment config to keep the trial component display name.""" + if request_dict.get("ExperimentConfig", {}).get("TrialComponentDisplayName"): + request_dict["ExperimentConfig"] = { + "TrialComponentDisplayName": request_dict["ExperimentConfig"][ + "TrialComponentDisplayName" + ] + } + else: + request_dict.pop("ExperimentConfig", None) + @attr.s class CacheConfig: @@ -432,7 +444,7 @@ def arguments(self) -> RequestType: request_dict["HyperParameters"].pop("sagemaker_job_name", None) request_dict.pop("TrainingJobName", None) - request_dict.pop("ExperimentConfig", None) + Step._trim_experiment_config(request_dict) return request_dict @@ -663,7 +675,8 @@ def arguments(self) -> RequestType: ) request_dict.pop("TransformJobName", None) - request_dict.pop("ExperimentConfig", None) + Step._trim_experiment_config(request_dict) + return request_dict @property @@ -811,7 +824,8 @@ def arguments(self) -> RequestType: request_dict = self.processor.sagemaker_session._get_process_request(**process_args) request_dict.pop("ProcessingJobName", None) - request_dict.pop("ExperimentConfig", None) + Step._trim_experiment_config(request_dict) + return request_dict @property diff --git a/tests/unit/sagemaker/workflow/test_processing_step.py b/tests/unit/sagemaker/workflow/test_processing_step.py index 262d0eb558..93fd439468 100644 --- a/tests/unit/sagemaker/workflow/test_processing_step.py +++ b/tests/unit/sagemaker/workflow/test_processing_step.py @@ -18,6 +18,8 @@ import pytest import warnings +from copy import deepcopy + from sagemaker.estimator import Estimator from sagemaker.parameter import IntegerParameter from sagemaker.transformer import Transformer @@ -244,7 +246,34 @@ def network_config(): ) -def test_processing_step_with_processor(pipeline_session, processing_input): +@pytest.mark.parametrize( + "experiment_config, expected_experiment_config", + [ + ( + { + "ExperimentName": "experiment-name", + "TrialName": "trial-name", + "TrialComponentDisplayName": "display-name", + }, + {"TrialComponentDisplayName": "display-name"}, + ), + ( + {"TrialComponentDisplayName": "display-name"}, + {"TrialComponentDisplayName": "display-name"}, + ), + ( + { + "ExperimentName": "experiment-name", + "TrialName": "trial-name", + }, + None, + ), + (None, None), + ], +) +def test_processing_step_with_processor( + pipeline_session, processing_input, experiment_config, expected_experiment_config +): custom_step1 = CustomStep("TestStep") custom_step2 = CustomStep("SecondTestStep") processor = Processor( @@ -256,7 +285,7 @@ def test_processing_step_with_processor(pipeline_session, processing_input): ) with warnings.catch_warnings(record=True) as w: - step_args = processor.run(inputs=processing_input) + step_args = processor.run(inputs=processing_input, experiment_config=experiment_config) assert len(w) == 1 assert issubclass(w[-1].category, UserWarning) assert "Running within a PipelineSession" in str(w[-1].message) @@ -283,13 +312,21 @@ def test_processing_step_with_processor(pipeline_session, processing_input): steps=[step, custom_step1, custom_step2], sagemaker_session=pipeline_session, ) + + expected_step_arguments = deepcopy(step_args.args) + if expected_experiment_config is None: + expected_step_arguments.pop("ExperimentConfig", None) + else: + expected_step_arguments["ExperimentConfig"] = expected_experiment_config + del expected_step_arguments["ProcessingJobName"] + assert json.loads(pipeline.definition())["Steps"][0] == { "Name": "MyProcessingStep", "Description": "ProcessingStep description", "DisplayName": "MyProcessingStep", "Type": "Processing", "DependsOn": ["TestStep", "SecondTestStep"], - "Arguments": step_args.args, + "Arguments": expected_step_arguments, "CacheConfig": {"Enabled": True, "ExpireAfter": "PT1H"}, "PropertyFiles": [ { diff --git a/tests/unit/sagemaker/workflow/test_training_step.py b/tests/unit/sagemaker/workflow/test_training_step.py index f043048095..66a7c2fc43 100644 --- a/tests/unit/sagemaker/workflow/test_training_step.py +++ b/tests/unit/sagemaker/workflow/test_training_step.py @@ -19,6 +19,8 @@ import pytest import warnings +from copy import deepcopy + from sagemaker import Processor, Model from sagemaker.parameter import IntegerParameter from sagemaker.transformer import Transformer @@ -207,7 +209,34 @@ def hyperparameters(): return {"test-key": "test-val"} -def test_training_step_with_estimator(pipeline_session, training_input, hyperparameters): +@pytest.mark.parametrize( + "experiment_config, expected_experiment_config", + [ + ( + { + "ExperimentName": "experiment-name", + "TrialName": "trial-name", + "TrialComponentDisplayName": "display-name", + }, + {"TrialComponentDisplayName": "display-name"}, + ), + ( + {"TrialComponentDisplayName": "display-name"}, + {"TrialComponentDisplayName": "display-name"}, + ), + ( + { + "ExperimentName": "experiment-name", + "TrialName": "trial-name", + }, + None, + ), + (None, None), + ], +) +def test_training_step_with_estimator( + pipeline_session, training_input, hyperparameters, experiment_config, expected_experiment_config +): custom_step1 = CustomStep("TestStep") custom_step2 = CustomStep("SecondTestStep") enable_network_isolation = ParameterBoolean(name="enable_network_isolation") @@ -226,7 +255,9 @@ def test_training_step_with_estimator(pipeline_session, training_input, hyperpar with warnings.catch_warnings(record=True) as w: # TODO: remove job_name once we merge # https://github.com/aws/sagemaker-python-sdk/pull/3158/files - step_args = estimator.fit(inputs=training_input, job_name="TestJob") + step_args = estimator.fit( + inputs=training_input, job_name="TestJob", experiment_config=experiment_config + ) assert len(w) == 1 assert issubclass(w[-1].category, UserWarning) assert "Running within a PipelineSession" in str(w[-1].message) @@ -247,17 +278,28 @@ def test_training_step_with_estimator(pipeline_session, training_input, hyperpar parameters=[enable_network_isolation, encrypt_container_traffic], sagemaker_session=pipeline_session, ) - step_args.args["EnableInterContainerTrafficEncryption"] = { + + expected_step_arguments = deepcopy(step_args.args) + + expected_step_arguments["EnableInterContainerTrafficEncryption"] = { "Get": "Parameters.encrypt_container_traffic" } - step_args.args["EnableNetworkIsolation"] = {"Get": "Parameters.encrypt_container_traffic"} + expected_step_arguments["EnableNetworkIsolation"] = { + "Get": "Parameters.enable_network_isolation" + } + if expected_experiment_config is None: + expected_step_arguments.pop("ExperimentConfig", None) + else: + expected_step_arguments["ExperimentConfig"] = expected_experiment_config + del expected_step_arguments["TrainingJobName"] + assert json.loads(pipeline.definition())["Steps"][0] == { "Name": "MyTrainingStep", "Description": "TrainingStep description", "DisplayName": "MyTrainingStep", "Type": "Training", "DependsOn": ["TestStep", "SecondTestStep"], - "Arguments": step_args.args, + "Arguments": expected_step_arguments, } assert step.properties.TrainingJobName.expr == {"Get": "Steps.MyTrainingStep.TrainingJobName"} adjacency_list = PipelineGraph.from_pipeline(pipeline).adjacency_list diff --git a/tests/unit/sagemaker/workflow/test_transform_step.py b/tests/unit/sagemaker/workflow/test_transform_step.py index 3d0e25a2ee..3052a910de 100644 --- a/tests/unit/sagemaker/workflow/test_transform_step.py +++ b/tests/unit/sagemaker/workflow/test_transform_step.py @@ -18,6 +18,8 @@ import pytest import warnings +from copy import deepcopy + from sagemaker import Model, Processor from sagemaker.estimator import Estimator from sagemaker.parameter import IntegerParameter @@ -153,27 +155,108 @@ def test_transform_step_with_transformer(model_name, data, output_path, pipeline parameters=[model_name, data], sagemaker_session=pipeline_session, ) - step_args = step_args.args - step_def = json.loads(pipeline.definition())["Steps"][0] - step_args["ModelName"] = model_name.expr if is_pipeline_variable(model_name) else model_name - step_args["TransformInput"]["DataSource"]["S3DataSource"]["S3Uri"] = ( + + expected_step_arguments = deepcopy(step_args.args) + expected_step_arguments["ModelName"] = ( + model_name.expr if is_pipeline_variable(model_name) else model_name + ) + expected_step_arguments["TransformInput"]["DataSource"]["S3DataSource"]["S3Uri"] = ( data.expr if is_pipeline_variable(data) else data ) - step_args["TransformOutput"]["S3OutputPath"] = ( + expected_step_arguments["TransformOutput"]["S3OutputPath"] = ( output_path.expr if is_pipeline_variable(output_path) else output_path ) + del expected_step_arguments["TransformJobName"] + + step_def = json.loads(pipeline.definition())["Steps"][0] + assert step_def == { + "Name": "MyTransformStep", + "Type": "Transform", + "Arguments": expected_step_arguments, + } + - del ( - step_args["ModelName"], - step_args["TransformInput"]["DataSource"]["S3DataSource"]["S3Uri"], - step_args["TransformOutput"]["S3OutputPath"], +@pytest.mark.parametrize( + "experiment_config, expected_experiment_config", + [ + ( + { + "ExperimentName": "experiment-name", + "TrialName": "trial-name", + "TrialComponentDisplayName": "display-name", + }, + {"TrialComponentDisplayName": "display-name"}, + ), + ( + {"TrialComponentDisplayName": "display-name"}, + {"TrialComponentDisplayName": "display-name"}, + ), + ( + { + "ExperimentName": "experiment-name", + "TrialName": "trial-name", + }, + None, + ), + (None, None), + ], +) +def test_transform_step_with_transformer_experiment_config( + experiment_config, expected_experiment_config, pipeline_session +): + transformer = Transformer( + model_name="my_model", + instance_type="ml.m5.xlarge", + instance_count=1, + output_path="s3://my-bucket/my-output-path", + sagemaker_session=pipeline_session, ) - del ( - step_def["Arguments"]["ModelName"], - step_def["Arguments"]["TransformInput"]["DataSource"]["S3DataSource"]["S3Uri"], - step_def["Arguments"]["TransformOutput"]["S3OutputPath"], + transform_inputs = TransformInput(data="s3://my-bucket/my-data") + + with warnings.catch_warnings(record=True) as w: + step_args = transformer.transform( + data=transform_inputs.data, + data_type=transform_inputs.data_type, + content_type=transform_inputs.content_type, + compression_type=transform_inputs.compression_type, + split_type=transform_inputs.split_type, + input_filter=transform_inputs.input_filter, + output_filter=transform_inputs.output_filter, + join_source=transform_inputs.join_source, + model_client_config=transform_inputs.model_client_config, + experiment_config=experiment_config, + ) + assert len(w) == 1 + assert issubclass(w[-1].category, UserWarning) + assert "Running within a PipelineSession" in str(w[-1].message) + + with warnings.catch_warnings(record=True) as w: + step = TransformStep( + name="MyTransformStep", + step_args=step_args, + ) + assert len(w) == 0 + + pipeline = Pipeline( + name="MyPipeline", + steps=[step], + sagemaker_session=pipeline_session, ) - assert step_def == {"Name": "MyTransformStep", "Type": "Transform", "Arguments": step_args} + + expected_step_arguments = deepcopy(step_args.args) + if expected_experiment_config is None: + expected_step_arguments.pop("ExperimentConfig", None) + else: + expected_step_arguments["ExperimentConfig"] = expected_experiment_config + del expected_step_arguments["TransformJobName"] + + step_def = json.loads(pipeline.definition())["Steps"][0] + assert step_def == { + "Name": "MyTransformStep", + "Type": "Transform", + "Arguments": expected_step_arguments, + } + adjacency_list = PipelineGraph.from_pipeline(pipeline).adjacency_list assert adjacency_list == {"MyTransformStep": []}