diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py
index f655645ffd..c6be941b7f 100644
--- a/src/sagemaker/estimator.py
+++ b/src/sagemaker/estimator.py
@@ -1879,7 +1879,9 @@ def _add_spot_checkpoint_args(cls, local_mode, estimator, train_args):
         if estimator.use_spot_instances:
             if local_mode:
                 raise ValueError("Spot training is not supported in local mode.")
-            train_args["use_spot_instances"] = True
+            # estimator.use_spot_instances may be a Pipeline ParameterBoolean object,
+            # which is resolved at pipeline execution time
+            train_args["use_spot_instances"] = estimator.use_spot_instances

         if estimator.checkpoint_s3_uri:
             if local_mode:
diff --git a/src/sagemaker/model.py b/src/sagemaker/model.py
index 317e865ddd..104b156646 100644
--- a/src/sagemaker/model.py
+++ b/src/sagemaker/model.py
@@ -37,6 +37,7 @@
 from sagemaker.utils import unique_name_from_base
 from sagemaker.async_inference import AsyncInferenceConfig
 from sagemaker.predictor_async import AsyncPredictor
+from sagemaker.workflow.entities import PipelineVariable

 LOGGER = logging.getLogger("sagemaker")

@@ -443,7 +444,7 @@ def _upload_code(self, key_prefix: str, repack: bool = False) -> None:
         )

         if repack and self.model_data is not None and self.entry_point is not None:
-            if isinstance(self.model_data, sagemaker.workflow.properties.Properties):
+            if isinstance(self.model_data, PipelineVariable):
                 # model is not yet there, defer repacking to later during pipeline execution
                 return
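
Taken together, these two hunks let execution-time values pass through intact instead of being collapsed to literals at definition time. A minimal sketch of the pattern they enable, assuming placeholder image/role values and step names that are not taken from this diff:

from sagemaker.estimator import Estimator
from sagemaker.workflow.parameters import ParameterBoolean
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep

# Hypothetical names; the flag stays symbolic until the pipeline actually runs.
use_spot = ParameterBoolean(name="UseSpotInstances", default_value=False)

estimator = Estimator(
    image_uri="<training-image-uri>",  # placeholder
    role="<execution-role-arn>",  # placeholder
    instance_count=1,
    instance_type="ml.m5.xlarge",
    use_spot_instances=use_spot,  # a ParameterBoolean, not a literal bool
)
step_train = TrainingStep(name="MyTrain", estimator=estimator)
pipeline = Pipeline(name="MyPipeline", parameters=[use_spot], steps=[step_train])

# In pipeline.definition(), EnableManagedSpotTraining should now render as
# {"Get": "Parameters.UseSpotInstances"} rather than a hard-coded True.
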
""" + if code and isinstance(code, PipelineVariable): + raise ValueError( + "code argument has to be a valid S3 URI or local file path " + + "rather than a pipeline variable" + ) + self._current_job_name = self._generate_current_job_name(job_name=job_name) inputs_with_code = self._include_code_in_inputs(inputs, code, kms_key) diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index c50a22d3f8..cbc0fb37cd 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -763,6 +763,8 @@ def _get_train_request( # noqa: C901 train_request["EnableInterContainerTrafficEncryption"] = encrypt_inter_container_traffic if use_spot_instances: + # estimator.use_spot_instances may be a Pipeline ParameterBoolean object + # which is parsed during the Pipeline execution runtime train_request["EnableManagedSpotTraining"] = use_spot_instances if checkpoint_s3_uri: @@ -2338,13 +2340,17 @@ def _map_training_config( training_job_definition["VpcConfig"] = vpc_config if enable_network_isolation: - training_job_definition["EnableNetworkIsolation"] = True + training_job_definition["EnableNetworkIsolation"] = enable_network_isolation if encrypt_inter_container_traffic: - training_job_definition["EnableInterContainerTrafficEncryption"] = True + training_job_definition[ + "EnableInterContainerTrafficEncryption" + ] = encrypt_inter_container_traffic if use_spot_instances: - training_job_definition["EnableManagedSpotTraining"] = True + # use_spot_instances may be a Pipeline ParameterBoolean object + # which is parsed during the Pipeline execution runtime + training_job_definition["EnableManagedSpotTraining"] = use_spot_instances if checkpoint_s3_uri: checkpoint_config = {"S3Uri": checkpoint_s3_uri} diff --git a/src/sagemaker/tensorflow/model.py b/src/sagemaker/tensorflow/model.py index 0b7c369f48..97a6f7ed00 100644 --- a/src/sagemaker/tensorflow/model.py +++ b/src/sagemaker/tensorflow/model.py @@ -21,6 +21,7 @@ from sagemaker.deprecations import removed_kwargs from sagemaker.predictor import Predictor from sagemaker.serializers import JSONSerializer +from sagemaker.workflow.entities import PipelineVariable class TensorFlowPredictor(Predictor): @@ -330,7 +331,9 @@ def prepare_container_def(self, instance_type=None, accelerator_type=None): image_uri = self._get_image_uri(instance_type, accelerator_type) env = self._get_container_env() - if self.entry_point: + # If self.model_data is pipeline variable, model is not yet there. + # So defer repacking to later during pipeline execution + if self.entry_point and not isinstance(self.model_data, PipelineVariable): key_prefix = sagemaker.fw_utils.model_code_key_prefix( self.key_prefix, self.name, image_uri ) diff --git a/src/sagemaker/tuner.py b/src/sagemaker/tuner.py index 85cf811d64..d9762ae750 100644 --- a/src/sagemaker/tuner.py +++ b/src/sagemaker/tuner.py @@ -39,9 +39,6 @@ ParameterRange, ) from sagemaker.workflow.entities import PipelineVariable -from sagemaker.workflow.parameters import Parameter as PipelineParameter -from sagemaker.workflow.functions import JsonGet as PipelineJsonGet -from sagemaker.workflow.functions import Join as PipelineJoin from sagemaker.session import Session from sagemaker.utils import base_from_name, base_name_from_image, name_from_base @@ -64,18 +61,6 @@ logger = logging.getLogger(__name__) -def is_pipeline_parameters(value): - """Determine if a value is a pipeline parameter or function representation - - Args: - value (float or int): The value to be verified. - - Returns: - bool: True if it is, False otherwise. 
- """ - return isinstance(value, (PipelineParameter, PipelineJsonGet, PipelineJoin)) - - class WarmStartTypes(Enum): """Warm Start Configuration type. diff --git a/src/sagemaker/workflow/_repack_model.py b/src/sagemaker/workflow/_repack_model.py index f98f170f39..3cfa6760b3 100644 --- a/src/sagemaker/workflow/_repack_model.py +++ b/src/sagemaker/workflow/_repack_model.py @@ -39,14 +39,14 @@ def repack(inference_script, model_archive, dependencies=None, source_dir=None): Args: inference_script (str): The path to the custom entry point. - model_archive (str): The name of the model TAR archive. + model_archive (str): The name or path (e.g. s3 uri) of the model TAR archive. dependencies (str): A space-delimited string of paths to custom dependencies. source_dir (str): The path to a custom source directory. """ # the data directory contains a model archive generated by a previous training job data_directory = "/opt/ml/input/data/training" - model_path = os.path.join(data_directory, model_archive) + model_path = os.path.join(data_directory, model_archive.split("/")[-1]) # create a temporary directory with tempfile.TemporaryDirectory() as tmp: diff --git a/src/sagemaker/workflow/_utils.py b/src/sagemaker/workflow/_utils.py index fbbb6acba9..74de92314c 100644 --- a/src/sagemaker/workflow/_utils.py +++ b/src/sagemaker/workflow/_utils.py @@ -137,12 +137,6 @@ def __init__( self._model_data = model_data self.sagemaker_session = sagemaker_session self.role = role - if isinstance(model_data, Properties): - self._model_prefix = model_data - self._model_archive = "model.tar.gz" - else: - self._model_prefix = "/".join(self._model_data.split("/")[:-1]) - self._model_archive = self._model_data.split("/")[-1] self._entry_point = entry_point self._entry_point_basename = os.path.basename(self._entry_point) self._source_dir = source_dir @@ -164,7 +158,7 @@ def __init__( role=self.role, hyperparameters={ "inference_script": self._entry_point_basename, - "model_archive": self._model_archive, + "model_archive": self._model_data, "dependencies": dependencies_hyperparameter, "source_dir": self._source_dir, }, @@ -173,7 +167,7 @@ def __init__( **kwargs, ) repacker.disable_profiler = True - inputs = TrainingInput(self._model_prefix) + inputs = TrainingInput(self._model_data) # super! 
diff --git a/src/sagemaker/workflow/_repack_model.py b/src/sagemaker/workflow/_repack_model.py
index f98f170f39..3cfa6760b3 100644
--- a/src/sagemaker/workflow/_repack_model.py
+++ b/src/sagemaker/workflow/_repack_model.py
@@ -39,14 +39,14 @@ def repack(inference_script, model_archive, dependencies=None, source_dir=None):

     Args:
         inference_script (str): The path to the custom entry point.
-        model_archive (str): The name of the model TAR archive.
+        model_archive (str): The name or path (e.g. an S3 URI) of the model TAR archive.
         dependencies (str): A space-delimited string of paths to custom dependencies.
         source_dir (str): The path to a custom source directory.
     """
     # the data directory contains a model archive generated by a previous training job
     data_directory = "/opt/ml/input/data/training"
-    model_path = os.path.join(data_directory, model_archive)
+    model_path = os.path.join(data_directory, model_archive.split("/")[-1])

     # create a temporary directory
     with tempfile.TemporaryDirectory() as tmp:
diff --git a/src/sagemaker/workflow/_utils.py b/src/sagemaker/workflow/_utils.py
index fbbb6acba9..74de92314c 100644
--- a/src/sagemaker/workflow/_utils.py
+++ b/src/sagemaker/workflow/_utils.py
@@ -137,12 +137,6 @@ def __init__(
         self._model_data = model_data
         self.sagemaker_session = sagemaker_session
         self.role = role
-        if isinstance(model_data, Properties):
-            self._model_prefix = model_data
-            self._model_archive = "model.tar.gz"
-        else:
-            self._model_prefix = "/".join(self._model_data.split("/")[:-1])
-            self._model_archive = self._model_data.split("/")[-1]
         self._entry_point = entry_point
         self._entry_point_basename = os.path.basename(self._entry_point)
         self._source_dir = source_dir
@@ -164,7 +158,7 @@ def __init__(
             role=self.role,
             hyperparameters={
                 "inference_script": self._entry_point_basename,
-                "model_archive": self._model_archive,
+                "model_archive": self._model_data,
                 "dependencies": dependencies_hyperparameter,
                 "source_dir": self._source_dir,
             },
@@ -173,7 +167,7 @@ def __init__(
             **kwargs,
         )
         repacker.disable_profiler = True
-        inputs = TrainingInput(self._model_prefix)
+        inputs = TrainingInput(self._model_data)

         # super!
         super(_RepackModelStep, self).__init__(
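
With these two hunks, model_archive may arrive as a full S3 URI rather than a bare file name, and the repack script keeps only the basename when locating the downloaded archive in the training channel. A small self-contained sketch of that derivation:

import os

data_directory = "/opt/ml/input/data/training"

# A bare name and a full S3 URI resolve to the same local path, because
# SageMaker stages the channel contents under the archive's basename.
for model_archive in ("model.tar.gz", "s3://my-bucket/prefix/model.tar.gz"):
    model_path = os.path.join(data_directory, model_archive.split("/")[-1])
    print(model_path)  # /opt/ml/input/data/training/model.tar.gz in both cases
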
""" - if isinstance(value, (ExecutionVariable, Expression, Parameter, Properties)): + if isinstance(value, PipelineVariable): return value.expr return value diff --git a/src/sagemaker/workflow/quality_check_step.py b/src/sagemaker/workflow/quality_check_step.py index b9a688cecd..5c27e21b0c 100644 --- a/src/sagemaker/workflow/quality_check_step.py +++ b/src/sagemaker/workflow/quality_check_step.py @@ -22,9 +22,9 @@ from sagemaker import s3 from sagemaker.model_monitor import ModelMonitor from sagemaker.processing import ProcessingOutput, ProcessingJob, Processor, ProcessingInput -from sagemaker.workflow import PipelineNonPrimitiveInputTypes, ExecutionVariable, Parameter +from sagemaker.workflow import PipelineNonPrimitiveInputTypes -from sagemaker.workflow.entities import RequestType, Expression +from sagemaker.workflow.entities import RequestType, PipelineVariable from sagemaker.workflow.properties import ( Properties, ) @@ -279,7 +279,7 @@ def _generate_baseline_job_inputs(self): _CONTAINER_BASE_PATH, _CONTAINER_INPUT_PATH, _BASELINE_DATASET_INPUT_NAME ) ) - if isinstance(baseline_dataset, (ExecutionVariable, Expression, Parameter, Properties)): + if isinstance(baseline_dataset, PipelineVariable): baseline_dataset_input = ProcessingInput( source=self.quality_check_config.baseline_dataset, destination=baseline_dataset_des, diff --git a/tests/integ/sagemaker/workflow/test_model_registration.py b/tests/integ/sagemaker/workflow/test_model_registration.py index 193bbb9755..1d31597b25 100644 --- a/tests/integ/sagemaker/workflow/test_model_registration.py +++ b/tests/integ/sagemaker/workflow/test_model_registration.py @@ -20,6 +20,7 @@ from botocore.exceptions import WaiterError import tests +from sagemaker.tensorflow import TensorFlow, TensorFlowModel from tests.integ.retry import retries from sagemaker.drift_check_baselines import DriftCheckBaselines from sagemaker import ( @@ -745,3 +746,101 @@ def test_model_registration_with_model_repack( pipeline.delete() except Exception: pass + + +def test_model_registration_with_tensorflow_model_with_pipeline_model( + sagemaker_session, role, tf_full_version, tf_full_py_version, pipeline_name, region_name +): + base_dir = os.path.join(DATA_DIR, "tensorflow_mnist") + entry_point = os.path.join(base_dir, "mnist_v2.py") + input_path = sagemaker_session.upload_data( + path=os.path.join(base_dir, "data"), + key_prefix="integ-test-data/tf-scriptmode/mnist/training", + ) + inputs = TrainingInput(s3_data=input_path) + + instance_count = ParameterInteger(name="InstanceCount", default_value=1) + instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge") + + tensorflow_estimator = TensorFlow( + entry_point=entry_point, + role=role, + instance_count=instance_count, + instance_type=instance_type, + framework_version=tf_full_version, + py_version=tf_full_py_version, + sagemaker_session=sagemaker_session, + ) + step_train = TrainingStep( + name="MyTrain", + estimator=tensorflow_estimator, + inputs=inputs, + ) + + model = TensorFlowModel( + entry_point=entry_point, + framework_version="2.4", + model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, + role=role, + sagemaker_session=sagemaker_session, + ) + + pipeline_model = PipelineModel( + name="MyModelPipeline", models=[model], role=role, sagemaker_session=sagemaker_session + ) + + step_register_model = RegisterModel( + name="MyRegisterModel", + model=pipeline_model, + model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, + content_types=["application/json"], + 
response_types=["application/json"], + inference_instances=["ml.t2.medium", "ml.m5.large"], + transform_instances=["ml.m5.large"], + model_package_group_name=f"{pipeline_name}TestModelPackageGroup", + ) + + pipeline = Pipeline( + name=pipeline_name, + parameters=[ + instance_count, + instance_type, + ], + steps=[step_train, step_register_model], + sagemaker_session=sagemaker_session, + ) + + try: + response = pipeline.create(role) + create_arn = response["PipelineArn"] + + assert re.match( + rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", + create_arn, + ) + + for _ in retries( + max_retry_count=5, + exception_message_prefix="Waiting for a successful execution of pipeline", + seconds_to_sleep=10, + ): + execution = pipeline.start(parameters={}) + assert re.match( + rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/", + execution.arn, + ) + try: + execution.wait(delay=30, max_attempts=60) + except WaiterError: + pass + execution_steps = execution.list_steps() + + assert len(execution_steps) == 3 + for step in execution_steps: + assert step["StepStatus"] == "Succeeded" + break + finally: + try: + pipeline.delete() + except Exception: + pass diff --git a/tests/integ/sagemaker/workflow/test_training_steps.py b/tests/integ/sagemaker/workflow/test_training_steps.py index 0f1ba84a55..248dacac44 100644 --- a/tests/integ/sagemaker/workflow/test_training_steps.py +++ b/tests/integ/sagemaker/workflow/test_training_steps.py @@ -13,19 +13,22 @@ from __future__ import absolute_import import os +import re import uuid import logging import pytest from botocore.exceptions import WaiterError -from sagemaker import TrainingInput, get_execution_role, utils +from sagemaker import TrainingInput, get_execution_role, utils, image_uris from sagemaker.debugger import ( DebuggerHookConfig, Rule, rule_configs, ) +from sagemaker.estimator import Estimator from sagemaker.pytorch.estimator import PyTorch +from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger, ParameterString from sagemaker.workflow.pipeline import Pipeline from sagemaker.workflow.steps import TrainingStep @@ -151,3 +154,72 @@ def test_training_job_with_debugger_and_profiler( pipeline.delete() except Exception: pass + + +def test_training_step_with_output_path_as_join( + sagemaker_session, role, tf_full_version, tf_full_py_version, pipeline_name, region_name +): + base_dir = os.path.join(DATA_DIR, "dummy_tensor") + input_path = sagemaker_session.upload_data( + path=base_dir, key_prefix="integ-test-data/estimator/training" + ) + inputs = TrainingInput(s3_data=input_path) + + instance_count = ParameterInteger(name="InstanceCount", default_value=1) + instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge") + output_path = Join( + on="/", values=["s3:/", f"{sagemaker_session.default_bucket()}", f"{pipeline_name}Train"] + ) + + image_uri = image_uris.retrieve("factorization-machines", sagemaker_session.boto_region_name) + estimator = Estimator( + image_uri=image_uri, + role=role, + instance_count=instance_count, + instance_type=instance_type, + sagemaker_session=sagemaker_session, + output_path=output_path, + ) + estimator.set_hyperparameters( + num_factors=10, feature_dim=784, mini_batch_size=100, predictor_type="binary_classifier" + ) + step_train = TrainingStep( + name="MyTrain", + estimator=estimator, + inputs=inputs, + ) + + pipeline = Pipeline( + name=pipeline_name, + parameters=[instance_count, instance_type], + 
diff --git a/tests/integ/sagemaker/workflow/test_training_steps.py b/tests/integ/sagemaker/workflow/test_training_steps.py
index 0f1ba84a55..248dacac44 100644
--- a/tests/integ/sagemaker/workflow/test_training_steps.py
+++ b/tests/integ/sagemaker/workflow/test_training_steps.py
@@ -13,19 +13,22 @@
 from __future__ import absolute_import

 import os
+import re
 import uuid
 import logging

 import pytest
 from botocore.exceptions import WaiterError

-from sagemaker import TrainingInput, get_execution_role, utils
+from sagemaker import TrainingInput, get_execution_role, utils, image_uris
 from sagemaker.debugger import (
     DebuggerHookConfig,
     Rule,
     rule_configs,
 )
+from sagemaker.estimator import Estimator
 from sagemaker.pytorch.estimator import PyTorch
+from sagemaker.workflow.functions import Join
 from sagemaker.workflow.parameters import ParameterInteger, ParameterString
 from sagemaker.workflow.pipeline import Pipeline
 from sagemaker.workflow.steps import TrainingStep
@@ -151,3 +154,72 @@ def test_training_job_with_debugger_and_profiler(
             pipeline.delete()
         except Exception:
             pass
+
+
+def test_training_step_with_output_path_as_join(
+    sagemaker_session, role, tf_full_version, tf_full_py_version, pipeline_name, region_name
+):
+    base_dir = os.path.join(DATA_DIR, "dummy_tensor")
+    input_path = sagemaker_session.upload_data(
+        path=base_dir, key_prefix="integ-test-data/estimator/training"
+    )
+    inputs = TrainingInput(s3_data=input_path)
+
+    instance_count = ParameterInteger(name="InstanceCount", default_value=1)
+    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
+    output_path = Join(
+        on="/", values=["s3:/", f"{sagemaker_session.default_bucket()}", f"{pipeline_name}Train"]
+    )
+
+    image_uri = image_uris.retrieve("factorization-machines", sagemaker_session.boto_region_name)
+    estimator = Estimator(
+        image_uri=image_uri,
+        role=role,
+        instance_count=instance_count,
+        instance_type=instance_type,
+        sagemaker_session=sagemaker_session,
+        output_path=output_path,
+    )
+    estimator.set_hyperparameters(
+        num_factors=10, feature_dim=784, mini_batch_size=100, predictor_type="binary_classifier"
+    )
+    step_train = TrainingStep(
+        name="MyTrain",
+        estimator=estimator,
+        inputs=inputs,
+    )
+
+    pipeline = Pipeline(
+        name=pipeline_name,
+        parameters=[instance_count, instance_type],
+        steps=[step_train],
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        response = pipeline.create(role)
+        create_arn = response["PipelineArn"]
+
+        assert re.match(
+            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
+            create_arn,
+        )
+
+        execution = pipeline.start(parameters={})
+        assert re.match(
+            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
+            execution.arn,
+        )
+        try:
+            execution.wait(delay=30, max_attempts=60)
+        except WaiterError:
+            pass
+        execution_steps = execution.list_steps()
+
+        assert len(execution_steps) == 1
+        assert execution_steps[0]["StepName"] == "MyTrain"
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
key_prefix="integ-test-data/pytorch_mnist/training", + ) + inputs = TrainingInput(s3_data=input_path) + + instance_count = ParameterInteger(name="InstanceCount", default_value=1) + instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge") + + pytorch_estimator = PyTorch( + entry_point=entry_point, + role=role, + framework_version="1.5.0", + py_version="py3", + instance_count=instance_count, + instance_type=instance_type, + sagemaker_session=sagemaker_session, + enable_sagemaker_metrics=True, + max_retry_attempts=3, + ) + + min_batch_size = ParameterString(name="MinBatchSize", default_value="64") + max_batch_size = ParameterString(name="MaxBatchSize", default_value="128") + hyperparameter_ranges = { + "batch-size": IntegerParameter(min_batch_size, max_batch_size), + } + + tuner = HyperparameterTuner( + estimator=pytorch_estimator, + objective_metric_name="test:acc", + objective_type="Maximize", + hyperparameter_ranges=hyperparameter_ranges, + metric_definitions=[{"Name": "test:acc", "Regex": "Overall test accuracy: (.*?);"}], + max_jobs=2, + max_parallel_jobs=2, + ) + + step_tune = TuningStep( + name="my-tuning-step", + tuner=tuner, + inputs=inputs, + ) + + step_register_best = RegisterModel( + name="my-model-regis", + estimator=pytorch_estimator, + model_data=step_tune.get_top_model_s3_uri( + top_k=0, + s3_bucket=sagemaker_session.default_bucket(), + ), + content_types=["text/csv"], + response_types=["text/csv"], + inference_instances=["ml.t2.medium", "ml.m5.large"], + transform_instances=["ml.m5.large"], + entry_point=entry_point, + ) + + pipeline = Pipeline( + name=pipeline_name, + parameters=[instance_count, instance_type, min_batch_size, max_batch_size], + steps=[step_tune, step_register_best], + sagemaker_session=sagemaker_session, + ) + + try: + response = pipeline.create(role) + create_arn = response["PipelineArn"] + assert re.match( + rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", + create_arn, + ) + + for _ in retries( + max_retry_count=5, + exception_message_prefix="Waiting for a successful execution of pipeline", + seconds_to_sleep=10, + ): + execution = pipeline.start(parameters={}) + assert re.match( + rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/", + execution.arn, + ) + try: + execution.wait(delay=30, max_attempts=60) + except WaiterError: + pass + execution_steps = execution.list_steps() + + assert len(execution_steps) == 3 + for step in execution_steps: + assert step["StepStatus"] == "Succeeded" + break + finally: + try: + pipeline.delete() + except Exception: + pass diff --git a/tests/unit/sagemaker/workflow/test_repack_model_script.py b/tests/unit/sagemaker/workflow/test_repack_model_script.py index 69c9e7b740..043f5d3963 100644 --- a/tests/unit/sagemaker/workflow/test_repack_model_script.py +++ b/tests/unit/sagemaker/workflow/test_repack_model_script.py @@ -10,7 +10,6 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. -# language governing permissions and limitations under the License. 
diff --git a/tests/integ/sagemaker/workflow/test_workflow.py b/tests/integ/sagemaker/workflow/test_workflow.py
index e0c2c3219d..09e8701d07 100644
--- a/tests/integ/sagemaker/workflow/test_workflow.py
+++ b/tests/integ/sagemaker/workflow/test_workflow.py
@@ -23,6 +23,10 @@
 from botocore.exceptions import WaiterError
 import pandas as pd

+from tests.integ.retry import retries
+from sagemaker.parameter import IntegerParameter
+from sagemaker.pytorch import PyTorch
+from sagemaker.tuner import HyperparameterTuner
 from tests.integ.timeout import timeout
 from sagemaker.session import Session
@@ -71,6 +75,7 @@
     TransformStep,
     TransformInput,
     PropertyFile,
+    TuningStep,
 )
 from sagemaker.workflow.step_collections import RegisterModel
 from sagemaker.workflow.pipeline import Pipeline
@@ -1008,3 +1013,111 @@ def test_create_and_update_with_parallelism_config(
             pipeline.delete()
         except Exception:
             pass
+
+
+def test_model_registration_with_tuning_model(
+    sagemaker_session,
+    role,
+    cpu_instance_type,
+    pipeline_name,
+    region_name,
+):
+    base_dir = os.path.join(DATA_DIR, "pytorch_mnist")
+    entry_point = os.path.join(base_dir, "mnist.py")
+    input_path = sagemaker_session.upload_data(
+        path=os.path.join(base_dir, "training"),
+        key_prefix="integ-test-data/pytorch_mnist/training",
+    )
+    inputs = TrainingInput(s3_data=input_path)
+
+    instance_count = ParameterInteger(name="InstanceCount", default_value=1)
+    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
+
+    pytorch_estimator = PyTorch(
+        entry_point=entry_point,
+        role=role,
+        framework_version="1.5.0",
+        py_version="py3",
+        instance_count=instance_count,
+        instance_type=instance_type,
+        sagemaker_session=sagemaker_session,
+        enable_sagemaker_metrics=True,
+        max_retry_attempts=3,
+    )
+
+    min_batch_size = ParameterString(name="MinBatchSize", default_value="64")
+    max_batch_size = ParameterString(name="MaxBatchSize", default_value="128")
+    hyperparameter_ranges = {
+        "batch-size": IntegerParameter(min_batch_size, max_batch_size),
+    }
+
+    tuner = HyperparameterTuner(
+        estimator=pytorch_estimator,
+        objective_metric_name="test:acc",
+        objective_type="Maximize",
+        hyperparameter_ranges=hyperparameter_ranges,
+        metric_definitions=[{"Name": "test:acc", "Regex": "Overall test accuracy: (.*?);"}],
+        max_jobs=2,
+        max_parallel_jobs=2,
+    )
+
+    step_tune = TuningStep(
+        name="my-tuning-step",
+        tuner=tuner,
+        inputs=inputs,
+    )
+
+    step_register_best = RegisterModel(
+        name="my-model-regis",
+        estimator=pytorch_estimator,
+        model_data=step_tune.get_top_model_s3_uri(
+            top_k=0,
+            s3_bucket=sagemaker_session.default_bucket(),
+        ),
+        content_types=["text/csv"],
+        response_types=["text/csv"],
+        inference_instances=["ml.t2.medium", "ml.m5.large"],
+        transform_instances=["ml.m5.large"],
+        entry_point=entry_point,
+    )
+
+    pipeline = Pipeline(
+        name=pipeline_name,
+        parameters=[instance_count, instance_type, min_batch_size, max_batch_size],
+        steps=[step_tune, step_register_best],
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        response = pipeline.create(role)
+        create_arn = response["PipelineArn"]
+        assert re.match(
+            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
+            create_arn,
+        )
+
+        for _ in retries(
+            max_retry_count=5,
+            exception_message_prefix="Waiting for a successful execution of pipeline",
+            seconds_to_sleep=10,
+        ):
+            execution = pipeline.start(parameters={})
+            assert re.match(
+                rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
+                execution.arn,
+            )
+            try:
+                execution.wait(delay=30, max_attempts=60)
+            except WaiterError:
+                pass
+            execution_steps = execution.list_steps()
+
+            assert len(execution_steps) == 3
+            for step in execution_steps:
+                assert step["StepStatus"] == "Succeeded"
+            break
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
"model_archive": '"s3://my-bucket/model.tar.gz"', "dependencies": "null", "source_dir": "null", "sagemaker_program": '"_repack_model.py"', diff --git a/tests/unit/sagemaker/workflow/test_steps.py b/tests/unit/sagemaker/workflow/test_steps.py index 64da232c9e..0b5265f0a8 100644 --- a/tests/unit/sagemaker/workflow/test_steps.py +++ b/tests/unit/sagemaker/workflow/test_steps.py @@ -45,9 +45,10 @@ ) from sagemaker.network import NetworkConfig from sagemaker.transformer import Transformer +from sagemaker.workflow.functions import Join from sagemaker.workflow.pipeline import Pipeline from sagemaker.workflow.properties import Properties, PropertyFile -from sagemaker.workflow.parameters import ParameterString, ParameterInteger +from sagemaker.workflow.parameters import ParameterString, ParameterInteger, ParameterBoolean from sagemaker.workflow.retry import ( StepRetryPolicy, StepExceptionTypeEnum, @@ -295,6 +296,8 @@ def test_training_step_base_estimator(sagemaker_session): ) training_epochs_parameter = ParameterInteger(name="TrainingEpochs", default_value=5) training_batch_size_parameter = ParameterInteger(name="TrainingBatchSize", default_value=500) + use_spot_instances = ParameterBoolean(name="UseSpotInstances", default_value=False) + output_path = Join(on="/", values=["s3:/", "a", "b"]) estimator = Estimator( image_uri=IMAGE_URI, role=ROLE, @@ -307,6 +310,8 @@ def test_training_step_base_estimator(sagemaker_session): }, rules=[], sagemaker_session=sagemaker_session, + output_path=output_path, + use_spot_instances=use_spot_instances, ) inputs = TrainingInput(s3_data=data_source_uri_parameter) cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") @@ -328,10 +333,12 @@ def test_training_step_base_estimator(sagemaker_session): data_source_uri_parameter, training_epochs_parameter, training_batch_size_parameter, + use_spot_instances, ], steps=[step], sagemaker_session=sagemaker_session, ) + assert json.loads(pipeline.definition())["Steps"][0] == { "Name": "MyTrainingStep", "Type": "Training", @@ -340,6 +347,7 @@ def test_training_step_base_estimator(sagemaker_session): "DependsOn": ["TestStep", "AnotherTestStep"], "Arguments": { "AlgorithmSpecification": {"TrainingImage": IMAGE_URI, "TrainingInputMode": "File"}, + "EnableManagedSpotTraining": {"Get": "Parameters.UseSpotInstances"}, "HyperParameters": { "batch-size": { "Std:Join": { @@ -366,7 +374,9 @@ def test_training_step_base_estimator(sagemaker_session): }, } ], - "OutputDataConfig": {"S3OutputPath": f"s3://{BUCKET}/"}, + "OutputDataConfig": { + "S3OutputPath": {"Std:Join": {"On": "/", "Values": ["s3:/", "a", "b"]}} + }, "ResourceConfig": { "InstanceCount": {"Get": "Parameters.InstanceCount"}, "InstanceType": {"Get": "Parameters.InstanceType"}, @@ -376,7 +386,7 @@ def test_training_step_base_estimator(sagemaker_session): "StoppingCondition": {"MaxRuntimeInSeconds": 86400}, "ProfilerConfig": { "ProfilingIntervalInMilliseconds": 500, - "S3OutputPath": f"s3://{BUCKET}/", + "S3OutputPath": {"Std:Join": {"On": "/", "Values": ["s3:/", "a", "b"]}}, }, }, "CacheConfig": {"Enabled": True, "ExpireAfter": "PT1H"}, @@ -681,6 +691,44 @@ def test_processing_step_normalizes_args_with_local_code(mock_normalize_args, sc ) +def test_processing_step_normalizes_args_with_param_str_local_code( + sagemaker_session, script_processor +): + cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") + code_param = ParameterString(name="Script", default_value="S3://my-bucket/file_name.py") + inputs = [ + ProcessingInput( + 
source=f"s3://{BUCKET}/processing_manifest", + destination="processing_manifest", + ) + ] + outputs = [ + ProcessingOutput( + source=f"s3://{BUCKET}/processing_manifest", + destination="processing_manifest", + ) + ] + step = ProcessingStep( + name="MyProcessingStep", + processor=script_processor, + code=code_param, + inputs=inputs, + outputs=outputs, + job_arguments=["arg1", "arg2"], + cache_config=cache_config, + ) + pipeline = Pipeline( + name="MyPipeline", + parameters=[code_param], + steps=[step], + sagemaker_session=sagemaker_session, + ) + with pytest.raises(ValueError) as error: + pipeline.definition() + + assert "has to be a valid S3 URI or local file path" in str(error.value) + + @patch("sagemaker.processing.ScriptProcessor._normalize_args") def test_processing_step_normalizes_args_with_s3_code(mock_normalize_args, script_processor): cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") @@ -971,6 +1019,7 @@ def test_single_algo_tuning_step(sagemaker_session): data_source_uri_parameter = ParameterString( name="DataSourceS3Uri", default_value=f"s3://{BUCKET}/train_manifest" ) + use_spot_instances = ParameterBoolean(name="UseSpotInstances", default_value=False) estimator = Estimator( image_uri=IMAGE_URI, role=ROLE, @@ -979,6 +1028,7 @@ def test_single_algo_tuning_step(sagemaker_session): profiler_config=ProfilerConfig(system_monitor_interval_millis=500), rules=[], sagemaker_session=sagemaker_session, + use_spot_instances=use_spot_instances, ) estimator.set_hyperparameters( num_layers=18, @@ -1022,7 +1072,15 @@ def test_single_algo_tuning_step(sagemaker_session): inputs=inputs, ) - assert tuning_step.to_request() == { + pipeline = Pipeline( + name="MyPipeline", + parameters=[data_source_uri_parameter, use_spot_instances], + steps=[tuning_step], + sagemaker_session=sagemaker_session, + ) + + step_dsl_list = json.loads(pipeline.definition())["Steps"] + assert step_dsl_list[0] == { "Name": "MyTuningStep", "Type": "Tuning", "Arguments": { @@ -1084,12 +1142,13 @@ def test_single_algo_tuning_step(sagemaker_session): "TrainingInputMode": "File", "TrainingImage": "fakeimage", }, + "EnableManagedSpotTraining": {"Get": "Parameters.UseSpotInstances"}, "InputDataConfig": [ { "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", - "S3Uri": data_source_uri_parameter, + "S3Uri": {"Get": "Parameters.DataSourceS3Uri"}, "S3DataDistributionType": "FullyReplicated", } }, diff --git a/tests/unit/sagemaker/workflow/test_utils.py b/tests/unit/sagemaker/workflow/test_utils.py index e534aa531e..4ab9066af1 100644 --- a/tests/unit/sagemaker/workflow/test_utils.py +++ b/tests/unit/sagemaker/workflow/test_utils.py @@ -117,7 +117,7 @@ def test_repack_model_step(estimator): hyperparameters = request_dict["Arguments"]["HyperParameters"] assert hyperparameters["inference_script"] == '"dummy_script.py"' - assert hyperparameters["model_archive"] == '"model.tar.gz"' + assert hyperparameters["model_archive"] == '"s3://my-bucket/model.tar.gz"' assert hyperparameters["sagemaker_program"] == '"_repack_model.py"' del request_dict["Arguments"]["HyperParameters"] @@ -136,7 +136,7 @@ def test_repack_model_step(estimator): "S3DataSource": { "S3DataDistributionType": "FullyReplicated", "S3DataType": "S3Prefix", - "S3Uri": f"s3://{BUCKET}", + "S3Uri": f"s3://{BUCKET}/model.tar.gz", } }, } @@ -172,7 +172,9 @@ def test_repack_model_step_with_source_dir(estimator, source_dir): hyperparameters = request_dict["Arguments"]["HyperParameters"] assert hyperparameters["inference_script"] == '"inference.py"' - assert 
hyperparameters["model_archive"] == '"model.tar.gz"' + assert hyperparameters["model_archive"].expr == { + "Std:Join": {"On": "", "Values": [{"Get": "Steps.MyStep"}]} + } assert hyperparameters["sagemaker_program"] == '"_repack_model.py"' del request_dict["Arguments"]["HyperParameters"]