From 420aa4e3c3daa6bab9822f3aeed0606b9e32c01a Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Fri, 2 Feb 2024 13:04:09 -0800 Subject: [PATCH 1/9] Add distribution as input for RecordSet --- src/sagemaker/amazon/amazon_estimator.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index b4fc9d1f6d..4c6208b43e 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -343,6 +343,7 @@ def __init__( feature_dim: int, s3_data_type: Union[str, PipelineVariable] = "ManifestFile", channel: Union[str, PipelineVariable] = "train", + distribution: str = "ShardedByS3Key", ): """A collection of Amazon :class:~`Record` objects serialized and stored in S3. @@ -358,12 +359,15 @@ def __init__( single s3 manifest file, listing each s3 object to train on. channel (str or PipelineVariable): The SageMaker Training Job channel this RecordSet should be bound to + distribution (str): S3 data distribution type. + Valid values: 'ShardedByS3Key', 'FullyReplicated'. """ self.s3_data = s3_data self.feature_dim = feature_dim self.num_records = num_records self.s3_data_type = s3_data_type self.channel = channel + self.distribution = distribution def __repr__(self): """Return an unambiguous representation of this RecordSet""" @@ -377,7 +381,7 @@ def data_channel(self): def records_s3_input(self): """Return a TrainingInput to represent the training data""" return TrainingInput( - self.s3_data, distribution="ShardedByS3Key", s3_data_type=self.s3_data_type + self.s3_data, distribution=self.distribution, s3_data_type=self.s3_data_type ) From 7ea6d30265d67f00750361bcb3a3a6502352d52d Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Fri, 2 Feb 2024 13:24:49 -0800 Subject: [PATCH 2/9] bug fix --- src/sagemaker/amazon/amazon_estimator.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index 4c6208b43e..d8c3a539af 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -269,7 +269,7 @@ def fit( if wait: self.latest_training_job.wait(logs=logs) - def record_set(self, train, labels=None, channel="train", encrypt=False): + def record_set(self, train, labels=None, channel="train", encrypt=False, distribution=None): """Build a :class:`~RecordSet` from a numpy :class:`~ndarray` matrix and label vector. For the 2D ``ndarray`` ``train``, each row is converted to a @@ -294,6 +294,8 @@ def record_set(self, train, labels=None, channel="train", encrypt=False): should be assigned to. encrypt (bool): Specifies whether the objects uploaded to S3 are encrypted on the server side using AES-256 (default: ``False``). + distribution (str): The SageMaker TrainingJob channel s3 data + distribution type. Returns: RecordSet: A RecordSet referencing the encoded, uploading training @@ -316,6 +318,7 @@ def record_set(self, train, labels=None, channel="train", encrypt=False): num_records=train.shape[0], feature_dim=train.shape[1], channel=channel, + distribution=distribution, ) def _get_default_mini_batch_size(self, num_records: int): From cb76cbc7610c5f4546451c3d5a2cb74d7d66fc7f Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Fri, 2 Feb 2024 13:27:28 -0800 Subject: [PATCH 3/9] bug fix --- src/sagemaker/amazon/amazon_estimator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index d8c3a539af..c435ca4ad5 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -295,7 +295,7 @@ def record_set(self, train, labels=None, channel="train", encrypt=False, distrib encrypt (bool): Specifies whether the objects uploaded to S3 are encrypted on the server side using AES-256 (default: ``False``). distribution (str): The SageMaker TrainingJob channel s3 data - distribution type. + distribution type (default: ``False``). Returns: RecordSet: A RecordSet referencing the encoded, uploading training From 67bc5457b3d1ffcddc8f2f0c9ebbca741460694c Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Fri, 2 Feb 2024 14:38:32 -0800 Subject: [PATCH 4/9] bug fix --- src/sagemaker/amazon/amazon_estimator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index c435ca4ad5..f7a3c6ca5e 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -269,7 +269,7 @@ def fit( if wait: self.latest_training_job.wait(logs=logs) - def record_set(self, train, labels=None, channel="train", encrypt=False, distribution=None): + def record_set(self, train, labels=None, channel="train", encrypt=False, distribution="ShardedByS3Key"): """Build a :class:`~RecordSet` from a numpy :class:`~ndarray` matrix and label vector. For the 2D ``ndarray`` ``train``, each row is converted to a @@ -295,7 +295,7 @@ def record_set(self, train, labels=None, channel="train", encrypt=False, distrib encrypt (bool): Specifies whether the objects uploaded to S3 are encrypted on the server side using AES-256 (default: ``False``). distribution (str): The SageMaker TrainingJob channel s3 data - distribution type (default: ``False``). + distribution type (default: ``None``). Returns: RecordSet: A RecordSet referencing the encoded, uploading training From 8a1ca20e59984a1c1a4997caf2e077f4ff7523fa Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Fri, 2 Feb 2024 17:29:26 -0800 Subject: [PATCH 5/9] Add desc --- src/sagemaker/amazon/amazon_estimator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index f7a3c6ca5e..428e70168f 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -295,7 +295,7 @@ def record_set(self, train, labels=None, channel="train", encrypt=False, distrib encrypt (bool): Specifies whether the objects uploaded to S3 are encrypted on the server side using AES-256 (default: ``False``). distribution (str): The SageMaker TrainingJob channel s3 data - distribution type (default: ``None``). + distribution type (default: ``ShardedByS3Key``). Returns: RecordSet: A RecordSet referencing the encoded, uploading training @@ -362,7 +362,7 @@ def __init__( single s3 manifest file, listing each s3 object to train on. channel (str or PipelineVariable): The SageMaker Training Job channel this RecordSet should be bound to - distribution (str): S3 data distribution type. + distribution (str): The SageMaker TrainingJob S3 data distribution type. Valid values: 'ShardedByS3Key', 'FullyReplicated'. """ self.s3_data = s3_data From 175e62dd4e7076b2282e67f975bf7fdd8451f9a0 Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Thu, 8 Feb 2024 17:37:12 -0800 Subject: [PATCH 6/9] fix line too long --- src/sagemaker/amazon/amazon_estimator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index 428e70168f..20380a008c 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -269,7 +269,8 @@ def fit( if wait: self.latest_training_job.wait(logs=logs) - def record_set(self, train, labels=None, channel="train", encrypt=False, distribution="ShardedByS3Key"): + def record_set(self, train, labels=None, channel="train", encrypt=False, + distribution="ShardedByS3Key"): """Build a :class:`~RecordSet` from a numpy :class:`~ndarray` matrix and label vector. For the 2D ``ndarray`` ``train``, each row is converted to a From 8bf64f1724e05d7ebf3fd1eae2c68e88d304bacc Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Thu, 8 Feb 2024 18:33:57 -0800 Subject: [PATCH 7/9] fix format --- src/sagemaker/amazon/amazon_estimator.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index 20380a008c..6e3bce20dc 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -269,7 +269,11 @@ def fit( if wait: self.latest_training_job.wait(logs=logs) - def record_set(self, train, labels=None, channel="train", encrypt=False, + def record_set(self, + train, + labels=None, + channel="train", + encrypt=False, distribution="ShardedByS3Key"): """Build a :class:`~RecordSet` from a numpy :class:`~ndarray` matrix and label vector. From deb96647804e966ed6c33d5d132ab90e10fd794e Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Fri, 16 Feb 2024 14:52:22 -0800 Subject: [PATCH 8/9] reformat --- src/sagemaker/amazon/amazon_estimator.py | 51 +++++++++++++++++------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index 6e3bce20dc..a62e72bd49 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -133,14 +133,18 @@ def data_location(self, data_location: str): if not data_location.startswith("s3://"): raise ValueError( - 'Expecting an S3 URL beginning with "s3://". Got "{}"'.format(data_location) + 'Expecting an S3 URL beginning with "s3://". Got "{}"'.format( + data_location + ) ) if data_location[-1] != "/": data_location = data_location + "/" self._data_location = data_location @classmethod - def _prepare_init_params_from_job_description(cls, job_details, model_channel_name=None): + def _prepare_init_params_from_job_description( + cls, job_details, model_channel_name=None + ): """Convert the job description to init params that can be handled by the class constructor. Args: @@ -168,7 +172,9 @@ def _prepare_init_params_from_job_description(cls, job_details, model_channel_na del init_params["image_uri"] return init_params - def prepare_workflow_for_training(self, records=None, mini_batch_size=None, job_name=None): + def prepare_workflow_for_training( + self, records=None, mini_batch_size=None, job_name=None + ): """Calls _prepare_for_training. Used when setting up a workflow. Args: @@ -194,7 +200,9 @@ def _prepare_for_training(self, records, mini_batch_size=None, job_name=None): specified, one is generated, using the base name given to the constructor if applicable. """ - super(AmazonAlgorithmEstimatorBase, self)._prepare_for_training(job_name=job_name) + super(AmazonAlgorithmEstimatorBase, self)._prepare_for_training( + job_name=job_name + ) feature_dim = None @@ -260,7 +268,9 @@ def fit( will be unassociated. * `TrialComponentDisplayName` is used for display in Studio. """ - self._prepare_for_training(records, job_name=job_name, mini_batch_size=mini_batch_size) + self._prepare_for_training( + records, job_name=job_name, mini_batch_size=mini_batch_size + ) experiment_config = check_and_get_run_experiment_config(experiment_config) self.latest_training_job = _TrainingJob.start_new( @@ -269,12 +279,14 @@ def fit( if wait: self.latest_training_job.wait(logs=logs) - def record_set(self, - train, - labels=None, - channel="train", - encrypt=False, - distribution="ShardedByS3Key"): + def record_set( + self, + train, + labels=None, + channel="train", + encrypt=False, + distribution="ShardedByS3Key", + ): """Build a :class:`~RecordSet` from a numpy :class:`~ndarray` matrix and label vector. For the 2D ``ndarray`` ``train``, each row is converted to a @@ -311,7 +323,9 @@ def record_set(self, ) parsed_s3_url = urlparse(self.data_location) bucket, key_prefix = parsed_s3_url.netloc, parsed_s3_url.path - key_prefix = key_prefix + "{}-{}/".format(type(self).__name__, sagemaker_timestamp()) + key_prefix = key_prefix + "{}-{}/".format( + type(self).__name__, sagemaker_timestamp() + ) key_prefix = key_prefix.lstrip("/") logger.debug("Uploading to bucket %s and key_prefix %s", bucket, key_prefix) manifest_s3_file = upload_numpy_to_s3_shards( @@ -338,7 +352,9 @@ def _get_default_mini_batch_size(self, num_records: int): ) return 1 - return min(self.DEFAULT_MINI_BATCH_SIZE, max(1, int(num_records / self.instance_count))) + return min( + self.DEFAULT_MINI_BATCH_SIZE, max(1, int(num_records / self.instance_count)) + ) class RecordSet(object): @@ -447,7 +463,10 @@ def _build_shards(num_shards, array): shard_size = int(array.shape[0] / num_shards) if shard_size == 0: raise ValueError("Array length is less than num shards") - shards = [array[i * shard_size : i * shard_size + shard_size] for i in range(num_shards - 1)] + shards = [ + array[i * shard_size : i * shard_size + shard_size] + for i in range(num_shards - 1) + ] shards.append(array[(num_shards - 1) * shard_size :]) return shards @@ -494,7 +513,9 @@ def upload_numpy_to_s3_shards( manifest_str = json.dumps( [{"prefix": "s3://{}/{}".format(bucket, key_prefix)}] + uploaded_files ) - s3.Object(bucket, manifest_key).put(Body=manifest_str.encode("utf-8"), **extra_put_kwargs) + s3.Object(bucket, manifest_key).put( + Body=manifest_str.encode("utf-8"), **extra_put_kwargs + ) return "s3://{}/{}".format(bucket, manifest_key) except Exception as ex: # pylint: disable=broad-except try: From 227394e02f4505a83fef1c912086aa560c103844 Mon Sep 17 00:00:00 2001 From: Sifei Li Date: Fri, 16 Feb 2024 15:01:21 -0800 Subject: [PATCH 9/9] reformat --- src/sagemaker/amazon/amazon_estimator.py | 37 ++++++------------------ 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index a62e72bd49..bf4e44df25 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -133,18 +133,14 @@ def data_location(self, data_location: str): if not data_location.startswith("s3://"): raise ValueError( - 'Expecting an S3 URL beginning with "s3://". Got "{}"'.format( - data_location - ) + 'Expecting an S3 URL beginning with "s3://". Got "{}"'.format(data_location) ) if data_location[-1] != "/": data_location = data_location + "/" self._data_location = data_location @classmethod - def _prepare_init_params_from_job_description( - cls, job_details, model_channel_name=None - ): + def _prepare_init_params_from_job_description(cls, job_details, model_channel_name=None): """Convert the job description to init params that can be handled by the class constructor. Args: @@ -172,9 +168,7 @@ def _prepare_init_params_from_job_description( del init_params["image_uri"] return init_params - def prepare_workflow_for_training( - self, records=None, mini_batch_size=None, job_name=None - ): + def prepare_workflow_for_training(self, records=None, mini_batch_size=None, job_name=None): """Calls _prepare_for_training. Used when setting up a workflow. Args: @@ -200,9 +194,7 @@ def _prepare_for_training(self, records, mini_batch_size=None, job_name=None): specified, one is generated, using the base name given to the constructor if applicable. """ - super(AmazonAlgorithmEstimatorBase, self)._prepare_for_training( - job_name=job_name - ) + super(AmazonAlgorithmEstimatorBase, self)._prepare_for_training(job_name=job_name) feature_dim = None @@ -268,9 +260,7 @@ def fit( will be unassociated. * `TrialComponentDisplayName` is used for display in Studio. """ - self._prepare_for_training( - records, job_name=job_name, mini_batch_size=mini_batch_size - ) + self._prepare_for_training(records, job_name=job_name, mini_batch_size=mini_batch_size) experiment_config = check_and_get_run_experiment_config(experiment_config) self.latest_training_job = _TrainingJob.start_new( @@ -323,9 +313,7 @@ def record_set( ) parsed_s3_url = urlparse(self.data_location) bucket, key_prefix = parsed_s3_url.netloc, parsed_s3_url.path - key_prefix = key_prefix + "{}-{}/".format( - type(self).__name__, sagemaker_timestamp() - ) + key_prefix = key_prefix + "{}-{}/".format(type(self).__name__, sagemaker_timestamp()) key_prefix = key_prefix.lstrip("/") logger.debug("Uploading to bucket %s and key_prefix %s", bucket, key_prefix) manifest_s3_file = upload_numpy_to_s3_shards( @@ -352,9 +340,7 @@ def _get_default_mini_batch_size(self, num_records: int): ) return 1 - return min( - self.DEFAULT_MINI_BATCH_SIZE, max(1, int(num_records / self.instance_count)) - ) + return min(self.DEFAULT_MINI_BATCH_SIZE, max(1, int(num_records / self.instance_count))) class RecordSet(object): @@ -463,10 +449,7 @@ def _build_shards(num_shards, array): shard_size = int(array.shape[0] / num_shards) if shard_size == 0: raise ValueError("Array length is less than num shards") - shards = [ - array[i * shard_size : i * shard_size + shard_size] - for i in range(num_shards - 1) - ] + shards = [array[i * shard_size : i * shard_size + shard_size] for i in range(num_shards - 1)] shards.append(array[(num_shards - 1) * shard_size :]) return shards @@ -513,9 +496,7 @@ def upload_numpy_to_s3_shards( manifest_str = json.dumps( [{"prefix": "s3://{}/{}".format(bucket, key_prefix)}] + uploaded_files ) - s3.Object(bucket, manifest_key).put( - Body=manifest_str.encode("utf-8"), **extra_put_kwargs - ) + s3.Object(bucket, manifest_key).put(Body=manifest_str.encode("utf-8"), **extra_put_kwargs) return "s3://{}/{}".format(bucket, manifest_key) except Exception as ex: # pylint: disable=broad-except try: