diff --git a/src/sagemaker/exceptions.py b/src/sagemaker/exceptions.py index 1f17d5cce4..358ddc078a 100644 --- a/src/sagemaker/exceptions.py +++ b/src/sagemaker/exceptions.py @@ -23,6 +23,10 @@ def __init__(self, message, allowed_statuses, actual_status): super(UnexpectedStatusException, self).__init__(message) +class CapacityError(UnexpectedStatusException): + """Raised when resource status is not expected and fails with a reason of CapacityError""" + + class AsyncInferenceError(Exception): """The base exception class for Async Inference exceptions.""" diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index c50a22d3f8..29d49f56bc 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -1721,6 +1721,7 @@ def wait_for_auto_ml_job(self, job, poll=5): (dict): Return value from the ``DescribeAutoMLJob`` API. Raises: + exceptions.CapacityError: If the auto ml job fails with CapacityError. exceptions.UnexpectedStatusException: If the auto ml job fails. """ desc = _wait_until(lambda: _auto_ml_job_status(self.sagemaker_client, job), poll) @@ -1743,7 +1744,8 @@ def logs_for_auto_ml_job( # noqa: C901 - suppress complexity warning for this m completion (default: 5). Raises: - exceptions.UnexpectedStatusException: If waiting and the training job fails. + exceptions.CapacityError: If waiting and auto ml job fails with CapacityError. + exceptions.UnexpectedStatusException: If waiting and auto ml job fails. """ description = self.sagemaker_client.describe_auto_ml_job(AutoMLJobName=job_name) @@ -2845,6 +2847,10 @@ def wait_for_model_package(self, model_package_name, poll=5): Returns: dict: Return value from the ``DescribeEndpoint`` API. + + Raises: + exceptions.CapacityError: If the Model Package job fails with CapacityError. + exceptions.UnexpectedStatusException: If waiting and the Model Package job fails. 
""" desc = _wait_until( lambda: _create_model_package_status(self.sagemaker_client, model_package_name), poll @@ -2853,10 +2859,17 @@ def wait_for_model_package(self, model_package_name, poll=5): if status != "Completed": reason = desc.get("FailureReason", None) + message = "Error creating model package {package}: {status} Reason: {reason}".format( + package=model_package_name, status=status, reason=reason + ) + if "CapacityError" in str(reason): + raise exceptions.CapacityError( + message=message, + allowed_statuses=["InService"], + actual_status=status, + ) raise exceptions.UnexpectedStatusException( - message="Error creating model package {package}: {status} Reason: {reason}".format( - package=model_package_name, status=status, reason=reason - ), + message=message, allowed_statuses=["Completed"], actual_status=status, ) @@ -3147,6 +3160,7 @@ def wait_for_job(self, job, poll=5): (dict): Return value from the ``DescribeTrainingJob`` API. Raises: + exceptions.CapacityError: If the training job fails with CapacityError. exceptions.UnexpectedStatusException: If the training job fails. """ desc = _wait_until_training_done( @@ -3166,7 +3180,8 @@ def wait_for_processing_job(self, job, poll=5): (dict): Return value from the ``DescribeProcessingJob`` API. Raises: - exceptions.UnexpectedStatusException: If the compilation job fails. + exceptions.CapacityError: If the processing job fails with CapacityError. + exceptions.UnexpectedStatusException: If the processing job fails. """ desc = _wait_until(lambda: _processing_job_status(self.sagemaker_client, job), poll) self._check_job_status(job, desc, "ProcessingJobStatus") @@ -3183,6 +3198,7 @@ def wait_for_compilation_job(self, job, poll=5): (dict): Return value from the ``DescribeCompilationJob`` API. Raises: + exceptions.CapacityError: If the compilation job fails with CapacityError. exceptions.UnexpectedStatusException: If the compilation job fails. 
""" desc = _wait_until(lambda: _compilation_job_status(self.sagemaker_client, job), poll) @@ -3200,7 +3216,8 @@ def wait_for_edge_packaging_job(self, job, poll=5): (dict): Return value from the ``DescribeEdgePackagingJob`` API. Raises: - exceptions.UnexpectedStatusException: If the compilation job fails. + exceptions.CapacityError: If the edge packaging job fails with CapacityError. + exceptions.UnexpectedStatusException: If the edge packaging job fails. """ desc = _wait_until(lambda: _edge_packaging_job_status(self.sagemaker_client, job), poll) self._check_job_status(job, desc, "EdgePackagingJobStatus") @@ -3217,6 +3234,7 @@ def wait_for_tuning_job(self, job, poll=5): (dict): Return value from the ``DescribeHyperParameterTuningJob`` API. Raises: + exceptions.CapacityError: If the hyperparameter tuning job fails with CapacityError. exceptions.UnexpectedStatusException: If the hyperparameter tuning job fails. """ desc = _wait_until(lambda: _tuning_job_status(self.sagemaker_client, job), poll) @@ -3245,6 +3263,7 @@ def wait_for_transform_job(self, job, poll=5): (dict): Return value from the ``DescribeTransformJob`` API. Raises: + exceptions.CapacityError: If the transform job fails with CapacityError. exceptions.UnexpectedStatusException: If the transform job fails. """ desc = _wait_until(lambda: _transform_job_status(self.sagemaker_client, job), poll) @@ -3283,6 +3302,7 @@ def _check_job_status(self, job, desc, status_key_name): status_key_name (str): Status key name to check for. Raises: + exceptions.CapacityError: If the training job fails with CapacityError. exceptions.UnexpectedStatusException: If the training job fails. """ status = desc[status_key_name] @@ -3298,10 +3318,17 @@ def _check_job_status(self, job, desc, status_key_name): elif status != "Completed": reason = desc.get("FailureReason", "(No reason provided)") job_type = status_key_name.replace("JobStatus", " job") + message = "Error for {job_type} {job_name}: {status}. 
Reason: {reason}".format( + job_type=job_type, job_name=job, status=status, reason=reason + ) + if "CapacityError" in str(reason): + raise exceptions.CapacityError( + message=message, + allowed_statuses=["Completed", "Stopped"], + actual_status=status, + ) raise exceptions.UnexpectedStatusException( - message="Error for {job_type} {job_name}: {status}. Reason: {reason}".format( - job_type=job_type, job_name=job, status=status, reason=reason - ), + message=message, allowed_statuses=["Completed", "Stopped"], actual_status=status, ) @@ -3313,6 +3340,10 @@ def wait_for_endpoint(self, endpoint, poll=30): endpoint (str): Name of the ``Endpoint`` to wait for. poll (int): Polling interval in seconds (default: 5). + Raises: + exceptions.CapacityError: If the endpoint creation job fails with CapacityError. + exceptions.UnexpectedStatusException: If the endpoint creation job fails. + Returns: dict: Return value from the ``DescribeEndpoint`` API. """ @@ -3321,10 +3352,17 @@ def wait_for_endpoint(self, endpoint, poll=30): if status != "InService": reason = desc.get("FailureReason", None) + message = "Error hosting endpoint {endpoint}: {status}. Reason: {reason}.".format( + endpoint=endpoint, status=status, reason=reason + ) + if "CapacityError" in str(reason): + raise exceptions.CapacityError( + message=message, + allowed_statuses=["InService"], + actual_status=status, + ) raise exceptions.UnexpectedStatusException( - message="Error hosting endpoint {endpoint}: {status}. Reason: {reason}.".format( - endpoint=endpoint, status=status, reason=reason - ), + message=message, allowed_statuses=["InService"], actual_status=status, ) @@ -3649,6 +3687,7 @@ def logs_for_job( # noqa: C901 - suppress complexity warning for this method completion (default: 5). Raises: + exceptions.CapacityError: If the training job fails with CapacityError. exceptions.UnexpectedStatusException: If waiting and the training job fails. 
""" diff --git a/tests/integ/test_huggingface.py b/tests/integ/test_huggingface.py index 52d5da4fbf..3d52ca44ea 100644 --- a/tests/integ/test_huggingface.py +++ b/tests/integ/test_huggingface.py @@ -15,15 +15,14 @@ import os import pytest -import logging from sagemaker.huggingface import HuggingFace, HuggingFaceProcessor from sagemaker.huggingface.model import HuggingFaceModel, HuggingFacePredictor from sagemaker.utils import unique_name_from_base from tests import integ +from tests.integ.utils import gpu_list, retry_with_instance_list from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name -from sagemaker.exceptions import UnexpectedStatusException ROLE = "SageMakerRole" @@ -34,43 +33,34 @@ and integ.test_region() in integ.TRAINING_NO_P3_REGIONS, reason="no ml.p2 or ml.p3 instances in this region", ) +@retry_with_instance_list(gpu_list(integ.test_region())) def test_framework_processing_job_with_deps( sagemaker_session, - gpu_instance_type_list, huggingface_training_latest_version, huggingface_training_pytorch_latest_version, huggingface_pytorch_latest_training_py_version, + **kwargs, ): - for i_type in gpu_instance_type_list: - logging.info("Using the instance type: {}".format(i_type)) - with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES): - code_path = os.path.join(DATA_DIR, "dummy_code_bundle_with_reqs") - entry_point = "main_script.py" - - processor = HuggingFaceProcessor( - transformers_version=huggingface_training_latest_version, - pytorch_version=huggingface_training_pytorch_latest_version, - py_version=huggingface_pytorch_latest_training_py_version, - role=ROLE, - instance_count=1, - instance_type=i_type, - sagemaker_session=sagemaker_session, - base_job_name="test-huggingface", - ) - try: - processor.run( - code=entry_point, - source_dir=code_path, - inputs=[], - wait=True, - ) - except UnexpectedStatusException as e: - if "CapacityError" in str(e) and i_type != 
gpu_instance_type_list[-1]: - logging.warning("Failure using instance type: {}. {}".format(i_type, str(e))) - continue - else: - raise - break + with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES): + code_path = os.path.join(DATA_DIR, "dummy_code_bundle_with_reqs") + entry_point = "main_script.py" + + processor = HuggingFaceProcessor( + transformers_version=huggingface_training_latest_version, + pytorch_version=huggingface_training_pytorch_latest_version, + py_version=huggingface_pytorch_latest_training_py_version, + role=ROLE, + instance_count=1, + instance_type=kwargs["instance_type"], + sagemaker_session=sagemaker_session, + base_job_name="test-huggingface", + ) + processor.run( + code=entry_point, + source_dir=code_path, + inputs=[], + wait=True, + ) @pytest.mark.release diff --git a/tests/integ/test_tf.py b/tests/integ/test_tf.py index d47c5c1f2b..bb865a2dd5 100644 --- a/tests/integ/test_tf.py +++ b/tests/integ/test_tf.py @@ -15,7 +15,6 @@ import numpy as np import os import time -import logging import pytest @@ -25,8 +24,8 @@ import tests.integ from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES, kms_utils, timeout from tests.integ.retry import retries +from tests.integ.utils import gpu_list, retry_with_instance_list from tests.integ.s3_utils import assert_s3_file_patterns_exist -from sagemaker.exceptions import UnexpectedStatusException ROLE = "SageMakerRole" @@ -48,41 +47,32 @@ and tests.integ.test_region() in tests.integ.TRAINING_NO_P3_REGIONS, reason="no ml.p2 or ml.p3 instances in this region", ) +@retry_with_instance_list(gpu_list(tests.integ.test_region())) def test_framework_processing_job_with_deps( sagemaker_session, - gpu_instance_type_list, tensorflow_training_latest_version, tensorflow_training_latest_py_version, + **kwargs, ): - for i_type in gpu_instance_type_list: - logging.info("Using the instance type: {}".format(i_type)) - with timeout.timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES): - code_path = os.path.join(DATA_DIR, 
"dummy_code_bundle_with_reqs") - entry_point = "main_script.py" - - processor = TensorFlowProcessor( - framework_version=tensorflow_training_latest_version, - py_version=tensorflow_training_latest_py_version, - role=ROLE, - instance_count=1, - instance_type=i_type, - sagemaker_session=sagemaker_session, - base_job_name="test-tensorflow", - ) - try: - processor.run( - code=entry_point, - source_dir=code_path, - inputs=[], - wait=True, - ) - except UnexpectedStatusException as e: - if "CapacityError" in str(e) and i_type != gpu_instance_type_list[-1]: - logging.warning("Failure using instance type: {}. {}".format(i_type, str(e))) - continue - else: - raise - break + with timeout.timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES): + code_path = os.path.join(DATA_DIR, "dummy_code_bundle_with_reqs") + entry_point = "main_script.py" + + processor = TensorFlowProcessor( + framework_version=tensorflow_training_latest_version, + py_version=tensorflow_training_latest_py_version, + role=ROLE, + instance_count=1, + instance_type=kwargs["instance_type"], + sagemaker_session=sagemaker_session, + base_job_name="test-tensorflow", + ) + processor.run( + code=entry_point, + source_dir=code_path, + inputs=[], + wait=True, + ) def test_mnist_with_checkpoint_config( diff --git a/tests/integ/utils.py b/tests/integ/utils.py new file mode 100644 index 0000000000..53440f96f5 --- /dev/null +++ b/tests/integ/utils.py @@ -0,0 +1,71 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+from __future__ import absolute_import
+import logging
+from functools import wraps
+
+from tests.conftest import NO_P3_REGIONS, NO_M4_REGIONS
+from sagemaker.exceptions import CapacityError
+
+
+def gpu_list(region):
+    """Return candidate GPU instance types for ``region`` (p3 omitted for NO_P3_REGIONS)."""
+    if region in NO_P3_REGIONS:
+        return ["ml.p2.xlarge"]
+    return ["ml.p3.2xlarge", "ml.p2.xlarge"]
+
+
+def cpu_list(region):
+    """Return candidate CPU instance types for ``region`` (m4 omitted for NO_M4_REGIONS)."""
+    if region in NO_M4_REGIONS:
+        return ["ml.m5.xlarge"]
+    return ["ml.m4.xlarge", "ml.m5.xlarge"]
+
+
+def retry_with_instance_list(instance_list):
+    """Decorator for running an Integ test with an instance_list and
+    break on first success
+
+    Args:
+        instance_list (list): List of Compute instances for integ test. Each
+            attempt passes one as the ``instance_type`` keyword argument.
+    Usage:
+        @retry_with_instance_list(instance_list=["ml.g3.2", "ml.g2"])
+        def sample_function(**kwargs):
+            print(kwargs["instance_type"])
+    """
+
+    def decorator(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            if not (instance_list and isinstance(instance_list, list)):
+                raise ValueError(
+                    f"Parameter instance_list = {instance_list} "
+                    "is expected to be a non-empty list of instance types."
+                )
+            for i_type in instance_list:
+                logging.info(f"Using the instance type: {i_type} for {func.__name__}")
+                try:
+                    kwargs.update({"instance_type": i_type})
+                    # Returning ends the retries and hands back func's result.
+                    return func(*args, **kwargs)
+                except CapacityError as e:
+                    if i_type == instance_list[-1]:
+                        raise
+                    logging.warning(
+                        "Failure using instance type: {}. {}".format(i_type, str(e))
+                    )
+
+        return wrapper
+
+    return decorator
diff --git a/tests/unit/test_exception_on_bad_status.py b/tests/unit/test_exception_on_bad_status.py
index 1eaa832125..471cb3b9b6 100644
--- a/tests/unit/test_exception_on_bad_status.py
+++ b/tests/unit/test_exception_on_bad_status.py
@@ -84,6 +84,26 @@ def test_does_raise_when_incorrect_job_status():
         assert "Stopped" in e.allowed_statuses
 
 
+def test_does_raise_capacity_error_when_incorrect_job_status():
+    try:
+        job = Mock()
+        sagemaker_session = get_sagemaker_session(returns_status="Failed")
+        sagemaker_session._check_job_status(
+            job,
+            {
+                "TransformationJobStatus": "Failed",
+                "FailureReason": "CapacityError: Unable to provision requested ML compute capacity",
+            },
+            "TransformationJobStatus",
+        )
+        assert False, "sagemaker.exceptions.CapacityError should have been raised but was not"
+    except Exception as e:
+        assert type(e) == sagemaker.exceptions.CapacityError
+        assert e.actual_status == "Failed"
+        assert "Completed" in e.allowed_statuses
+        assert "Stopped" in e.allowed_statuses
+
+
 def test_does_not_raise_when_successfully_deployed_endpoint():
     try:
         sagemaker_session = get_sagemaker_session(returns_status="InService")