feat: custom base job name for jumpstart models/estimators #2970

Merged: 17 commits, Mar 11, 2022
2 changes: 1 addition & 1 deletion .readthedocs.yml
@@ -5,7 +5,7 @@
version: 2

python:
version: 3.6
version: 3.9
install:
- method: pip
path: .
@@ -243,16 +243,25 @@ TensorFlow API

.. function:: smdistributed.dataparallel.tensorflow.allreduce(tensor, param_index, num_params, compression=Compression.none, op=ReduceOp.AVERAGE)

Performs an all-reduce operation on a tensor (``tf.Tensor``).
Performs an ``allreduce`` operation on a tensor (``tf.Tensor``).

The ``smdistributed.dataparallel`` package's AllReduce API for TensorFlow to allreduce
gradient tensors. By default, ``smdistributed.dataparallel`` allreduce averages the
gradient tensors across participating workers.

.. note::

:class:`smdistributed.dataparallel.tensorflow.allreduce()` should
only be used to allreduce gradient tensors.
For other (non-gradient) tensors, you must use
:class:`smdistributed.dataparallel.tensorflow.oob_allreduce()`.
If you use :class:`smdistributed.dataparallel.tensorflow.allreduce()`
for non-gradient tensors,
the distributed training job might stall or stop.

``smdistributed.dataparallel`` AllReduce API can be used for all
reducing gradient tensors or any other tensors. By
default, ``smdistributed.dataparallel`` AllReduce averages the
tensors across the participating workers.
**Inputs:**

- ``tensor (tf.Tensor)(required)``: The tensor to be all-reduced. The shape of the input must be identical across all ranks.
- ``tensor (tf.Tensor)(required)``: The tensor to be allreduced. The shape of the input must be identical across all ranks.
- ``param_index (int)(required):`` 0 if you are reducing a single tensor. Index of the tensor if you are reducing a list of tensors.
- ``num_params (int)(required):`` len(tensor).
- ``compression (smdistributed.dataparallel.tensorflow.Compression)(optional)``: Compression algorithm used to reduce the amount of data sent and received by each worker node. Defaults to not using compression.
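The AVERAGE reduction that ``allreduce`` applies across workers can be sketched in plain Python. This is a toy stand-in for illustration only — the real operation runs on GPUs via NCCL, and the function name here is made up:

```python
# Plain-Python toy of the AVERAGE reduction that allreduce performs across
# workers; each worker contributes one tensor (modeled as a flat list).
def simulated_allreduce(worker_tensors, op="average"):
    """Element-wise reduce identically shaped tensors from all ranks."""
    length = len(worker_tensors[0])
    if any(len(t) != length for t in worker_tensors):
        raise ValueError("input shape must be identical across all ranks")
    sums = [sum(t[i] for t in worker_tensors) for i in range(length)]
    if op == "sum":
        return sums
    return [s / len(worker_tensors) for s in sums]

# Two workers holding local gradients; every rank gets the same average back.
print(simulated_allreduce([[1.0, 2.0], [3.0, 4.0]]))  # [2.0, 3.0]
```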
@@ -306,9 +315,9 @@ TensorFlow API

.. function:: smdistributed.dataparallel.tensorflow.oob_allreduce(tensor, compression=Compression.none, op=ReduceOp.AVERAGE)

OutOfBand (oob) AllReduce is simplified AllReduce function for use cases
Out-of-band (oob) AllReduce is a simplified AllReduce function for use cases
such as calculating total loss across all the GPUs in the training.
oob_allreduce average the tensors, as reduction operation, across the
``oob_allreduce`` averages the tensors, as a reduction operation, across the
worker nodes.

**Inputs:**
@@ -326,15 +335,25 @@ TensorFlow API

- ``None``

.. rubric:: Notes

``smdistributed.dataparallel.tensorflow.oob_allreduce``, in most
cases, is ~2x slower
than ``smdistributed.dataparallel.tensorflow.allreduce``  so it is not
recommended to be used for performing gradient reduction during the
training
process. ``smdistributed.dataparallel.tensorflow.oob_allreduce`` internally
uses NCCL AllReduce with ``ncclSum`` as the reduction operation.
.. note::

In most cases, the :class:`smdistributed.dataparallel.tensorflow.oob_allreduce()`
function is ~2x slower
than :class:`smdistributed.dataparallel.tensorflow.allreduce()`. It is not
recommended to use the :class:`smdistributed.dataparallel.tensorflow.oob_allreduce()`
function for performing gradient
reduction during the training process.
``smdistributed.dataparallel.tensorflow.oob_allreduce`` internally
uses NCCL AllReduce with ``ncclSum`` as the reduction operation.

.. note::

:class:`smdistributed.dataparallel.tensorflow.oob_allreduce()` should
only be used to allreduce non-gradient tensors.
If you use :class:`smdistributed.dataparallel.tensorflow.allreduce()`
for non-gradient tensors,
the distributed training job might stall or stop.
To allreduce gradients, use :class:`smdistributed.dataparallel.tensorflow.allreduce()`.
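The notes above say ``oob_allreduce`` defaults to an AVERAGE op while internally using NCCL AllReduce with ``ncclSum``. The two are compatible because an average is just a sum reduction followed by dividing by the number of workers — a toy illustration (function name is made up):

```python
# An AVERAGE reduction can be built on top of a sum reduction: reduce with
# sum (what ncclSum produces across ranks), then post-scale by world size.
def average_via_sum(per_worker_values):
    world_size = len(per_worker_values)
    total = sum(per_worker_values)   # what a ncclSum reduction yields
    return total / world_size        # post-scale to obtain the average

# e.g. per-GPU loss values -> global mean loss
print(average_via_sum([1.0, 2.0, 3.0, 4.0]))  # 2.5
```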


.. function:: smdistributed.dataparallel.tensorflow.overlap(tensor)
2 changes: 1 addition & 1 deletion doc/conf.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Placeholder docstring"""
"""Configuration for generating readthedocs docstrings."""
from __future__ import absolute_import

import pkg_resources
9 changes: 4 additions & 5 deletions doc/overview.rst
@@ -773,11 +773,10 @@ Deployment may take about 5 minutes.
   instance_type=instance_type,
)

Because ``catboost`` and ``lightgbm`` rely on the PyTorch Deep Learning Containers
image, the corresponding Models and Endpoints display the “pytorch”
prefix when viewed in the AWS console. To verify that these models
were created successfully with your desired base model, refer to
the ``Tags`` section.
Because the model and script URIs are distributed by SageMaker JumpStart,
the endpoint, endpoint config and model resources will be prefixed with
``sagemaker-jumpstart``. Refer to the model ``Tags`` to inspect the
JumpStart artifacts involved in the model creation.

Perform Inference
-----------------
18 changes: 15 additions & 3 deletions src/sagemaker/estimator.py
@@ -50,6 +50,7 @@
from sagemaker.job import _Job
from sagemaker.jumpstart.utils import (
add_jumpstart_tags,
get_jumpstart_base_name_if_jumpstart_model,
update_inference_tags_with_jumpstart_training_tags,
)
from sagemaker.local import LocalSession
@@ -569,8 +570,11 @@ def prepare_workflow_for_training(self, job_name=None):
def _ensure_base_job_name(self):
"""Set ``self.base_job_name`` if it is not set already."""
# honor supplied base_job_name or generate it
if self.base_job_name is None:
self.base_job_name = base_name_from_image(self.training_image_uri())
self.base_job_name = (
self.base_job_name
or get_jumpstart_base_name_if_jumpstart_model(self.source_dir, self.model_uri)
or base_name_from_image(self.training_image_uri())
)
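The ``or``-chain above resolves the first truthy value in priority order: a user-supplied ``base_job_name``, then the JumpStart default, then a name derived from the training image. A self-contained sketch of that precedence — every helper here is a toy stand-in, not the real SDK implementation:

```python
JUMPSTART_RESOURCE_BASE_NAME = "sagemaker-jumpstart"

def fake_jumpstart_base_name(*uris):
    # Toy check; the real SDK inspects URIs against known JumpStart buckets.
    if any(uri and "jumpstart" in uri for uri in uris):
        return JUMPSTART_RESOURCE_BASE_NAME
    return None

def fake_base_name_from_image(image_uri):
    # Toy version of sagemaker.utils.base_name_from_image.
    return image_uri.rpartition("/")[2].split(":")[0]

def resolve_base_job_name(explicit, source_dir, model_uri, image_uri):
    # Mirrors the or-chain in _ensure_base_job_name.
    return (
        explicit
        or fake_jumpstart_base_name(source_dir, model_uri)
        or fake_base_name_from_image(image_uri)
    )

image = "123456789012.dkr.ecr.us-west-2.amazonaws.com/tensorflow-training:2.4"
print(resolve_base_job_name(None, "s3://jumpstart-cache-prod/scripts/x.tar.gz", None, image))
# -> sagemaker-jumpstart
print(resolve_base_job_name(None, None, None, image))
# -> tensorflow-training
```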

def _get_or_create_name(self, name=None):
"""Generate a name based on the base job name or training image if needed.
@@ -1208,7 +1212,15 @@ def deploy(
is_serverless = serverless_inference_config is not None
self._ensure_latest_training_job()
self._ensure_base_job_name()
default_name = name_from_base(self.base_job_name)

jumpstart_base_name = get_jumpstart_base_name_if_jumpstart_model(
kwargs.get("source_dir"), self.source_dir, kwargs.get("model_data"), self.model_uri
)
default_name = (
name_from_base(jumpstart_base_name)
if jumpstart_base_name
else name_from_base(self.base_job_name)
)
endpoint_name = endpoint_name or default_name
model_name = model_name or default_name

13 changes: 7 additions & 6 deletions src/sagemaker/huggingface/estimator.py
@@ -50,14 +50,15 @@ def __init__(
compiler_config=None,
**kwargs,
):
"""This ``Estimator`` executes a HuggingFace script in a managed execution environment.
"""This estimator runs a Hugging Face training script in a SageMaker training environment.

The managed HuggingFace environment is an Amazon-built Docker container that executes
functions defined in the supplied ``entry_point`` Python script within a SageMaker
Training Job.
The estimator initiates the SageMaker-managed Hugging Face environment
by using the pre-built Hugging Face Docker container and runs
the Hugging Face training script that the user provides through
the ``entry_point`` argument.

Training is started by calling
:meth:`~sagemaker.amazon.estimator.Framework.fit` on this Estimator.
After configuring the estimator class, use the class method
:meth:`~sagemaker.amazon.estimator.Framework.fit()` to start a training job.

Args:
py_version (str): Python version you want to use for executing your model training
33 changes: 32 additions & 1 deletion src/sagemaker/image_uri_config/neo-tensorflow.json
@@ -12,7 +12,8 @@
"1.11.0": "1.15.3",
"1.12.0": "1.15.3",
"1.13.0": "1.15.3",
"1.14.0": "1.15.3"
"1.14.0": "1.15.3",
"2.4.2": "2.4.2"
},
"versions": {
"1.15.3": {
@@ -44,6 +45,36 @@
"us-west-2": "301217895009"
},
"repository": "sagemaker-inference-tensorflow"
},
"2.4.2": {
"py_versions": ["py3"],
"registries": {
"af-south-1": "774647643957",
"ap-east-1": "110948597952",
"ap-northeast-1": "941853720454",
"ap-northeast-2": "151534178276",
"ap-northeast-3": "925152966179",
"ap-south-1": "763008648453",
"ap-southeast-1": "324986816169",
"ap-southeast-2": "355873309152",
"ca-central-1": "464438896020",
"cn-north-1": "472730292857",
"cn-northwest-1": "474822919863",
"eu-central-1": "746233611703",
"eu-north-1": "601324751636",
"eu-south-1": "966458181534",
"eu-west-1": "802834080501",
"eu-west-2": "205493899709",
"eu-west-3": "254080097072",
"me-south-1": "836785723513",
"sa-east-1": "756306329178",
"us-east-1": "785573368785",
"us-east-2": "007439368137",
"us-gov-west-1": "263933020539",
"us-west-1": "710691900526",
"us-west-2": "301217895009"
},
"repository": "sagemaker-inference-tensorflow"
}
}
}
2 changes: 1 addition & 1 deletion src/sagemaker/jumpstart/cache.py
@@ -229,7 +229,7 @@ def _get_manifest_key_from_model_id_semantic_version(
)

else:
possible_model_ids = [header.model_id for header in manifest.values()]
possible_model_ids = [header.model_id for header in manifest.values()] # type: ignore
closest_model_id = get_close_matches(model_id, possible_model_ids, n=1, cutoff=0)[0]
error_msg += f"Did you mean to use model ID '{closest_model_id}'?"
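The "Did you mean" suggestion above relies on the standard-library ``difflib.get_close_matches``; a standalone sketch with made-up model IDs:

```python
from difflib import get_close_matches

# Hypothetical manifest contents, just for illustration.
possible_model_ids = [
    "catboost-classification-model",
    "lightgbm-classification-model",
    "pytorch-ic-mobilenet-v2",
]

# cutoff=0 guarantees a best match is always returned, mirroring the cache code.
closest = get_close_matches("catbost-classifier", possible_model_ids, n=1, cutoff=0)[0]
print(f"Did you mean to use model ID '{closest}'?")
```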

2 changes: 2 additions & 0 deletions src/sagemaker/jumpstart/constants.py
@@ -124,3 +124,5 @@
SUPPORTED_JUMPSTART_SCOPES = set(scope.value for scope in JumpStartScriptScope)

ENV_VARIABLE_JUMPSTART_CONTENT_BUCKET_OVERRIDE = "AWS_JUMPSTART_CONTENT_BUCKET_OVERRIDE"

JUMPSTART_RESOURCE_BASE_NAME = "sagemaker-jumpstart"
16 changes: 16 additions & 0 deletions src/sagemaker/jumpstart/utils.py
@@ -232,6 +232,22 @@ def add_single_jumpstart_tag(
return curr_tags


def get_jumpstart_base_name_if_jumpstart_model(
*uris: Optional[str],
) -> Optional[str]:
"""Return default JumpStart base name if a URI belongs to JumpStart.

If no URIs belong to JumpStart, return None.
[Review comment]
Contributor: Can you please address Mufis' comment
Member Author: I did (*uris (Optional[str]): URI to test for association with JumpStart.). I believe you're looking at an old version.

Args:
*uris (Optional[str]): URI to test for association with JumpStart.
"""
for uri in uris:
if is_jumpstart_model_uri(uri):
return constants.JUMPSTART_RESOURCE_BASE_NAME
return None


def add_jumpstart_tags(
tags: Optional[List[Dict[str, str]]] = None,
inference_model_uri: Optional[str] = None,
32 changes: 23 additions & 9 deletions src/sagemaker/model.py
@@ -33,7 +33,7 @@
from sagemaker.predictor import PredictorBase
from sagemaker.serverless import ServerlessInferenceConfig
from sagemaker.transformer import Transformer
from sagemaker.jumpstart.utils import add_jumpstart_tags
from sagemaker.jumpstart.utils import add_jumpstart_tags, get_jumpstart_base_name_if_jumpstart_model
from sagemaker.utils import unique_name_from_base
from sagemaker.async_inference import AsyncInferenceConfig
from sagemaker.predictor_async import AsyncPredictor
@@ -466,7 +466,7 @@ def _upload_code(self, key_prefix: str, repack: bool = False) -> None:
)

def _script_mode_env_vars(self):
"""Placeholder docstring"""
"""Returns a mapping of environment variables for script mode execution."""
script_name = None
dir_name = None
if self.uploaded_code:
@@ -478,8 +478,11 @@ def _script_mode_env_vars(self):
elif self.entry_point is not None:
script_name = self.entry_point
if self.source_dir is not None:
dir_name = "file://" + self.source_dir

dir_name = (
self.source_dir
if self.source_dir.startswith("s3://")
else "file://" + self.source_dir
)
return {
SCRIPT_PARAM_NAME.upper(): script_name or str(),
DIR_PARAM_NAME.upper(): dir_name or str(),
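The fixed branch above passes S3 URIs through unchanged and prefixes local paths with ``file://``. A standalone sketch of that selection (hypothetical function name, not the SDK method):

```python
def script_mode_dir_param(source_dir):
    """Mirror the corrected dir_name logic: S3 URIs pass through
    unchanged; local paths are prefixed with file://."""
    if source_dir is None:
        return ""
    return source_dir if source_dir.startswith("s3://") else "file://" + source_dir

print(script_mode_dir_param("s3://my-bucket/code/sourcedir.tar.gz"))  # unchanged
print(script_mode_dir_param("/opt/ml/code"))  # file:///opt/ml/code
```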
@@ -514,7 +517,9 @@ def _create_sagemaker_model(self, instance_type=None, accelerator_type=None, tag
"""
container_def = self.prepare_container_def(instance_type, accelerator_type=accelerator_type)

self._ensure_base_name_if_needed(container_def["Image"])
self._ensure_base_name_if_needed(
image_uri=container_def["Image"], script_uri=self.source_dir, model_uri=self.model_data
)
self._set_model_name_if_needed()

enable_network_isolation = self.enable_network_isolation()
@@ -529,10 +534,17 @@
tags=tags,
)

def _ensure_base_name_if_needed(self, image_uri):
"""Create a base name from the image URI if there is no model name provided."""
def _ensure_base_name_if_needed(self, image_uri, script_uri, model_uri):
"""Create a base name from the image URI if there is no model name provided.

If a JumpStart script or model URI is used, select the JumpStart base name.
"""
if self.name is None:
self._base_name = self._base_name or utils.base_name_from_image(image_uri)
self._base_name = (
self._base_name
or get_jumpstart_base_name_if_jumpstart_model(script_uri, model_uri)
or utils.base_name_from_image(image_uri)
)
[Review comment]
Contributor: non-blocking: I would prefer to define a jumpstart_base_name variable here as well.


def _set_model_name_if_needed(self):
"""Generate a new model name if ``self._base_name`` is present."""
@@ -963,7 +975,9 @@ def deploy(

compiled_model_suffix = None if is_serverless else "-".join(instance_type.split(".")[:-1])
if self._is_compiled_model and not is_serverless:
self._ensure_base_name_if_needed(self.image_uri)
self._ensure_base_name_if_needed(
image_uri=self.image_uri, script_uri=self.source_dir, model_uri=self.model_data
)
if self._base_name is not None:
self._base_name = "-".join((self._base_name, compiled_model_suffix))

Expand Down
35 changes: 34 additions & 1 deletion src/sagemaker/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import csv
import io
import json

import numpy as np
from six import with_metaclass

@@ -357,3 +356,37 @@ def serialize(self, data):
return data.read()

raise ValueError("Unable to handle input format: %s" % type(data))


class DataSerializer(SimpleBaseSerializer):
"""Serialize data in any file by extracting raw bytes from the file."""

def __init__(self, content_type="file-path/raw-bytes"):
"""Initialize a ``DataSerializer`` instance.

Args:
content_type (str): The MIME type to signal to the inference endpoint when sending
request data (default: "file-path/raw-bytes").
"""
super(DataSerializer, self).__init__(content_type=content_type)

def serialize(self, data):
"""Serialize file data to raw bytes.

Args:
data (object): Data to be serialized. The data can be a string
representing file-path or the raw bytes from a file.
Returns:
raw-bytes: The data serialized as raw-bytes from the input.
"""
if isinstance(data, str):
try:
with open(data, "rb") as data_file:
data_file_info = data_file.read()
return data_file_info
except Exception as e:
raise ValueError(f"Could not open/read file: {data}. {e}")
if isinstance(data, bytes):
return data

raise ValueError(f"Object of type {type(data)} is not Data serializable.")
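The ``serialize`` behavior can be exercised without a SageMaker endpoint. The sketch below copies the logic into a standalone function (hypothetical name) and feeds it both accepted input types:

```python
import os
import tempfile

# Standalone copy of DataSerializer.serialize's logic, so the behavior can
# be demonstrated without importing sagemaker (name is illustrative only).
def serialize_data(data):
    if isinstance(data, str):
        try:
            with open(data, "rb") as data_file:
                return data_file.read()
        except Exception as e:
            raise ValueError(f"Could not open/read file: {data}. {e}")
    if isinstance(data, bytes):
        return data
    raise ValueError(f"Object of type {type(data)} is not Data serializable.")

# A file path yields the file's raw bytes; bytes pass through unchanged.
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
    tmp.write(b"\xff\xd8 fake jpeg payload")
    path = tmp.name
assert serialize_data(path) == b"\xff\xd8 fake jpeg payload"
assert serialize_data(b"already-bytes") == b"already-bytes"
os.unlink(path)
```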