diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py
index 1758aad280..4d34226d44 100644
--- a/src/sagemaker/session.py
+++ b/src/sagemaker/session.py
@@ -2201,6 +2201,7 @@ def tune(  # noqa: C901
         checkpoint_local_path=None,
         random_seed=None,
         environment=None,
+        hpo_resource_config=None,
     ):
         """Create an Amazon SageMaker hyperparameter tuning job.
 
@@ -2286,6 +2287,22 @@ def tune(  # noqa: C901
                 produce more consistent configurations for the same tuning job. (default: ``None``).
             environment (dict[str, str]) : Environment variables to be set for use during
                 training jobs (default: ``None``)
+            hpo_resource_config (dict): The configuration for the hyperparameter tuning resources,
+                including the compute instances and storage volumes, used for training jobs launched
+                by the tuning job. You must specify either instance_configs, or
+                instance_count, instance_type, and volume_size:
+                * instance_count (int): Number of EC2 instances to use for training.
+                    The key in resource_config is 'InstanceCount'.
+                * instance_type (str): Type of EC2 instance to use for training, for example,
+                    'ml.c4.xlarge'. The key in resource_config is 'InstanceType'.
+                * volume_size (int or PipelineVariable): The volume size in GB of the data to be
+                    processed for hyperparameter optimization.
+                * instance_configs (List[InstanceConfig]): A list containing the configuration(s)
+                    for one or more resources for processing hyperparameter jobs. These resources
+                    include compute instances and storage volumes to use in model training jobs.
+                * volume_kms_key_id: The AWS Key Management Service (AWS KMS) key
+                    that Amazon SageMaker uses to encrypt data on the storage
+                    volume attached to the ML compute instance(s) that run the training job.
         """
 
         tune_request = {
@@ -2311,6 +2328,7 @@ def tune(  # noqa: C901
             input_config=input_config,
             output_config=output_config,
             resource_config=resource_config,
+            hpo_resource_config=hpo_resource_config,
             vpc_config=vpc_config,
             stop_condition=stop_condition,
             enable_network_isolation=enable_network_isolation,
@@ -2545,9 +2563,10 @@ def _map_training_config(
     input_mode,
     role,
     output_config,
-    resource_config,
     stop_condition,
     input_config=None,
+    resource_config=None,
+    hpo_resource_config=None,
     metric_definitions=None,
     image_uri=None,
     algorithm_arn=None,
@@ -2625,13 +2644,17 @@ def _map_training_config(
         TrainingJobDefinition as described in
         https://botocore.readthedocs.io/en/latest/reference/services/sagemaker.html#SageMaker.Client.create_hyper_parameter_tuning_job
     """
+    if hpo_resource_config is not None:
+        resource_config_map = {"HyperParameterTuningResourceConfig": hpo_resource_config}
+    else:
+        resource_config_map = {"ResourceConfig": resource_config}
 
     training_job_definition = {
         "StaticHyperParameters": static_hyperparameters,
         "RoleArn": role,
         "OutputDataConfig": output_config,
-        "ResourceConfig": resource_config,
         "StoppingCondition": stop_condition,
+        **resource_config_map,
     }
 
     algorithm_spec = {"TrainingInputMode": input_mode}
diff --git a/src/sagemaker/tuner.py b/src/sagemaker/tuner.py
index cff35a6cf0..45fe995b6c 100644
--- a/src/sagemaker/tuner.py
+++ b/src/sagemaker/tuner.py
@@ -383,6 +383,83 @@ def to_input_req(self):
         }
 
 
+class InstanceConfig:
+    """Instance configuration for training jobs started by hyperparameter tuning.
+
+    Contains the configuration(s) for one or more resources for processing hyperparameter jobs.
+    These resources include compute instances and storage volumes to use in model training jobs
+    launched by hyperparameter tuning jobs.
+ """ + + def __init__( + self, + instance_count: Union[int, PipelineVariable] = None, + instance_type: Union[str, PipelineVariable] = None, + volume_size: Union[int, PipelineVariable] = 30, + ): + """Creates a ``InstanceConfig`` instance. + + It takes instance configuration information for training + jobs that are created as the result of a hyperparameter tuning job. + + Args: + * instance_count (str or PipelineVariable): The number of compute instances of type + InstanceType to use. For distributed training, select a value greater than 1. + * instance_type (str or PipelineVariable): + The instance type used to run hyperparameter optimization tuning jobs. + * volume_size (int or PipelineVariable): The volume size in GB of the data to be + processed for hyperparameter optimization + """ + self.instance_count = instance_count + self.instance_type = instance_type + self.volume_size = volume_size + + @classmethod + def from_job_desc(cls, instance_config): + """Creates a ``InstanceConfig`` from an instance configuration response. + + This is the instance configuration from the DescribeTuningJob response. + + Args: + instance_config (dict): The expected format of the + ``instance_config`` contains one first-class field + + Returns: + sagemaker.tuner.InstanceConfig: De-serialized instance of + InstanceConfig containing the strategy configuration. + """ + return cls( + instance_count=instance_config["InstanceCount"], + instance_type=instance_config[" InstanceType "], + volume_size=instance_config["VolumeSizeInGB"], + ) + + def to_input_req(self): + """Converts the ``self`` instance to the desired input request format. + + Examples: + >>> strategy_config = InstanceConfig( + instance_count=1, + instance_type='ml.m4.xlarge', + volume_size=30 + ) + >>> strategy_config.to_input_req() + { + "InstanceCount":1, + "InstanceType":"ml.m4.xlarge", + "VolumeSizeInGB":30 + } + + Returns: + dict: Containing the instance configurations. + """ + return { + "InstanceCount": self.instance_count, + "InstanceType": self.instance_type, + "VolumeSizeInGB": self.volume_size, + } + + class HyperparameterTuner(object): """Defines interaction with Amazon SageMaker hyperparameter tuning jobs. @@ -482,6 +559,7 @@ def __init__( self.estimator = None self.objective_metric_name = None self._hyperparameter_ranges = None + self.static_hyperparameters = None self.metric_definitions = None self.estimator_dict = {estimator_name: estimator} self.objective_metric_name_dict = {estimator_name: objective_metric_name} @@ -489,7 +567,6 @@ def __init__( self.metric_definitions_dict = ( {estimator_name: metric_definitions} if metric_definitions is not None else {} ) - self.static_hyperparameters = None else: self.estimator = estimator self.objective_metric_name = objective_metric_name @@ -521,6 +598,31 @@ def __init__( self.warm_start_config = warm_start_config self.early_stopping_type = early_stopping_type self.random_seed = random_seed + self.instance_configs_dict = None + self.instance_configs = None + + def override_resource_config( + self, instance_configs: Union[List[InstanceConfig], Dict[str, List[InstanceConfig]]] + ): + """Override the instance configuration of the estimators used by the tuner. + + Args: + instance_configs (List[InstanceConfig] or Dict[str, List[InstanceConfig]): + The InstanceConfigs to use as an override for the instance configuration + of the estimator. ``None`` will remove the override. 
+ """ + if isinstance(instance_configs, dict): + self._validate_dict_argument( + name="instance_configs", + value=instance_configs, + allowed_keys=list(self.estimator_dict.keys()), + ) + self.instance_configs_dict = instance_configs + else: + self.instance_configs = instance_configs + if self.estimator_dict is not None and self.estimator_dict.keys(): + estimator_names = list(self.estimator_dict.keys()) + self.instance_configs_dict = {estimator_names[0]: instance_configs} def _prepare_for_tuning(self, job_name=None, include_cls_metadata=False): """Prepare the tuner instance for tuning (fit).""" @@ -589,7 +691,6 @@ def _prepare_job_name_for_tuning(self, job_name=None): def _prepare_static_hyperparameters_for_tuning(self, include_cls_metadata=False): """Prepare static hyperparameters for all estimators before tuning.""" - self.static_hyperparameters = None if self.estimator is not None: self.static_hyperparameters = self._prepare_static_hyperparameters( self.estimator, self._hyperparameter_ranges, include_cls_metadata @@ -1817,6 +1918,7 @@ def _get_tuner_args(cls, tuner, inputs): estimator=tuner.estimator, static_hyperparameters=tuner.static_hyperparameters, metric_definitions=tuner.metric_definitions, + instance_configs=tuner.instance_configs, ) if tuner.estimator_dict is not None: @@ -1830,12 +1932,44 @@ def _get_tuner_args(cls, tuner, inputs): tuner.objective_type, tuner.objective_metric_name_dict[estimator_name], tuner.hyperparameter_ranges_dict()[estimator_name], + tuner.instance_configs_dict.get(estimator_name, None) + if tuner.instance_configs_dict is not None + else None, ) for estimator_name in sorted(tuner.estimator_dict.keys()) ] return tuner_args + @staticmethod + def _prepare_hp_resource_config( + instance_configs: List[InstanceConfig], + instance_count: int, + instance_type: str, + volume_size: int, + volume_kms_key: str, + ): + """Placeholder hpo resource config for one estimator of the tuner.""" + resource_config = {} + if volume_kms_key is not None: + resource_config["VolumeKmsKeyId"] = volume_kms_key + + if instance_configs is None: + resource_config["InstanceCount"] = instance_count + resource_config["InstanceType"] = instance_type + resource_config["VolumeSizeInGB"] = volume_size + else: + resource_config["InstanceConfigs"] = _TuningJob._prepare_instance_configs( + instance_configs + ) + + return resource_config + + @staticmethod + def _prepare_instance_configs(instance_configs: List[InstanceConfig]): + """Prepare instance config for create tuning request.""" + return [config.to_input_req() for config in instance_configs] + @staticmethod def _prepare_training_config( inputs, @@ -1846,10 +1980,20 @@ def _prepare_training_config( objective_type=None, objective_metric_name=None, parameter_ranges=None, + instance_configs=None, ): """Prepare training config for one estimator.""" training_config = _Job._load_config(inputs, estimator) + del training_config["resource_config"] + training_config["hpo_resource_config"] = _TuningJob._prepare_hp_resource_config( + instance_configs, + estimator.instance_count, + estimator.instance_type, + estimator.volume_size, + estimator.volume_kms_key, + ) + training_config["input_mode"] = estimator.input_mode training_config["metric_definitions"] = metric_definitions diff --git a/tests/integ/test_tuner.py b/tests/integ/test_tuner.py index 55c1542f7d..041afb47c7 100644 --- a/tests/integ/test_tuner.py +++ b/tests/integ/test_tuner.py @@ -35,6 +35,7 @@ ContinuousParameter, CategoricalParameter, HyperparameterTuner, + InstanceConfig, WarmStartConfig, 
    WarmStartTypes,
    create_transfer_learning_tuner,
@@ -97,6 +98,7 @@ def _tune_and_deploy(
     job_name=None,
     warm_start_config=None,
     early_stopping_type="Off",
+    instance_configs=None,
 ):
     tuner = _tune(
         kmeans_estimator,
@@ -105,6 +107,7 @@ def _tune_and_deploy(
         warm_start_config=warm_start_config,
         job_name=job_name,
         early_stopping_type=early_stopping_type,
+        instance_configs=instance_configs,
     )
     _deploy(kmeans_train_set, sagemaker_session, tuner, early_stopping_type, cpu_instance_type)
 
@@ -134,6 +137,7 @@ def _tune(
     max_jobs=2,
     max_parallel_jobs=2,
     early_stopping_type="Off",
+    instance_configs=None,
 ):
     with timeout(minutes=TUNING_DEFAULT_TIMEOUT_MINUTES):
 
@@ -148,7 +152,7 @@ def _tune(
             warm_start_config=warm_start_config,
             early_stopping_type=early_stopping_type,
         )
-
+        tuner.override_resource_config(instance_configs=instance_configs)
         records = kmeans_estimator.record_set(kmeans_train_set[0][:100])
         test_record_set = kmeans_estimator.record_set(kmeans_train_set[0][:100], channel="test")
 
@@ -173,6 +177,25 @@ def test_tuning_kmeans(
     )
 
 
+@pytest.mark.release
+def test_tuning_kmeans_with_instance_configs(
+    sagemaker_session, kmeans_train_set, kmeans_estimator, hyperparameter_ranges, cpu_instance_type
+):
+    job_name = unique_name_from_base("tst-fit")
+    _tune_and_deploy(
+        kmeans_estimator,
+        kmeans_train_set,
+        sagemaker_session,
+        cpu_instance_type,
+        hyperparameter_ranges=hyperparameter_ranges,
+        job_name=job_name,
+        instance_configs=[
+            InstanceConfig(instance_count=1, instance_type="ml.m4.2xlarge", volume_size=30),
+            InstanceConfig(instance_count=1, instance_type="ml.m4.xlarge", volume_size=30),
+        ],
+    )
+
+
 def test_tuning_kmeans_identical_dataset_algorithm_tuner_raw(
     sagemaker_session, kmeans_train_set, kmeans_estimator, hyperparameter_ranges
 ):
diff --git a/tests/unit/sagemaker/workflow/test_steps.py b/tests/unit/sagemaker/workflow/test_steps.py
index f2046cc00f..7af8d04411 100644
--- a/tests/unit/sagemaker/workflow/test_steps.py
+++ b/tests/unit/sagemaker/workflow/test_steps.py
@@ -1133,7 +1133,7 @@ def test_single_algo_tuning_step(sagemaker_session):
                 },
                 "RoleArn": "DummyRole",
                 "OutputDataConfig": {"S3OutputPath": "s3://my-bucket/"},
-                "ResourceConfig": {
+                "HyperParameterTuningResourceConfig": {
                     "InstanceCount": 1,
                     "InstanceType": "ml.c5.4xlarge",
                     "VolumeSizeInGB": 30,
@@ -1285,7 +1285,7 @@ def test_multi_algo_tuning_step(sagemaker_session):
                 },
                 "RoleArn": "DummyRole",
                 "OutputDataConfig": {"S3OutputPath": "s3://my-bucket/"},
-                "ResourceConfig": {
+                "HyperParameterTuningResourceConfig": {
                     "InstanceCount": {"Get": "Parameters.InstanceCount"},
                     "InstanceType": "ml.c5.4xlarge",
                     "VolumeSizeInGB": 30,
@@ -1352,7 +1352,7 @@ def test_multi_algo_tuning_step(sagemaker_session):
                 },
                 "RoleArn": "DummyRole",
                 "OutputDataConfig": {"S3OutputPath": "s3://my-bucket/"},
-                "ResourceConfig": {
+                "HyperParameterTuningResourceConfig": {
                     "InstanceCount": {"Get": "Parameters.InstanceCount"},
                     "InstanceType": "ml.c5.4xlarge",
                     "VolumeSizeInGB": 30,
diff --git a/tests/unit/test_tuner.py b/tests/unit/test_tuner.py
index c7e489784b..a9c74deb8b 100644
--- a/tests/unit/test_tuner.py
+++ b/tests/unit/test_tuner.py
@@ -45,6 +45,7 @@
 from sagemaker.workflow.functions import JsonGet, Join
 from sagemaker.workflow.parameters import ParameterString, ParameterInteger
 
+from sagemaker.tuner import InstanceConfig
 from .tuner_test_utils import *  # noqa: F403
 
 
@@ -120,6 +121,27 @@ def test_prepare_for_training(tuner):
     assert tuner.static_hyperparameters["hp2"] == hp2
 
 
+def test_prepare_for_tuning_with_instance_config_overrides(tuner, sagemaker_session):
+    tuner.estimator = PCA(
+        ROLE,
+        INSTANCE_COUNT,
+        INSTANCE_TYPE,
+        NUM_COMPONENTS,
+        sagemaker_session=sagemaker_session,
+    )
+
+    tuner._prepare_for_tuning()
+    tuner.override_resource_config(
+        instance_configs=[
+            InstanceConfig(instance_count=1, instance_type="ml.m4.2xlarge"),
+            InstanceConfig(instance_count=1, instance_type="ml.m4.4xlarge"),
+        ]
+    )
+
+    assert "sagemaker_estimator_class_name" not in tuner.static_hyperparameters
+    assert "sagemaker_estimator_module" not in tuner.static_hyperparameters
+
+
 def test_prepare_for_tuning_with_amazon_estimator(tuner, sagemaker_session):
     tuner.estimator = PCA(
         ROLE,
@@ -263,6 +285,66 @@ def test_fit_pca_with_early_stopping(sagemaker_session, tuner):
     assert tune_kwargs["tuning_config"]["early_stopping_type"] == "Auto"
 
 
+def test_fit_pca_with_flexible_instance_types(sagemaker_session, tuner):
+    pca = PCA(
+        ROLE,
+        INSTANCE_COUNT,
+        INSTANCE_TYPE,
+        NUM_COMPONENTS,
+        base_job_name="pca",
+        sagemaker_session=sagemaker_session,
+    )
+
+    tuner.override_resource_config(
+        instance_configs=[
+            InstanceConfig(instance_count=1, instance_type="ml.m4.2xlarge"),
+            InstanceConfig(instance_count=1, instance_type="ml.m4.4xlarge"),
+        ]
+    )
+
+    tuner.estimator = pca
+
+    records = RecordSet(s3_data=INPUTS, num_records=1, feature_dim=1)
+    tuner.fit(records)
+
+    _, _, tune_kwargs = sagemaker_session.create_tuning_job.mock_calls[0]
+
+    assert "hpo_resource_config" in tune_kwargs["training_config"]
+    assert "InstanceConfigs" in tune_kwargs["training_config"]["hpo_resource_config"]
+    assert "InstanceCount" not in tune_kwargs["training_config"]["hpo_resource_config"]
+
+
+def test_fit_pca_with_removed_flexible_instance_types(sagemaker_session, tuner):
+    pca = PCA(
+        ROLE,
+        INSTANCE_COUNT,
+        INSTANCE_TYPE,
+        NUM_COMPONENTS,
+        base_job_name="pca",
+        sagemaker_session=sagemaker_session,
+    )
+
+    tuner.override_resource_config(
+        instance_configs=[
+            InstanceConfig(instance_count=1, instance_type="ml.m4.2xlarge"),
+            InstanceConfig(instance_count=1, instance_type="ml.m4.4xlarge"),
+        ]
+    )
+
+    tuner.override_resource_config(instance_configs=None)
+
+    tuner.estimator = pca
+
+    records = RecordSet(s3_data=INPUTS, num_records=1, feature_dim=1)
+    tuner.fit(records)
+
+    _, _, tune_kwargs = sagemaker_session.create_tuning_job.mock_calls[0]
+
+    assert "hpo_resource_config" in tune_kwargs["training_config"]
+    assert "InstanceConfigs" not in tune_kwargs["training_config"]["hpo_resource_config"]
+    assert "InstanceCount" in tune_kwargs["training_config"]["hpo_resource_config"]
+
+
 def test_fit_pca_with_vpc_config(sagemaker_session, tuner):
     subnets = ["foo"]
     security_group_ids = ["bar"]
@@ -397,6 +479,44 @@ def test_fit_multi_estimators_invalid_inputs(
     )
 
 
+def test_multi_estimators_flexible_instance_types(sagemaker_session):
+    (tuner, estimator_one, estimator_two) = _create_multi_estimator_tuner(sagemaker_session)
+
+    records = {ESTIMATOR_NAME_TWO: RecordSet(s3_data=INPUTS, num_records=1, feature_dim=1)}
+
+    estimator_kwargs = {ESTIMATOR_NAME_TWO: {"mini_batch_size": 4000}}
+
+    instance_configs1 = [
+        InstanceConfig(instance_count=1, instance_type="ml.m4.2xlarge"),
+        InstanceConfig(instance_count=1, instance_type="ml.m4.4xlarge"),
+    ]
+    instance_configs2 = [
+        InstanceConfig(instance_count=1, instance_type="ml.m4.xlarge"),
+        InstanceConfig(instance_count=1, instance_type="ml.m4.2xlarge"),
+    ]
+
+    tuner.override_resource_config(
+        instance_configs={
+            ESTIMATOR_NAME: instance_configs1,
+            ESTIMATOR_NAME_TWO: instance_configs2,
+        }
+    )
+
+    tuner.fit(inputs=records, include_cls_metadata={}, estimator_kwargs=estimator_kwargs)
+
+    _, _, tune_kwargs = sagemaker_session.create_tuning_job.mock_calls[0]
+
+    training_config_one = tune_kwargs["training_config_list"][0]
+    training_config_two = tune_kwargs["training_config_list"][1]
+
+    assert "hpo_resource_config" in training_config_one
+    assert "InstanceConfigs" in training_config_one["hpo_resource_config"]
+    assert "InstanceCount" not in training_config_one["hpo_resource_config"]
+    assert "hpo_resource_config" in training_config_two
+    assert "InstanceConfigs" in training_config_two["hpo_resource_config"]
+    assert "InstanceCount" not in training_config_two["hpo_resource_config"]
+
+
 def test_fit_multi_estimators(sagemaker_session):
     (tuner, estimator_one, estimator_two) = _create_multi_estimator_tuner(sagemaker_session)
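
Illustrative usage sketch (not part of the patch): the snippet below exercises the InstanceConfig and HyperparameterTuner.override_resource_config API that this diff introduces. The image URI, role ARN, S3 input path, and metric regex are placeholder assumptions, not values taken from the change set.

# Sketch only: using the flexible instance-type override added in this diff.
# The image URI, role ARN, S3 path, and metric regex below are placeholders.
import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.tuner import ContinuousParameter, HyperparameterTuner, InstanceConfig

session = sagemaker.Session()
estimator = Estimator(
    image_uri="<training-image-uri>",  # placeholder
    role="<execution-role-arn>",  # placeholder
    instance_count=1,
    instance_type="ml.m4.xlarge",
    sagemaker_session=session,
)

tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name="validation:accuracy",
    hyperparameter_ranges={"learning_rate": ContinuousParameter(0.01, 0.2)},
    metric_definitions=[{"Name": "validation:accuracy", "Regex": "accuracy=([0-9\\.]+)"}],
    max_jobs=4,
    max_parallel_jobs=2,
)

# Give the tuning job several instance configurations to use for its training
# jobs instead of the single instance_count/instance_type/volume_size taken
# from the estimator; calling this again with None removes the override.
tuner.override_resource_config(
    instance_configs=[
        InstanceConfig(instance_count=1, instance_type="ml.m4.xlarge", volume_size=30),
        InstanceConfig(instance_count=1, instance_type="ml.m4.2xlarge", volume_size=30),
    ]
)

# tuner.fit({"training": "s3://<bucket>/<prefix>"})  # placeholder S3 input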