diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py
index c6be941b7f..f655645ffd 100644
--- a/src/sagemaker/estimator.py
+++ b/src/sagemaker/estimator.py
@@ -1879,9 +1879,7 @@ def _add_spot_checkpoint_args(cls, local_mode, estimator, train_args):
         if estimator.use_spot_instances:
             if local_mode:
                 raise ValueError("Spot training is not supported in local mode.")
-            # estimator.use_spot_instances may be a Pipeline ParameterBoolean object
-            # which is parsed during the Pipeline execution runtime
-            train_args["use_spot_instances"] = estimator.use_spot_instances
+            train_args["use_spot_instances"] = True
 
         if estimator.checkpoint_s3_uri:
             if local_mode:
diff --git a/src/sagemaker/model.py b/src/sagemaker/model.py
index 104b156646..317e865ddd 100644
--- a/src/sagemaker/model.py
+++ b/src/sagemaker/model.py
@@ -37,7 +37,6 @@
 from sagemaker.utils import unique_name_from_base
 from sagemaker.async_inference import AsyncInferenceConfig
 from sagemaker.predictor_async import AsyncPredictor
-from sagemaker.workflow.entities import PipelineVariable
 
 LOGGER = logging.getLogger("sagemaker")
 
@@ -444,7 +443,7 @@ def _upload_code(self, key_prefix: str, repack: bool = False) -> None:
         )
 
         if repack and self.model_data is not None and self.entry_point is not None:
-            if isinstance(self.model_data, PipelineVariable):
+            if isinstance(self.model_data, sagemaker.workflow.properties.Properties):
                 # model is not yet there, defer repacking to later during pipeline execution
                 return
 
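Review note: with the PipelineVariable pass-through reverted, _add_spot_checkpoint_args writes a literal True once the guard passes, and Model._upload_code is back to testing for the concrete Properties type. One behavior worth keeping in mind (my observation, not part of the change): a parameter object assigned to use_spot_instances would still enter the if branch, because Python treats any object that does not override __bool__ or __len__ as truthy. A minimal sketch of that pitfall, using a hypothetical Flag stand-in:

    class Flag:
        """Stand-in for a parameter object with no __bool__ override."""

    train_args = {}
    use_spot_instances = Flag()

    if use_spot_instances:  # truthy by default, so the branch is taken
        train_args["use_spot_instances"] = True

    print(train_args)  # {'use_spot_instances': True}
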
""" - if code and isinstance(code, PipelineVariable): - raise ValueError( - "code argument has to be a valid S3 URI or local file path " - + "rather than a pipeline variable" - ) - self._current_job_name = self._generate_current_job_name(job_name=job_name) inputs_with_code = self._include_code_in_inputs(inputs, code, kms_key) diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index 4cf48a498b..29d49f56bc 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -763,8 +763,6 @@ def _get_train_request( # noqa: C901 train_request["EnableInterContainerTrafficEncryption"] = encrypt_inter_container_traffic if use_spot_instances: - # estimator.use_spot_instances may be a Pipeline ParameterBoolean object - # which is parsed during the Pipeline execution runtime train_request["EnableManagedSpotTraining"] = use_spot_instances if checkpoint_s3_uri: @@ -2342,17 +2340,13 @@ def _map_training_config( training_job_definition["VpcConfig"] = vpc_config if enable_network_isolation: - training_job_definition["EnableNetworkIsolation"] = enable_network_isolation + training_job_definition["EnableNetworkIsolation"] = True if encrypt_inter_container_traffic: - training_job_definition[ - "EnableInterContainerTrafficEncryption" - ] = encrypt_inter_container_traffic + training_job_definition["EnableInterContainerTrafficEncryption"] = True if use_spot_instances: - # use_spot_instances may be a Pipeline ParameterBoolean object - # which is parsed during the Pipeline execution runtime - training_job_definition["EnableManagedSpotTraining"] = use_spot_instances + training_job_definition["EnableManagedSpotTraining"] = True if checkpoint_s3_uri: checkpoint_config = {"S3Uri": checkpoint_s3_uri} diff --git a/src/sagemaker/tensorflow/model.py b/src/sagemaker/tensorflow/model.py index 97a6f7ed00..0b7c369f48 100644 --- a/src/sagemaker/tensorflow/model.py +++ b/src/sagemaker/tensorflow/model.py @@ -21,7 +21,6 @@ from sagemaker.deprecations import removed_kwargs from sagemaker.predictor import Predictor from sagemaker.serializers import JSONSerializer -from sagemaker.workflow.entities import PipelineVariable class TensorFlowPredictor(Predictor): @@ -331,9 +330,7 @@ def prepare_container_def(self, instance_type=None, accelerator_type=None): image_uri = self._get_image_uri(instance_type, accelerator_type) env = self._get_container_env() - # If self.model_data is pipeline variable, model is not yet there. - # So defer repacking to later during pipeline execution - if self.entry_point and not isinstance(self.model_data, PipelineVariable): + if self.entry_point: key_prefix = sagemaker.fw_utils.model_code_key_prefix( self.key_prefix, self.name, image_uri ) diff --git a/src/sagemaker/tuner.py b/src/sagemaker/tuner.py index d9762ae750..85cf811d64 100644 --- a/src/sagemaker/tuner.py +++ b/src/sagemaker/tuner.py @@ -39,6 +39,9 @@ ParameterRange, ) from sagemaker.workflow.entities import PipelineVariable +from sagemaker.workflow.parameters import Parameter as PipelineParameter +from sagemaker.workflow.functions import JsonGet as PipelineJsonGet +from sagemaker.workflow.functions import Join as PipelineJoin from sagemaker.session import Session from sagemaker.utils import base_from_name, base_name_from_image, name_from_base @@ -61,6 +64,18 @@ logger = logging.getLogger(__name__) +def is_pipeline_parameters(value): + """Determine if a value is a pipeline parameter or function representation + + Args: + value (float or int): The value to be verified. + + Returns: + bool: True if it is, False otherwise. 
+ """ + return isinstance(value, (PipelineParameter, PipelineJsonGet, PipelineJoin)) + + class WarmStartTypes(Enum): """Warm Start Configuration type. diff --git a/src/sagemaker/workflow/_repack_model.py b/src/sagemaker/workflow/_repack_model.py index 3cfa6760b3..f98f170f39 100644 --- a/src/sagemaker/workflow/_repack_model.py +++ b/src/sagemaker/workflow/_repack_model.py @@ -39,14 +39,14 @@ def repack(inference_script, model_archive, dependencies=None, source_dir=None): Args: inference_script (str): The path to the custom entry point. - model_archive (str): The name or path (e.g. s3 uri) of the model TAR archive. + model_archive (str): The name of the model TAR archive. dependencies (str): A space-delimited string of paths to custom dependencies. source_dir (str): The path to a custom source directory. """ # the data directory contains a model archive generated by a previous training job data_directory = "/opt/ml/input/data/training" - model_path = os.path.join(data_directory, model_archive.split("/")[-1]) + model_path = os.path.join(data_directory, model_archive) # create a temporary directory with tempfile.TemporaryDirectory() as tmp: diff --git a/src/sagemaker/workflow/_utils.py b/src/sagemaker/workflow/_utils.py index c8dbfc610d..ad756d90f0 100644 --- a/src/sagemaker/workflow/_utils.py +++ b/src/sagemaker/workflow/_utils.py @@ -134,6 +134,12 @@ def __init__( self._model_data = model_data self.sagemaker_session = sagemaker_session self.role = role + if isinstance(model_data, Properties): + self._model_prefix = model_data + self._model_archive = "model.tar.gz" + else: + self._model_prefix = "/".join(self._model_data.split("/")[:-1]) + self._model_archive = self._model_data.split("/")[-1] self._entry_point = entry_point self._entry_point_basename = os.path.basename(self._entry_point) self._source_dir = source_dir @@ -155,7 +161,7 @@ def __init__( role=self.role, hyperparameters={ "inference_script": self._entry_point_basename, - "model_archive": self._model_data, + "model_archive": self._model_archive, "dependencies": dependencies_hyperparameter, "source_dir": self._source_dir, }, @@ -164,7 +170,7 @@ def __init__( **kwargs, ) repacker.disable_profiler = True - inputs = TrainingInput(self._model_data) + inputs = TrainingInput(self._model_prefix) # super! 
diff --git a/src/sagemaker/workflow/airflow.py b/src/sagemaker/workflow/airflow.py
index 7c78543702..739abc841a 100644
--- a/src/sagemaker/workflow/airflow.py
+++ b/src/sagemaker/workflow/airflow.py
@@ -184,9 +184,7 @@ def training_base_config(estimator, inputs=None, job_name=None, mini_batch_size=
         train_config["VpcConfig"] = job_config["vpc_config"]
 
     if estimator.use_spot_instances:
-        # estimator.use_spot_instances may be a Pipeline ParameterBoolean object
-        # which is parsed during the Pipeline execution runtime
-        train_config["EnableManagedSpotTraining"] = estimator.use_spot_instances
+        train_config["EnableManagedSpotTraining"] = True
 
     if estimator.hyperparameters() is not None:
         hyperparameters = {str(k): str(v) for (k, v) in estimator.hyperparameters().items()}
diff --git a/src/sagemaker/workflow/clarify_check_step.py b/src/sagemaker/workflow/clarify_check_step.py
index af4b7d7f60..b2ae30a7f6 100644
--- a/src/sagemaker/workflow/clarify_check_step.py
+++ b/src/sagemaker/workflow/clarify_check_step.py
@@ -37,8 +37,8 @@
 from sagemaker.model_monitor.model_monitoring import _MODEL_MONITOR_S3_PATH
 from sagemaker.processing import ProcessingInput, ProcessingOutput, ProcessingJob
 from sagemaker.utils import name_from_base
-from sagemaker.workflow import PipelineNonPrimitiveInputTypes
-from sagemaker.workflow.entities import RequestType, PipelineVariable
+from sagemaker.workflow import PipelineNonPrimitiveInputTypes, ExecutionVariable, Parameter
+from sagemaker.workflow.entities import RequestType, Expression
 from sagemaker.workflow.properties import Properties
 from sagemaker.workflow.steps import Step, StepTypeEnum, CacheConfig
 from sagemaker.workflow.check_job_config import CheckJobConfig
@@ -194,7 +194,8 @@ def __init__(
         )
 
         if isinstance(
-            clarify_check_config.data_config.s3_analysis_config_output_path, PipelineVariable
+            clarify_check_config.data_config.s3_analysis_config_output_path,
+            (ExecutionVariable, Expression, Parameter, Properties),
         ):
             raise RuntimeError(
                 "s3_analysis_config_output_path cannot be of type "
@@ -202,7 +203,8 @@ def __init__(
         )
 
         if not clarify_check_config.data_config.s3_analysis_config_output_path and isinstance(
-            clarify_check_config.data_config.s3_output_path, PipelineVariable
+            clarify_check_config.data_config.s3_output_path,
+            (ExecutionVariable, Expression, Parameter, Properties),
         ):
             raise RuntimeError(
                 "`s3_output_path` cannot be of type ExecutionVariable/Expression/Parameter"
diff --git a/src/sagemaker/workflow/conditions.py b/src/sagemaker/workflow/conditions.py
index 2f9c923569..065cf01315 100644
--- a/src/sagemaker/workflow/conditions.py
+++ b/src/sagemaker/workflow/conditions.py
@@ -28,7 +28,6 @@
     Expression,
     PrimitiveType,
     RequestType,
-    PipelineVariable,
 )
 from sagemaker.workflow.execution_variables import ExecutionVariable
 from sagemaker.workflow.parameters import Parameter
@@ -262,6 +261,6 @@ def primitive_or_expr(
     Returns:
         Either the expression of the value or the primitive value.
     """
-    if isinstance(value, PipelineVariable):
+    if isinstance(value, (ExecutionVariable, Expression, Parameter, Properties)):
         return value.expr
     return value
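primitive_or_expr now enumerates the four concrete pipeline entity types instead of the removed PipelineVariable base class: any of them serializes via its .expr request dictionary, while primitives pass through untouched. For instance (parameter name chosen arbitrarily):

    from sagemaker.workflow.parameters import ParameterInteger

    threshold = ParameterInteger(name="Threshold", default_value=7)

    # A Parameter renders as its pipeline expression...
    assert threshold.expr == {"Get": "Parameters.Threshold"}
    # ...whereas a primitive such as 7 would be embedded as-is.
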
""" - if isinstance(value, PipelineVariable): + if isinstance(value, (ExecutionVariable, Expression, Parameter, Properties)): return value.expr return value diff --git a/src/sagemaker/workflow/quality_check_step.py b/src/sagemaker/workflow/quality_check_step.py index 5c27e21b0c..b9a688cecd 100644 --- a/src/sagemaker/workflow/quality_check_step.py +++ b/src/sagemaker/workflow/quality_check_step.py @@ -22,9 +22,9 @@ from sagemaker import s3 from sagemaker.model_monitor import ModelMonitor from sagemaker.processing import ProcessingOutput, ProcessingJob, Processor, ProcessingInput -from sagemaker.workflow import PipelineNonPrimitiveInputTypes +from sagemaker.workflow import PipelineNonPrimitiveInputTypes, ExecutionVariable, Parameter -from sagemaker.workflow.entities import RequestType, PipelineVariable +from sagemaker.workflow.entities import RequestType, Expression from sagemaker.workflow.properties import ( Properties, ) @@ -279,7 +279,7 @@ def _generate_baseline_job_inputs(self): _CONTAINER_BASE_PATH, _CONTAINER_INPUT_PATH, _BASELINE_DATASET_INPUT_NAME ) ) - if isinstance(baseline_dataset, PipelineVariable): + if isinstance(baseline_dataset, (ExecutionVariable, Expression, Parameter, Properties)): baseline_dataset_input = ProcessingInput( source=self.quality_check_config.baseline_dataset, destination=baseline_dataset_des, diff --git a/tests/integ/sagemaker/workflow/test_model_registration.py b/tests/integ/sagemaker/workflow/test_model_registration.py index 1d31597b25..193bbb9755 100644 --- a/tests/integ/sagemaker/workflow/test_model_registration.py +++ b/tests/integ/sagemaker/workflow/test_model_registration.py @@ -20,7 +20,6 @@ from botocore.exceptions import WaiterError import tests -from sagemaker.tensorflow import TensorFlow, TensorFlowModel from tests.integ.retry import retries from sagemaker.drift_check_baselines import DriftCheckBaselines from sagemaker import ( @@ -746,101 +745,3 @@ def test_model_registration_with_model_repack( pipeline.delete() except Exception: pass - - -def test_model_registration_with_tensorflow_model_with_pipeline_model( - sagemaker_session, role, tf_full_version, tf_full_py_version, pipeline_name, region_name -): - base_dir = os.path.join(DATA_DIR, "tensorflow_mnist") - entry_point = os.path.join(base_dir, "mnist_v2.py") - input_path = sagemaker_session.upload_data( - path=os.path.join(base_dir, "data"), - key_prefix="integ-test-data/tf-scriptmode/mnist/training", - ) - inputs = TrainingInput(s3_data=input_path) - - instance_count = ParameterInteger(name="InstanceCount", default_value=1) - instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge") - - tensorflow_estimator = TensorFlow( - entry_point=entry_point, - role=role, - instance_count=instance_count, - instance_type=instance_type, - framework_version=tf_full_version, - py_version=tf_full_py_version, - sagemaker_session=sagemaker_session, - ) - step_train = TrainingStep( - name="MyTrain", - estimator=tensorflow_estimator, - inputs=inputs, - ) - - model = TensorFlowModel( - entry_point=entry_point, - framework_version="2.4", - model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, - role=role, - sagemaker_session=sagemaker_session, - ) - - pipeline_model = PipelineModel( - name="MyModelPipeline", models=[model], role=role, sagemaker_session=sagemaker_session - ) - - step_register_model = RegisterModel( - name="MyRegisterModel", - model=pipeline_model, - model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, - content_types=["application/json"], - 
diff --git a/tests/integ/sagemaker/workflow/test_model_registration.py b/tests/integ/sagemaker/workflow/test_model_registration.py
index 1d31597b25..193bbb9755 100644
--- a/tests/integ/sagemaker/workflow/test_model_registration.py
+++ b/tests/integ/sagemaker/workflow/test_model_registration.py
@@ -20,7 +20,6 @@
 from botocore.exceptions import WaiterError
 
 import tests
-from sagemaker.tensorflow import TensorFlow, TensorFlowModel
 from tests.integ.retry import retries
 from sagemaker.drift_check_baselines import DriftCheckBaselines
 from sagemaker import (
@@ -746,101 +745,3 @@ def test_model_registration_with_model_repack(
             pipeline.delete()
         except Exception:
             pass
-
-
-def test_model_registration_with_tensorflow_model_with_pipeline_model(
-    sagemaker_session, role, tf_full_version, tf_full_py_version, pipeline_name, region_name
-):
-    base_dir = os.path.join(DATA_DIR, "tensorflow_mnist")
-    entry_point = os.path.join(base_dir, "mnist_v2.py")
-    input_path = sagemaker_session.upload_data(
-        path=os.path.join(base_dir, "data"),
-        key_prefix="integ-test-data/tf-scriptmode/mnist/training",
-    )
-    inputs = TrainingInput(s3_data=input_path)
-
-    instance_count = ParameterInteger(name="InstanceCount", default_value=1)
-    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
-
-    tensorflow_estimator = TensorFlow(
-        entry_point=entry_point,
-        role=role,
-        instance_count=instance_count,
-        instance_type=instance_type,
-        framework_version=tf_full_version,
-        py_version=tf_full_py_version,
-        sagemaker_session=sagemaker_session,
-    )
-    step_train = TrainingStep(
-        name="MyTrain",
-        estimator=tensorflow_estimator,
-        inputs=inputs,
-    )
-
-    model = TensorFlowModel(
-        entry_point=entry_point,
-        framework_version="2.4",
-        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
-        role=role,
-        sagemaker_session=sagemaker_session,
-    )
-
-    pipeline_model = PipelineModel(
-        name="MyModelPipeline", models=[model], role=role, sagemaker_session=sagemaker_session
-    )
-
-    step_register_model = RegisterModel(
-        name="MyRegisterModel",
-        model=pipeline_model,
-        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
-        content_types=["application/json"],
-        response_types=["application/json"],
-        inference_instances=["ml.t2.medium", "ml.m5.large"],
-        transform_instances=["ml.m5.large"],
-        model_package_group_name=f"{pipeline_name}TestModelPackageGroup",
-    )
-
-    pipeline = Pipeline(
-        name=pipeline_name,
-        parameters=[
-            instance_count,
-            instance_type,
-        ],
-        steps=[step_train, step_register_model],
-        sagemaker_session=sagemaker_session,
-    )
-
-    try:
-        response = pipeline.create(role)
-        create_arn = response["PipelineArn"]
-
-        assert re.match(
-            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
-            create_arn,
-        )
-
-        for _ in retries(
-            max_retry_count=5,
-            exception_message_prefix="Waiting for a successful execution of pipeline",
-            seconds_to_sleep=10,
-        ):
-            execution = pipeline.start(parameters={})
-            assert re.match(
-                rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
-                execution.arn,
-            )
-            try:
-                execution.wait(delay=30, max_attempts=60)
-            except WaiterError:
-                pass
-            execution_steps = execution.list_steps()
-
-            assert len(execution_steps) == 3
-            for step in execution_steps:
-                assert step["StepStatus"] == "Succeeded"
-            break
-    finally:
-        try:
-            pipeline.delete()
-        except Exception:
-            pass
diff --git a/tests/integ/sagemaker/workflow/test_training_steps.py b/tests/integ/sagemaker/workflow/test_training_steps.py
index 248dacac44..0f1ba84a55 100644
--- a/tests/integ/sagemaker/workflow/test_training_steps.py
+++ b/tests/integ/sagemaker/workflow/test_training_steps.py
@@ -13,22 +13,19 @@
 from __future__ import absolute_import
 
 import os
-import re
 import uuid
 import logging
 
 import pytest
 from botocore.exceptions import WaiterError
 
-from sagemaker import TrainingInput, get_execution_role, utils, image_uris
+from sagemaker import TrainingInput, get_execution_role, utils
 from sagemaker.debugger import (
     DebuggerHookConfig,
     Rule,
     rule_configs,
 )
-from sagemaker.estimator import Estimator
 from sagemaker.pytorch.estimator import PyTorch
-from sagemaker.workflow.functions import Join
 from sagemaker.workflow.parameters import ParameterInteger, ParameterString
 from sagemaker.workflow.pipeline import Pipeline
 from sagemaker.workflow.steps import TrainingStep
@@ -154,72 +151,3 @@ def test_training_job_with_debugger_and_profiler(
             pipeline.delete()
         except Exception:
             pass
-
-
-def test_training_step_with_output_path_as_join(
-    sagemaker_session, role, tf_full_version, tf_full_py_version, pipeline_name, region_name
-):
-    base_dir = os.path.join(DATA_DIR, "dummy_tensor")
-    input_path = sagemaker_session.upload_data(
-        path=base_dir, key_prefix="integ-test-data/estimator/training"
-    )
-    inputs = TrainingInput(s3_data=input_path)
-
-    instance_count = ParameterInteger(name="InstanceCount", default_value=1)
-    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
-    output_path = Join(
-        on="/", values=["s3:/", f"{sagemaker_session.default_bucket()}", f"{pipeline_name}Train"]
-    )
-
-    image_uri = image_uris.retrieve("factorization-machines", sagemaker_session.boto_region_name)
-    estimator = Estimator(
-        image_uri=image_uri,
-        role=role,
-        instance_count=instance_count,
-        instance_type=instance_type,
-        sagemaker_session=sagemaker_session,
-        output_path=output_path,
-    )
-    estimator.set_hyperparameters(
-        num_factors=10, feature_dim=784, mini_batch_size=100, predictor_type="binary_classifier"
-    )
-    step_train = TrainingStep(
-        name="MyTrain",
-        estimator=estimator,
-        inputs=inputs,
-    )
-
-    pipeline = Pipeline(
-        name=pipeline_name,
-        parameters=[instance_count, instance_type],
-        steps=[step_train],
-        sagemaker_session=sagemaker_session,
-    )
-
-    try:
-        response = pipeline.create(role)
-        create_arn = response["PipelineArn"]
-
-        assert re.match(
-            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
-            create_arn,
-        )
-
-        execution = pipeline.start(parameters={})
-        assert re.match(
-            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
-            execution.arn,
-        )
-        try:
-            execution.wait(delay=30, max_attempts=60)
-        except WaiterError:
-            pass
-        execution_steps = execution.list_steps()
-
-        assert len(execution_steps) == 1
-        assert execution_steps[0]["StepName"] == "MyTrain"
-    finally:
-        try:
-            pipeline.delete()
-        except Exception:
-            pass
diff --git a/tests/integ/sagemaker/workflow/test_tuning_steps.py b/tests/integ/sagemaker/workflow/test_tuning_steps.py
index 2fe0161a61..347420c7e0 100644
--- a/tests/integ/sagemaker/workflow/test_tuning_steps.py
+++ b/tests/integ/sagemaker/workflow/test_tuning_steps.py
@@ -16,9 +16,7 @@
 import re
 
 import pytest
-from botocore.exceptions import WaiterError
 
-from tests.integ.retry import retries
 from sagemaker import TrainingInput, Model, get_execution_role, utils
 from sagemaker.dataset_definition import DatasetDefinition, AthenaDatasetDefinition
 from sagemaker.inputs import CreateModelInput
@@ -156,8 +154,6 @@ def test_tuning_single_algo(
         ),
         sagemaker_session=sagemaker_session,
         role=role,
-        entry_point=entry_point,
-        source_dir=base_dir,
     )
 
     step_second_best_model = CreateModelStep(
@@ -181,26 +177,11 @@ def test_tuning_single_algo(
             create_arn,
         )
 
-        for _ in retries(
-            max_retry_count=5,
-            exception_message_prefix="Waiting for a successful execution of pipeline",
-            seconds_to_sleep=10,
-        ):
-            execution = pipeline.start(parameters={})
-            assert re.match(
-                rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
-                execution.arn,
-            )
-            try:
-                execution.wait(delay=30, max_attempts=60)
-            except WaiterError:
-                pass
-            execution_steps = execution.list_steps()
-
-            assert len(execution_steps) == 3
-            for step in execution_steps:
-                assert step["StepStatus"] == "Succeeded"
-            break
+        execution = pipeline.start(parameters={})
+        assert re.match(
+            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/",
+            execution.arn,
+        )
     finally:
         try:
             pipeline.delete()
key_prefix="integ-test-data/pytorch_mnist/training", - ) - inputs = TrainingInput(s3_data=input_path) - - instance_count = ParameterInteger(name="InstanceCount", default_value=1) - instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge") - - pytorch_estimator = PyTorch( - entry_point=entry_point, - role=role, - framework_version="1.5.0", - py_version="py3", - instance_count=instance_count, - instance_type=instance_type, - sagemaker_session=sagemaker_session, - enable_sagemaker_metrics=True, - max_retry_attempts=3, - ) - - min_batch_size = ParameterString(name="MinBatchSize", default_value="64") - max_batch_size = ParameterString(name="MaxBatchSize", default_value="128") - hyperparameter_ranges = { - "batch-size": IntegerParameter(min_batch_size, max_batch_size), - } - - tuner = HyperparameterTuner( - estimator=pytorch_estimator, - objective_metric_name="test:acc", - objective_type="Maximize", - hyperparameter_ranges=hyperparameter_ranges, - metric_definitions=[{"Name": "test:acc", "Regex": "Overall test accuracy: (.*?);"}], - max_jobs=2, - max_parallel_jobs=2, - ) - - step_tune = TuningStep( - name="my-tuning-step", - tuner=tuner, - inputs=inputs, - ) - - step_register_best = RegisterModel( - name="my-model-regis", - estimator=pytorch_estimator, - model_data=step_tune.get_top_model_s3_uri( - top_k=0, - s3_bucket=sagemaker_session.default_bucket(), - ), - content_types=["text/csv"], - response_types=["text/csv"], - inference_instances=["ml.t2.medium", "ml.m5.large"], - transform_instances=["ml.m5.large"], - entry_point=entry_point, - ) - - pipeline = Pipeline( - name=pipeline_name, - parameters=[instance_count, instance_type, min_batch_size, max_batch_size], - steps=[step_tune, step_register_best], - sagemaker_session=sagemaker_session, - ) - - try: - response = pipeline.create(role) - create_arn = response["PipelineArn"] - assert re.match( - rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", - create_arn, - ) - - for _ in retries( - max_retry_count=5, - exception_message_prefix="Waiting for a successful execution of pipeline", - seconds_to_sleep=10, - ): - execution = pipeline.start(parameters={}) - assert re.match( - rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/", - execution.arn, - ) - try: - execution.wait(delay=30, max_attempts=60) - except WaiterError: - pass - execution_steps = execution.list_steps() - - assert len(execution_steps) == 3 - for step in execution_steps: - assert step["StepStatus"] == "Succeeded" - break - finally: - try: - pipeline.delete() - except Exception: - pass diff --git a/tests/unit/sagemaker/workflow/test_repack_model_script.py b/tests/unit/sagemaker/workflow/test_repack_model_script.py index 043f5d3963..69c9e7b740 100644 --- a/tests/unit/sagemaker/workflow/test_repack_model_script.py +++ b/tests/unit/sagemaker/workflow/test_repack_model_script.py @@ -10,6 +10,7 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. +# language governing permissions and limitations under the License. 
diff --git a/tests/unit/sagemaker/workflow/test_repack_model_script.py b/tests/unit/sagemaker/workflow/test_repack_model_script.py
index 043f5d3963..69c9e7b740 100644
--- a/tests/unit/sagemaker/workflow/test_repack_model_script.py
+++ b/tests/unit/sagemaker/workflow/test_repack_model_script.py
@@ -35,8 +35,7 @@ def test_repack_entry_point_only(tmp):
     open(fake_model_path, "w")
 
     # create model.tar.gz
-    model_tar_path = "s3://my-bucket/model-%s.tar.gz" % time.time()
-    model_tar_name = model_tar_path.split("/")[-1]
+    model_tar_name = "model-%s.tar.gz" % time.time()
     model_tar_location = os.path.join(tmp, model_tar_name)
     with tarfile.open(model_tar_location, mode="w:gz") as t:
         t.add(fake_model_path, arcname=model_name)
@@ -54,7 +53,7 @@ def test_repack_entry_point_only(tmp):
     )
 
     # repack
-    _repack_model.repack(inference_script="inference.py", model_archive=model_tar_path)
+    _repack_model.repack(inference_script="inference.py", model_archive=model_tar_name)
 
     # /opt/ml/model should now have the original model and the inference script
     assert os.path.exists(os.path.join("/opt/ml/model", model_name))
'"s3://my-bucket/model.tar.gz"', + "model_archive": '"model.tar.gz"', "dependencies": "null", "source_dir": "null", "sagemaker_program": '"_repack_model.py"', diff --git a/tests/unit/sagemaker/workflow/test_steps.py b/tests/unit/sagemaker/workflow/test_steps.py index 0b5265f0a8..64da232c9e 100644 --- a/tests/unit/sagemaker/workflow/test_steps.py +++ b/tests/unit/sagemaker/workflow/test_steps.py @@ -45,10 +45,9 @@ ) from sagemaker.network import NetworkConfig from sagemaker.transformer import Transformer -from sagemaker.workflow.functions import Join from sagemaker.workflow.pipeline import Pipeline from sagemaker.workflow.properties import Properties, PropertyFile -from sagemaker.workflow.parameters import ParameterString, ParameterInteger, ParameterBoolean +from sagemaker.workflow.parameters import ParameterString, ParameterInteger from sagemaker.workflow.retry import ( StepRetryPolicy, StepExceptionTypeEnum, @@ -296,8 +295,6 @@ def test_training_step_base_estimator(sagemaker_session): ) training_epochs_parameter = ParameterInteger(name="TrainingEpochs", default_value=5) training_batch_size_parameter = ParameterInteger(name="TrainingBatchSize", default_value=500) - use_spot_instances = ParameterBoolean(name="UseSpotInstances", default_value=False) - output_path = Join(on="/", values=["s3:/", "a", "b"]) estimator = Estimator( image_uri=IMAGE_URI, role=ROLE, @@ -310,8 +307,6 @@ def test_training_step_base_estimator(sagemaker_session): }, rules=[], sagemaker_session=sagemaker_session, - output_path=output_path, - use_spot_instances=use_spot_instances, ) inputs = TrainingInput(s3_data=data_source_uri_parameter) cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") @@ -333,12 +328,10 @@ def test_training_step_base_estimator(sagemaker_session): data_source_uri_parameter, training_epochs_parameter, training_batch_size_parameter, - use_spot_instances, ], steps=[step], sagemaker_session=sagemaker_session, ) - assert json.loads(pipeline.definition())["Steps"][0] == { "Name": "MyTrainingStep", "Type": "Training", @@ -347,7 +340,6 @@ def test_training_step_base_estimator(sagemaker_session): "DependsOn": ["TestStep", "AnotherTestStep"], "Arguments": { "AlgorithmSpecification": {"TrainingImage": IMAGE_URI, "TrainingInputMode": "File"}, - "EnableManagedSpotTraining": {"Get": "Parameters.UseSpotInstances"}, "HyperParameters": { "batch-size": { "Std:Join": { @@ -374,9 +366,7 @@ def test_training_step_base_estimator(sagemaker_session): }, } ], - "OutputDataConfig": { - "S3OutputPath": {"Std:Join": {"On": "/", "Values": ["s3:/", "a", "b"]}} - }, + "OutputDataConfig": {"S3OutputPath": f"s3://{BUCKET}/"}, "ResourceConfig": { "InstanceCount": {"Get": "Parameters.InstanceCount"}, "InstanceType": {"Get": "Parameters.InstanceType"}, @@ -386,7 +376,7 @@ def test_training_step_base_estimator(sagemaker_session): "StoppingCondition": {"MaxRuntimeInSeconds": 86400}, "ProfilerConfig": { "ProfilingIntervalInMilliseconds": 500, - "S3OutputPath": {"Std:Join": {"On": "/", "Values": ["s3:/", "a", "b"]}}, + "S3OutputPath": f"s3://{BUCKET}/", }, }, "CacheConfig": {"Enabled": True, "ExpireAfter": "PT1H"}, @@ -691,44 +681,6 @@ def test_processing_step_normalizes_args_with_local_code(mock_normalize_args, sc ) -def test_processing_step_normalizes_args_with_param_str_local_code( - sagemaker_session, script_processor -): - cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") - code_param = ParameterString(name="Script", default_value="S3://my-bucket/file_name.py") - inputs = [ - ProcessingInput( - 
source=f"s3://{BUCKET}/processing_manifest", - destination="processing_manifest", - ) - ] - outputs = [ - ProcessingOutput( - source=f"s3://{BUCKET}/processing_manifest", - destination="processing_manifest", - ) - ] - step = ProcessingStep( - name="MyProcessingStep", - processor=script_processor, - code=code_param, - inputs=inputs, - outputs=outputs, - job_arguments=["arg1", "arg2"], - cache_config=cache_config, - ) - pipeline = Pipeline( - name="MyPipeline", - parameters=[code_param], - steps=[step], - sagemaker_session=sagemaker_session, - ) - with pytest.raises(ValueError) as error: - pipeline.definition() - - assert "has to be a valid S3 URI or local file path" in str(error.value) - - @patch("sagemaker.processing.ScriptProcessor._normalize_args") def test_processing_step_normalizes_args_with_s3_code(mock_normalize_args, script_processor): cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") @@ -1019,7 +971,6 @@ def test_single_algo_tuning_step(sagemaker_session): data_source_uri_parameter = ParameterString( name="DataSourceS3Uri", default_value=f"s3://{BUCKET}/train_manifest" ) - use_spot_instances = ParameterBoolean(name="UseSpotInstances", default_value=False) estimator = Estimator( image_uri=IMAGE_URI, role=ROLE, @@ -1028,7 +979,6 @@ def test_single_algo_tuning_step(sagemaker_session): profiler_config=ProfilerConfig(system_monitor_interval_millis=500), rules=[], sagemaker_session=sagemaker_session, - use_spot_instances=use_spot_instances, ) estimator.set_hyperparameters( num_layers=18, @@ -1072,15 +1022,7 @@ def test_single_algo_tuning_step(sagemaker_session): inputs=inputs, ) - pipeline = Pipeline( - name="MyPipeline", - parameters=[data_source_uri_parameter, use_spot_instances], - steps=[tuning_step], - sagemaker_session=sagemaker_session, - ) - - step_dsl_list = json.loads(pipeline.definition())["Steps"] - assert step_dsl_list[0] == { + assert tuning_step.to_request() == { "Name": "MyTuningStep", "Type": "Tuning", "Arguments": { @@ -1142,13 +1084,12 @@ def test_single_algo_tuning_step(sagemaker_session): "TrainingInputMode": "File", "TrainingImage": "fakeimage", }, - "EnableManagedSpotTraining": {"Get": "Parameters.UseSpotInstances"}, "InputDataConfig": [ { "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", - "S3Uri": {"Get": "Parameters.DataSourceS3Uri"}, + "S3Uri": data_source_uri_parameter, "S3DataDistributionType": "FullyReplicated", } }, diff --git a/tests/unit/sagemaker/workflow/test_utils.py b/tests/unit/sagemaker/workflow/test_utils.py index 413afd7b58..db57bb8bb4 100644 --- a/tests/unit/sagemaker/workflow/test_utils.py +++ b/tests/unit/sagemaker/workflow/test_utils.py @@ -118,7 +118,7 @@ def test_repack_model_step(estimator): hyperparameters = request_dict["Arguments"]["HyperParameters"] assert hyperparameters["inference_script"] == '"dummy_script.py"' - assert hyperparameters["model_archive"] == '"s3://my-bucket/model.tar.gz"' + assert hyperparameters["model_archive"] == '"model.tar.gz"' assert hyperparameters["sagemaker_program"] == '"_repack_model.py"' del request_dict["Arguments"]["HyperParameters"] @@ -137,7 +137,7 @@ def test_repack_model_step(estimator): "S3DataSource": { "S3DataDistributionType": "FullyReplicated", "S3DataType": "S3Prefix", - "S3Uri": f"s3://{BUCKET}/model.tar.gz", + "S3Uri": f"s3://{BUCKET}", } }, } @@ -173,9 +173,7 @@ def test_repack_model_step_with_source_dir(estimator, source_dir): hyperparameters = request_dict["Arguments"]["HyperParameters"] assert hyperparameters["inference_script"] == '"inference.py"' - assert 
diff --git a/tests/unit/sagemaker/workflow/test_utils.py b/tests/unit/sagemaker/workflow/test_utils.py
index 413afd7b58..db57bb8bb4 100644
--- a/tests/unit/sagemaker/workflow/test_utils.py
+++ b/tests/unit/sagemaker/workflow/test_utils.py
@@ -118,7 +118,7 @@ def test_repack_model_step(estimator):
 
     hyperparameters = request_dict["Arguments"]["HyperParameters"]
     assert hyperparameters["inference_script"] == '"dummy_script.py"'
-    assert hyperparameters["model_archive"] == '"s3://my-bucket/model.tar.gz"'
+    assert hyperparameters["model_archive"] == '"model.tar.gz"'
     assert hyperparameters["sagemaker_program"] == '"_repack_model.py"'
 
     del request_dict["Arguments"]["HyperParameters"]
@@ -137,7 +137,7 @@ def test_repack_model_step(estimator):
                     "S3DataSource": {
                         "S3DataDistributionType": "FullyReplicated",
                         "S3DataType": "S3Prefix",
-                        "S3Uri": f"s3://{BUCKET}/model.tar.gz",
+                        "S3Uri": f"s3://{BUCKET}",
                     }
                 },
             }
@@ -173,9 +173,7 @@ def test_repack_model_step_with_source_dir(estimator, source_dir):
 
     hyperparameters = request_dict["Arguments"]["HyperParameters"]
     assert hyperparameters["inference_script"] == '"inference.py"'
-    assert hyperparameters["model_archive"].expr == {
-        "Std:Join": {"On": "", "Values": [{"Get": "Steps.MyStep"}]}
-    }
+    assert hyperparameters["model_archive"] == '"model.tar.gz"'
     assert hyperparameters["sagemaker_program"] == '"_repack_model.py"'
 
     del request_dict["Arguments"]["HyperParameters"]
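A final note on the updated unit-test expectations: tuning_step.to_request() is inspected before pipeline-level serialization, so a ParameterString appears as the raw Python object (data_source_uri_parameter), whereas rendering through pipeline.definition() would produce the {"Get": ...} form the deleted assertions relied on. The two views, sketched with the same parameter the test uses:

    from sagemaker.workflow.parameters import ParameterString

    param = ParameterString(name="DataSourceS3Uri", default_value="s3://bucket/train_manifest")

    request_view = {"S3Uri": param}        # what to_request() carries
    rendered_view = {"S3Uri": param.expr}  # what pipeline.definition() emits
    print(rendered_view)                   # {'S3Uri': {'Get': 'Parameters.DataSourceS3Uri'}}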