From e237f09a0fa99109807131b84796196f73e11cfd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?jerrypeng73=F0=9F=98=8E?=
Date: Tue, 31 May 2022 18:28:06 -0700
Subject: [PATCH 1/2] fix pipeline doc code example where process.run only
 accepts argument

---
 .../workflow/test_processing_step.py          | 285 +++++++++++-------
 .../sagemaker/workflow/test_training_step.py  | 233 +++++++-------
 .../sagemaker/workflow/test_tuning_step.py    |  35 ++-
 3 files changed, 324 insertions(+), 229 deletions(-)

diff --git a/tests/unit/sagemaker/workflow/test_processing_step.py b/tests/unit/sagemaker/workflow/test_processing_step.py
index a85106f9d6..8d6ee80389 100644
--- a/tests/unit/sagemaker/workflow/test_processing_step.py
+++ b/tests/unit/sagemaker/workflow/test_processing_step.py
@@ -24,7 +24,13 @@ from sagemaker.tuner import HyperparameterTuner
 from sagemaker.workflow.pipeline_context import PipelineSession
 
-from sagemaker.processing import Processor, ScriptProcessor, FrameworkProcessor
+from sagemaker.processing import (
+    Processor,
+    ScriptProcessor,
+    FrameworkProcessor,
+    ProcessingOutput,
+    ProcessingInput,
+)
 from sagemaker.sklearn.processing import SKLearnProcessor
 from sagemaker.pytorch.processing import PyTorchProcessor
 from sagemaker.tensorflow.processing import TensorFlowProcessor
@@ -34,11 +40,12 @@ from sagemaker.wrangler.processing import DataWranglerProcessor
 from sagemaker.spark.processing import SparkJarProcessor, PySparkProcessor
 
-from sagemaker.processing import ProcessingInput
 
 from sagemaker.workflow.steps import CacheConfig, ProcessingStep
 from sagemaker.workflow.pipeline import Pipeline
 from sagemaker.workflow.properties import PropertyFile
+from sagemaker.workflow.parameters import ParameterString
+from sagemaker.workflow.functions import Join
 from sagemaker.network import NetworkConfig
 from sagemaker.pytorch.estimator import PyTorch
@@ -62,6 +69,144 @@ DUMMY_S3_SCRIPT_PATH = "s3://dummy-s3/dummy_script.py"
 INSTANCE_TYPE = "ml.m4.xlarge"
 
+FRAMEWORK_PROCESSOR = [
+    (
+        FrameworkProcessor(
+            framework_version="1.8",
+            instance_type=INSTANCE_TYPE,
+            instance_count=1,
+            role=ROLE,
+            estimator_cls=PyTorch,
+        ),
+        {"code": DUMMY_S3_SCRIPT_PATH},
+    ),
+    (
+        SKLearnProcessor(
+            framework_version="0.23-1",
+            instance_type=INSTANCE_TYPE,
+            instance_count=1,
+            role=ROLE,
+        ),
+        {"code": DUMMY_S3_SCRIPT_PATH},
+    ),
+    (
+        PyTorchProcessor(
+            role=ROLE,
+            instance_type=INSTANCE_TYPE,
+            instance_count=1,
+            framework_version="1.8.0",
+            py_version="py3",
+        ),
+        {"code": DUMMY_S3_SCRIPT_PATH},
+    ),
+    (
+        TensorFlowProcessor(
+            role=ROLE,
+            instance_type=INSTANCE_TYPE,
+            instance_count=1,
+            framework_version="2.0",
+        ),
+        {"code": DUMMY_S3_SCRIPT_PATH},
+    ),
+    (
+        HuggingFaceProcessor(
+            transformers_version="4.6",
+            pytorch_version="1.7",
+            role=ROLE,
+            instance_count=1,
+            instance_type="ml.p3.2xlarge",
+        ),
+        {"code": DUMMY_S3_SCRIPT_PATH},
+    ),
+    (
+        XGBoostProcessor(
+            framework_version="1.3-1",
+            py_version="py3",
+            role=ROLE,
+            instance_count=1,
+            instance_type=INSTANCE_TYPE,
+            base_job_name="test-xgboost",
+        ),
+        {"code": DUMMY_S3_SCRIPT_PATH},
+    ),
+    (
+        MXNetProcessor(
+            framework_version="1.4.1",
+            py_version="py3",
+            role=ROLE,
+            instance_count=1,
+            instance_type=INSTANCE_TYPE,
+            base_job_name="test-mxnet",
+        ),
+        {"code": DUMMY_S3_SCRIPT_PATH},
+    ),
+    (
+        DataWranglerProcessor(
+            role=ROLE,
+            data_wrangler_flow_source="s3://my-bucket/dw.flow",
+            instance_count=1,
+            instance_type=INSTANCE_TYPE,
+        ),
+        {},
+    ),
+    (
+        SparkJarProcessor(
+            role=ROLE,
+            framework_version="2.4",
+            instance_count=1,
+            instance_type=INSTANCE_TYPE,
+        ),
+        {
+            "submit_app": "s3://my-jar",
+            "submit_class": "com.amazonaws.sagemaker.spark.test.HelloJavaSparkApp",
+            "arguments": ["--input", "input-data-uri", "--output", "output-data-uri"],
+        },
+    ),
+    (
+        PySparkProcessor(
+            role=ROLE,
+            framework_version="2.4",
+            instance_count=1,
+            instance_type=INSTANCE_TYPE,
+        ),
+        {
+            "submit_app": "s3://my-jar",
+            "arguments": ["--input", "input-data-uri", "--output", "output-data-uri"],
+        },
+    ),
+]
+
+PROCESSING_INPUT = [
+    ProcessingInput(source="s3://my-bucket/processing_manifest", destination="processing_manifest"),
+    ProcessingInput(
+        source=ParameterString(name="my-processing-input"),
+        destination="processing-input",
+    ),
+    ProcessingInput(
+        source=ParameterString(
+            name="my-processing-input", default_value="s3://my-bucket/my-processing"
+        ),
+        destination="processing-input",
+    ),
+    ProcessingInput(
+        source=Join(on="/", values=["s3://my-bucket", "my-input"]),
+        destination="processing-input",
+    ),
+]
+
+PROCESSING_OUTPUT = [
+    ProcessingOutput(source="/opt/ml/output", destination="s3://my-bucket/my-output"),
+    ProcessingOutput(source="/opt/ml/output", destination=ParameterString(name="my-output")),
+    ProcessingOutput(
+        source="/opt/ml/output",
+        destination=ParameterString(name="my-output", default_value="s3://my-bucket/my-output"),
+    ),
+    ProcessingOutput(
+        source="/opt/ml/output",
+        destination=Join(on="/", values=["s3://my-bucket", "my-output"]),
+    ),
+]
+
 
 @pytest.fixture
 def client():
@@ -253,117 +398,11 @@ def test_processing_step_with_script_processor(pipeline_session, processing_inpu
     }
 
 
-@pytest.mark.parametrize(
-    "framework_processor",
-    [
-        (
-            FrameworkProcessor(
-                framework_version="1.8",
-                instance_type=INSTANCE_TYPE,
-                instance_count=1,
-                role=ROLE,
-                estimator_cls=PyTorch,
-            ),
-            {"code": DUMMY_S3_SCRIPT_PATH},
-        ),
-        (
-            SKLearnProcessor(
-                framework_version="0.23-1",
-                instance_type=INSTANCE_TYPE,
-                instance_count=1,
-                role=ROLE,
-            ),
-            {"code": DUMMY_S3_SCRIPT_PATH},
-        ),
-        (
-            PyTorchProcessor(
-                role=ROLE,
-                instance_type=INSTANCE_TYPE,
-                instance_count=1,
-                framework_version="1.8.0",
-                py_version="py3",
-            ),
-            {"code": DUMMY_S3_SCRIPT_PATH},
-        ),
-        (
-            TensorFlowProcessor(
-                role=ROLE,
-                instance_type=INSTANCE_TYPE,
-                instance_count=1,
-                framework_version="2.0",
-            ),
-            {"code": DUMMY_S3_SCRIPT_PATH},
-        ),
-        (
-            HuggingFaceProcessor(
-                transformers_version="4.6",
-                pytorch_version="1.7",
-                role=ROLE,
-                instance_count=1,
-                instance_type="ml.p3.2xlarge",
-            ),
-            {"code": DUMMY_S3_SCRIPT_PATH},
-        ),
-        (
-            XGBoostProcessor(
-                framework_version="1.3-1",
-                py_version="py3",
-                role=ROLE,
-                instance_count=1,
-                instance_type=INSTANCE_TYPE,
-                base_job_name="test-xgboost",
-            ),
-            {"code": DUMMY_S3_SCRIPT_PATH},
-        ),
-        (
-            MXNetProcessor(
-                framework_version="1.4.1",
-                py_version="py3",
-                role=ROLE,
-                instance_count=1,
-                instance_type=INSTANCE_TYPE,
-                base_job_name="test-mxnet",
-            ),
-            {"code": DUMMY_S3_SCRIPT_PATH},
-        ),
-        (
-            DataWranglerProcessor(
-                role=ROLE,
-                data_wrangler_flow_source=f"s3://{BUCKET}/dw.flow",
-                instance_count=1,
-                instance_type=INSTANCE_TYPE,
-            ),
-            {},
-        ),
-        (
-            SparkJarProcessor(
-                role=ROLE,
-                framework_version="2.4",
-                instance_count=1,
-                instance_type=INSTANCE_TYPE,
-            ),
-            {
-                "submit_app": "s3://my-jar",
-                "submit_class": "com.amazonaws.sagemaker.spark.test.HelloJavaSparkApp",
-                "arguments": ["--input", "input-data-uri", "--output", "output-data-uri"],
-            },
-        ),
-        (
-            PySparkProcessor(
-                role=ROLE,
-                framework_version="2.4",
-                instance_count=1,
-                instance_type=INSTANCE_TYPE,
-            ),
-            {
-                "submit_app": "s3://my-jar",
-                "arguments": ["--input", "input-data-uri", "--output", "output-data-uri"],
-            },
-        ),
-    ],
-)
+@pytest.mark.parametrize("framework_processor", FRAMEWORK_PROCESSOR)
+@pytest.mark.parametrize("processing_input", PROCESSING_INPUT)
+@pytest.mark.parametrize("processing_output", PROCESSING_OUTPUT)
 def test_processing_step_with_framework_processor(
-    framework_processor, pipeline_session, processing_input, network_config
+    framework_processor, pipeline_session, processing_input, processing_output, network_config
 ):
     processor, run_inputs = framework_processor
@@ -373,7 +412,8 @@ def test_processing_step_with_framework_processor(
     processor.volume_kms_key = "volume-kms-key"
     processor.network_config = network_config
 
-    run_inputs["inputs"] = processing_input
+    run_inputs["inputs"] = [processing_input]
+    run_inputs["outputs"] = [processing_output]
 
     step_args = processor.run(**run_inputs)
@@ -387,10 +427,25 @@ def test_processing_step_with_framework_processor(
         sagemaker_session=pipeline_session,
     )
 
-    assert json.loads(pipeline.definition())["Steps"][0] == {
+    step_args = step_args.args
+    step_def = json.loads(pipeline.definition())["Steps"][0]
+
+    assert step_args["ProcessingInputs"][0]["S3Input"]["S3Uri"] == processing_input.source
+    assert (
+        step_args["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
+        == processing_output.destination
+    )
+
+    del step_args["ProcessingInputs"][0]["S3Input"]["S3Uri"]
+    del step_def["Arguments"]["ProcessingInputs"][0]["S3Input"]["S3Uri"]
+
+    del step_args["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
+    del step_def["Arguments"]["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
+
+    assert step_def == {
         "Name": "MyProcessingStep",
         "Type": "Processing",
-        "Arguments": step_args.args,
+        "Arguments": step_args,
     }
 
diff --git a/tests/unit/sagemaker/workflow/test_training_step.py b/tests/unit/sagemaker/workflow/test_training_step.py
index 2fe19b3c19..14df41c876 100644
--- a/tests/unit/sagemaker/workflow/test_training_step.py
+++ b/tests/unit/sagemaker/workflow/test_training_step.py
@@ -28,6 +28,7 @@ from sagemaker.workflow.steps import TrainingStep
 from sagemaker.workflow.pipeline import Pipeline
+from sagemaker.workflow.functions import Join
 from sagemaker.estimator import Estimator
 from sagemaker.sklearn.estimator import SKLearn
@@ -66,6 +67,88 @@ DUMMY_S3_SOURCE_DIR = "s3://dummy-s3-source-dir/"
 INSTANCE_TYPE = "ml.m4.xlarge"
 
+ESTIMATOR_LISTS = [
+    SKLearn(
+        framework_version="0.23-1",
+        py_version="py3",
+        instance_type=INSTANCE_TYPE,
+        instance_count=1,
+        role=ROLE,
+        entry_point=DUMMY_LOCAL_SCRIPT_PATH,
+    ),
+    PyTorch(
+        role=ROLE,
+        instance_type=INSTANCE_TYPE,
+        instance_count=1,
+        framework_version="1.8.0",
+        py_version="py36",
+        entry_point=DUMMY_LOCAL_SCRIPT_PATH,
+    ),
+    TensorFlow(
+        role=ROLE,
+        entry_point=DUMMY_LOCAL_SCRIPT_PATH,
+        instance_type=INSTANCE_TYPE,
+        instance_count=1,
+        framework_version="2.0",
+        py_version="py3",
+    ),
+    HuggingFace(
+        transformers_version="4.6",
+        pytorch_version="1.7",
+        role=ROLE,
+        instance_type="ml.p3.2xlarge",
+        instance_count=1,
+        py_version="py36",
+        entry_point=DUMMY_LOCAL_SCRIPT_PATH,
+    ),
+    XGBoost(
+        framework_version="1.3-1",
+        py_version="py3",
+        role=ROLE,
+        instance_type=INSTANCE_TYPE,
+        instance_count=1,
+        entry_point=DUMMY_LOCAL_SCRIPT_PATH,
+    ),
+    MXNet(
+        framework_version="1.4.1",
+        py_version="py3",
+        role=ROLE,
+        instance_type=INSTANCE_TYPE,
+        instance_count=1,
+        entry_point=DUMMY_LOCAL_SCRIPT_PATH,
+    ),
+    RLEstimator(
+        entry_point="cartpole.py",
+        toolkit=RLToolkit.RAY,
+        framework=RLFramework.TENSORFLOW,
+        toolkit_version="0.8.5",
+        role=ROLE,
+        instance_type=INSTANCE_TYPE,
+        instance_count=1,
+    ),
+    Chainer(
+        role=ROLE,
+        entry_point=DUMMY_LOCAL_SCRIPT_PATH,
+        use_mpi=True,
+        num_processes=4,
+        framework_version="5.0.0",
+        instance_type=INSTANCE_TYPE,
+        instance_count=1,
+        py_version="py3",
+    ),
+]
+
+
+INPUT_PARAM_LISTS = [
+    "s3://my-bucket/my-training-input",
+    ParameterString(name="training_input", default_value="s3://my-bucket/my-input"),
+    ParameterString(name="training_input"),
+    Join(on="/", values=["s3://my-bucket", "my-input"]),
+]
+
 
 @pytest.fixture
 def client():
@@ -161,120 +244,23 @@ def test_training_step_with_estimator(pipeline_session, training_input, hyperpar
     assert step.properties.TrainingJobName.expr == {"Get": "Steps.MyTrainingStep.TrainingJobName"}
 
 
-def test_estimator_with_parameterized_output(pipeline_session, training_input):
-    output_path = ParameterString(name="OutputPath")
-    estimator = XGBoost(
-        framework_version="1.3-1",
-        py_version="py3",
-        role=ROLE,
-        instance_type=INSTANCE_TYPE,
-        instance_count=1,
-        entry_point=DUMMY_LOCAL_SCRIPT_PATH,
-        output_path=output_path,
-        sagemaker_session=pipeline_session,
-    )
-    step_args = estimator.fit(inputs=training_input)
-    step = TrainingStep(
-        name="MyTrainingStep",
-        step_args=step_args,
-        description="TrainingStep description",
-        display_name="MyTrainingStep",
-    )
-    pipeline = Pipeline(
-        name="MyPipeline",
-        steps=[step],
-        sagemaker_session=pipeline_session,
-    )
-    step_def = json.loads(pipeline.definition())["Steps"][0]
-    assert step_def["Arguments"]["OutputDataConfig"]["S3OutputPath"] == {
-        "Get": "Parameters.OutputPath"
-    }
-
-
+@pytest.mark.parametrize("estimator", ESTIMATOR_LISTS)
+@pytest.mark.parametrize("training_input", INPUT_PARAM_LISTS)
 @pytest.mark.parametrize(
-    "estimator",
-    [
-        SKLearn(
-            framework_version="0.23-1",
-            py_version="py3",
-            instance_type=INSTANCE_TYPE,
-            instance_count=1,
-            role=ROLE,
-            entry_point=DUMMY_LOCAL_SCRIPT_PATH,
-        ),
-        PyTorch(
-            role=ROLE,
-            instance_type=INSTANCE_TYPE,
-            instance_count=1,
-            framework_version="1.8.0",
-            py_version="py36",
-            entry_point=DUMMY_LOCAL_SCRIPT_PATH,
-        ),
-        TensorFlow(
-            role=ROLE,
-            instance_type=INSTANCE_TYPE,
-            instance_count=1,
-            framework_version="2.0",
-            py_version="py3",
-            entry_point=DUMMY_LOCAL_SCRIPT_PATH,
-        ),
-        HuggingFace(
-            transformers_version="4.6",
-            pytorch_version="1.7",
-            role=ROLE,
-            instance_type="ml.p3.2xlarge",
-            instance_count=1,
-            py_version="py36",
-            entry_point=DUMMY_LOCAL_SCRIPT_PATH,
-        ),
-        XGBoost(
-            framework_version="1.3-1",
-            py_version="py3",
-            role=ROLE,
-            instance_type=INSTANCE_TYPE,
-            instance_count=1,
-            entry_point=DUMMY_LOCAL_SCRIPT_PATH,
-        ),
-        MXNet(
-            framework_version="1.4.1",
-            py_version="py3",
-            role=ROLE,
-            instance_type=INSTANCE_TYPE,
-            instance_count=1,
-            entry_point=DUMMY_LOCAL_SCRIPT_PATH,
-        ),
-        RLEstimator(
-            entry_point="cartpole.py",
-            toolkit=RLToolkit.RAY,
-            framework=RLFramework.TENSORFLOW,
-            toolkit_version="0.8.5",
-            role=ROLE,
-            instance_type=INSTANCE_TYPE,
-            instance_count=1,
-        ),
-        Chainer(
-            role=ROLE,
-            entry_point=DUMMY_LOCAL_SCRIPT_PATH,
-            use_mpi=True,
-            num_processes=4,
-            framework_version="5.0.0",
-            instance_type=INSTANCE_TYPE,
-            instance_count=1,
-            py_version="py3",
-        ),
-    ],
+    "output_path", ["s3://my-bucket/my-output-path", ParameterString(name="OutputPath")]
 )
 def test_training_step_with_framework_estimator(
-    estimator, pipeline_session, training_input, hyperparameters
+    estimator, pipeline_session, training_input, output_path, hyperparameters
 ):
     estimator.source_dir = DUMMY_S3_SOURCE_DIR
     estimator.set_hyperparameters(**hyperparameters)
     estimator.volume_kms_key = "volume-kms-key"
     estimator.output_kms_key = "output-kms-key"
     estimator.dependencies = ["dep-1", "dep-2"]
+    estimator.output_path = output_path
     estimator.sagemaker_session = pipeline_session
 
-    step_args = estimator.fit(inputs=training_input)
+    step_args = estimator.fit(inputs=TrainingInput(s3_data=training_input))
 
     step = TrainingStep(
         name="MyTrainingStep",
@@ -286,10 +272,26 @@ def test_training_step_with_framework_estimator(
         sagemaker_session=pipeline_session,
     )
 
-    assert json.loads(pipeline.definition())["Steps"][0] == {
+    step_args = step_args.args
+    step_def = json.loads(pipeline.definition())["Steps"][0]
+
+    assert step_args["InputDataConfig"][0]["DataSource"]["S3DataSource"]["S3Uri"] == training_input
+    assert step_args["OutputDataConfig"]["S3OutputPath"] == output_path
+
+    del step_args["InputDataConfig"][0]["DataSource"]["S3DataSource"]["S3Uri"]
+    del step_def["Arguments"]["InputDataConfig"][0]["DataSource"]["S3DataSource"]["S3Uri"]
+
+    del step_args["OutputDataConfig"]["S3OutputPath"]
+    del step_def["Arguments"]["OutputDataConfig"]["S3OutputPath"]
+
+    if "sagemaker_s3_output" in step_args["HyperParameters"]:
+        del step_args["HyperParameters"]["sagemaker_s3_output"]
+        del step_def["Arguments"]["HyperParameters"]["sagemaker_s3_output"]
+
+    assert step_def == {
         "Name": "MyTrainingStep",
         "Type": "Training",
-        "Arguments": step_args.args,
+        "Arguments": step_args,
     }
@@ -308,7 +310,11 @@ def test_training_step_with_framework_estimator(
         IPInsights,
     ],
 )
-def test_training_step_with_algorithm_base(algo_estimator, pipeline_session):
+@pytest.mark.parametrize(
+    "training_input",
+    INPUT_PARAM_LISTS,
+)
+def test_training_step_with_algorithm_base(algo_estimator, training_input, pipeline_session):
     estimator = algo_estimator(
         role=ROLE,
         instance_type=INSTANCE_TYPE,
@@ -316,7 +322,7 @@ def test_training_step_with_algorithm_base(algo_estimator, pipeline_session):
         sagemaker_session=pipeline_session,
     )
     data = RecordSet(
-        "s3://{}/{}".format(pipeline_session.default_bucket(), "dummy"),
+        s3_data=training_input,
         num_records=1000,
         feature_dim=128,
         channel="train",
@@ -343,12 +349,19 @@ def test_training_step_with_algorithm_base(algo_estimator, pipeline_session):
         steps=[step],
         sagemaker_session=pipeline_session,
     )
-    assert json.loads(pipeline.definition())["Steps"][0] == {
+
+    step_args = step_args.args
+
+    step_def = json.loads(pipeline.definition())["Steps"][0]
+    assert step_args["InputDataConfig"][0]["DataSource"]["S3DataSource"]["S3Uri"] == training_input
+    del step_args["InputDataConfig"][0]["DataSource"]["S3DataSource"]["S3Uri"]
+    del step_def["Arguments"]["InputDataConfig"][0]["DataSource"]["S3DataSource"]["S3Uri"]
+
+    assert step_def == {
         "Name": "MyTrainingStep",
         "Type": "Training",
-        "Arguments": step_args.args,
+        "Arguments": step_args,
     }
-    assert step.properties.TrainingJobName.expr == {"Get": "Steps.MyTrainingStep.TrainingJobName"}
 
 
 @pytest.mark.parametrize(
diff --git a/tests/unit/sagemaker/workflow/test_tuning_step.py b/tests/unit/sagemaker/workflow/test_tuning_step.py
index 3f905287c8..af9ce57d0c 100644
--- a/tests/unit/sagemaker/workflow/test_tuning_step.py
+++ b/tests/unit/sagemaker/workflow/test_tuning_step.py
@@ -26,6 +26,8 @@ from sagemaker.workflow.steps import TuningStep
 from sagemaker.inputs import TrainingInput
 from sagemaker.workflow.pipeline import Pipeline
+from sagemaker.workflow.parameters import ParameterString
+from sagemaker.workflow.functions import Join
 from sagemaker.tuner import HyperparameterTuner, IntegerParameter
 from sagemaker.pytorch.estimator import PyTorch
@@ -86,8 +88,17 @@ def entry_point():
     return os.path.join(base_dir, "mnist.py")
 
-def test_tuning_step_with_single_algo_tuner(pipeline_session, entry_point):
-    inputs = TrainingInput(s3_data=f"s3://{pipeline_session.default_bucket()}/training-data")
+@pytest.mark.parametrize(
+    "training_input",
+    [
+        "s3://my-bucket/my-training-input",
+        ParameterString(name="training_input", default_value="s3://my-bucket/my-input"),
+        ParameterString(name="training_input"),
+        Join(on="/", values=["s3://my-bucket", "my-input"]),
+    ],
+)
+def test_tuning_step_with_single_algo_tuner(pipeline_session, training_input, entry_point):
+    inputs = TrainingInput(s3_data=training_input)
 
     pytorch_estimator = PyTorch(
         entry_point=entry_point,
@@ -134,10 +145,26 @@ def test_tuning_step_with_single_algo_tuner(pipeline_session, entry_point):
         sagemaker_session=pipeline_session,
     )
 
-    assert json.loads(pipeline.definition())["Steps"][0] == {
+    step_args = step_args.args
+    step_def = json.loads(pipeline.definition())["Steps"][0]
+
+    assert (
+        step_args["TrainingJobDefinition"]["InputDataConfig"][0]["DataSource"]["S3DataSource"][
+            "S3Uri"
+        ]
+        == training_input
+    )
+    del step_args["TrainingJobDefinition"]["InputDataConfig"][0]["DataSource"]["S3DataSource"][
+        "S3Uri"
+    ]
+    del step_def["Arguments"]["TrainingJobDefinition"]["InputDataConfig"][0]["DataSource"][
+        "S3DataSource"
+    ]["S3Uri"]
+
+    assert step_def == {
         "Name": "MyTuningStep",
         "Type": "Tuning",
-        "Arguments": step_args.args,
+        "Arguments": step_args,
     }

From eb70546f95700e547ea773b39d7c54222b0ac8e7 Mon Sep 17 00:00:00 2001
From: jerrypeng7773 <50377760+jerrypeng7773@users.noreply.github.com>
Date: Thu, 2 Jun 2022 12:46:17 -0700
Subject: [PATCH 2/2] remove unused imports

---
 tests/unit/sagemaker/workflow/test_training_step.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/unit/sagemaker/workflow/test_training_step.py b/tests/unit/sagemaker/workflow/test_training_step.py
index 6bcb6a56a8..14df41c876 100644
--- a/tests/unit/sagemaker/workflow/test_training_step.py
+++ b/tests/unit/sagemaker/workflow/test_training_step.py
@@ -15,7 +15,6 @@
 import os
 import json
 from mock import Mock, PropertyMock
-import re
 import pytest
 import warnings
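
For reference, the pipeline pattern these tests exercise looks roughly like the sketch below: under a PipelineSession, processor.run(...) does not start a processing job but captures the arguments that ProcessingStep consumes via step_args. This is a minimal illustration, not part of the patch; the image URI, role ARN, and S3 paths are placeholder assumptions.

    # Minimal sketch of the PipelineSession + run() + step_args pattern.
    # "my-image-uri", "my-role-arn", and the s3://my-bucket/... paths are
    # placeholders, not values from the patch.
    from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
    from sagemaker.workflow.parameters import ParameterString
    from sagemaker.workflow.pipeline import Pipeline
    from sagemaker.workflow.pipeline_context import PipelineSession
    from sagemaker.workflow.steps import ProcessingStep

    pipeline_session = PipelineSession()

    processing_input_param = ParameterString(
        name="ProcessingInputUri", default_value="s3://my-bucket/my-input"
    )

    processor = ScriptProcessor(
        image_uri="my-image-uri",  # placeholder
        command=["python3"],
        role="my-role-arn",  # placeholder
        instance_type="ml.m4.xlarge",
        instance_count=1,
        sagemaker_session=pipeline_session,
    )

    # With a PipelineSession attached, run() returns the step arguments
    # instead of launching a job; they are handed to ProcessingStep.
    step_args = processor.run(
        code="s3://my-bucket/my_script.py",  # placeholder
        inputs=[
            ProcessingInput(
                source=processing_input_param,
                destination="/opt/ml/processing/input",
            )
        ],
        outputs=[
            ProcessingOutput(
                source="/opt/ml/processing/output",
                destination="s3://my-bucket/my-output",  # placeholder
            )
        ],
    )

    step = ProcessingStep(name="MyProcessingStep", step_args=step_args)

    pipeline = Pipeline(
        name="MyPipeline",
        parameters=[processing_input_param],
        steps=[step],
        sagemaker_session=pipeline_session,
    )

The parametrized inputs in the tests (plain S3 URIs, ParameterString with and without defaults, and Join expressions) can all be substituted for the placeholder URIs above, which is exactly the surface the patch adds coverage for.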