feature: pluggable instance fallback mechanism, add CapacityError #3033

Merged
merged 1 commit on Mar 30, 2022
4 changes: 4 additions & 0 deletions src/sagemaker/exceptions.py
@@ -23,6 +23,10 @@ def __init__(self, message, allowed_statuses, actual_status):
super(UnexpectedStatusException, self).__init__(message)


class CapacityError(UnexpectedStatusException):
"""Raised when resource status is not expected and fails with a reason of CapacityError"""


class AsyncInferenceError(Exception):
"""The base exception class for Async Inference exceptions."""

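For context, a minimal caller-side sketch (not part of this PR) of how the new exception hierarchy can be used: because `CapacityError` subclasses `UnexpectedStatusException`, existing `except UnexpectedStatusException` handlers keep working, while callers that care about capacity shortages can branch on the subclass first. The endpoint name and handling policy below are hypothetical, and `actual_status` is assumed to be exposed as an attribute by the base class.

```python
from sagemaker.exceptions import CapacityError, UnexpectedStatusException
from sagemaker.session import Session


def wait_and_classify(session: Session, endpoint_name: str) -> dict:
    """Wait for an endpoint and distinguish capacity failures from other failures."""
    try:
        return session.wait_for_endpoint(endpoint_name)
    except CapacityError as e:
        # Capacity shortages are candidates for retrying on a different instance type.
        print(f"Capacity failure while hosting {endpoint_name}: {e}")
        raise
    except UnexpectedStatusException as e:
        # Any other unexpected status is treated as a hard failure here.
        print(f"Non-capacity failure ({e.actual_status}): {e}")
        raise
```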
63 changes: 51 additions & 12 deletions src/sagemaker/session.py
@@ -1721,6 +1721,7 @@ def wait_for_auto_ml_job(self, job, poll=5):
(dict): Return value from the ``DescribeAutoMLJob`` API.

Raises:
exceptions.CapacityError: If the auto ml job fails with CapacityError.
exceptions.UnexpectedStatusException: If the auto ml job fails.
"""
desc = _wait_until(lambda: _auto_ml_job_status(self.sagemaker_client, job), poll)
@@ -1743,7 +1744,8 @@ def logs_for_auto_ml_job( # noqa: C901 - suppress complexity warning for this m
completion (default: 5).

Raises:
exceptions.UnexpectedStatusException: If waiting and the training job fails.
exceptions.CapacityError: If waiting and auto ml job fails with CapacityError.
exceptions.UnexpectedStatusException: If waiting and auto ml job fails.
"""

description = self.sagemaker_client.describe_auto_ml_job(AutoMLJobName=job_name)
@@ -2845,6 +2847,10 @@ def wait_for_model_package(self, model_package_name, poll=5):

Returns:
dict: Return value from the ``DescribeEndpoint`` API.

Raises:
exceptions.CapacityError: If the Model Package job fails with CapacityError.
exceptions.UnexpectedStatusException: If waiting and the Model Package job fails.
"""
desc = _wait_until(
lambda: _create_model_package_status(self.sagemaker_client, model_package_name), poll
@@ -2853,10 +2859,17 @@

if status != "Completed":
reason = desc.get("FailureReason", None)
message = "Error creating model package {package}: {status} Reason: {reason}".format(
package=model_package_name, status=status, reason=reason
)
if "CapacityError" in str(reason):
raise exceptions.CapacityError(
message=message,
allowed_statuses=["InService"],
actual_status=status,
)
raise exceptions.UnexpectedStatusException(
message="Error creating model package {package}: {status} Reason: {reason}".format(
package=model_package_name, status=status, reason=reason
),
message=message,
allowed_statuses=["Completed"],
actual_status=status,
)
@@ -3147,6 +3160,7 @@ def wait_for_job(self, job, poll=5):
(dict): Return value from the ``DescribeTrainingJob`` API.

Raises:
exceptions.CapacityError: If the training job fails with CapacityError.
exceptions.UnexpectedStatusException: If the training job fails.
"""
desc = _wait_until_training_done(
@@ -3166,7 +3180,8 @@ def wait_for_processing_job(self, job, poll=5):
(dict): Return value from the ``DescribeProcessingJob`` API.

Raises:
exceptions.UnexpectedStatusException: If the compilation job fails.
exceptions.CapacityError: If the processing job fails with CapacityError.
exceptions.UnexpectedStatusException: If the processing job fails.
"""
desc = _wait_until(lambda: _processing_job_status(self.sagemaker_client, job), poll)
self._check_job_status(job, desc, "ProcessingJobStatus")
@@ -3183,6 +3198,7 @@ def wait_for_compilation_job(self, job, poll=5):
(dict): Return value from the ``DescribeCompilationJob`` API.

Raises:
exceptions.CapacityError: If the compilation job fails with CapacityError.
exceptions.UnexpectedStatusException: If the compilation job fails.
"""
desc = _wait_until(lambda: _compilation_job_status(self.sagemaker_client, job), poll)
@@ -3200,7 +3216,8 @@ def wait_for_edge_packaging_job(self, job, poll=5):
(dict): Return value from the ``DescribeEdgePackagingJob`` API.

Raises:
exceptions.UnexpectedStatusException: If the compilation job fails.
exceptions.CapacityError: If the edge packaging job fails with CapacityError.
exceptions.UnexpectedStatusException: If the edge packaging job fails.
"""
desc = _wait_until(lambda: _edge_packaging_job_status(self.sagemaker_client, job), poll)
self._check_job_status(job, desc, "EdgePackagingJobStatus")
@@ -3217,6 +3234,7 @@ def wait_for_tuning_job(self, job, poll=5):
(dict): Return value from the ``DescribeHyperParameterTuningJob`` API.

Raises:
exceptions.CapacityError: If the hyperparameter tuning job fails with CapacityError.
exceptions.UnexpectedStatusException: If the hyperparameter tuning job fails.
"""
desc = _wait_until(lambda: _tuning_job_status(self.sagemaker_client, job), poll)
@@ -3245,6 +3263,7 @@ def wait_for_transform_job(self, job, poll=5):
(dict): Return value from the ``DescribeTransformJob`` API.

Raises:
exceptions.CapacityError: If the transform job fails with CapacityError.
exceptions.UnexpectedStatusException: If the transform job fails.
"""
desc = _wait_until(lambda: _transform_job_status(self.sagemaker_client, job), poll)
@@ -3283,6 +3302,7 @@ def _check_job_status(self, job, desc, status_key_name):
status_key_name (str): Status key name to check for.

Raises:
exceptions.CapacityError: If the training job fails with CapacityError.
exceptions.UnexpectedStatusException: If the training job fails.
"""
status = desc[status_key_name]
@@ -3298,10 +3318,17 @@ def _check_job_status(self, job, desc, status_key_name):
elif status != "Completed":
reason = desc.get("FailureReason", "(No reason provided)")
job_type = status_key_name.replace("JobStatus", " job")
message = "Error for {job_type} {job_name}: {status}. Reason: {reason}".format(
job_type=job_type, job_name=job, status=status, reason=reason
)
if "CapacityError" in str(reason):
raise exceptions.CapacityError(
message=message,
allowed_statuses=["Completed", "Stopped"],
actual_status=status,
)
raise exceptions.UnexpectedStatusException(
message="Error for {job_type} {job_name}: {status}. Reason: {reason}".format(
job_type=job_type, job_name=job, status=status, reason=reason
),
message=message,
allowed_statuses=["Completed", "Stopped"],
actual_status=status,
)
@@ -3313,6 +3340,10 @@ def wait_for_endpoint(self, endpoint, poll=30):
endpoint (str): Name of the ``Endpoint`` to wait for.
poll (int): Polling interval in seconds (default: 5).

Raises:
exceptions.CapacityError: If the endpoint creation job fails with CapacityError.
exceptions.UnexpectedStatusException: If the endpoint creation job fails.

Returns:
dict: Return value from the ``DescribeEndpoint`` API.
"""
@@ -3321,10 +3352,17 @@ def wait_for_endpoint(self, endpoint, poll=30):

if status != "InService":
reason = desc.get("FailureReason", None)
message = "Error hosting endpoint {endpoint}: {status}. Reason: {reason}.".format(
endpoint=endpoint, status=status, reason=reason
)
if "CapacityError" in str(reason):
raise exceptions.CapacityError(
message=message,
allowed_statuses=["InService"],
actual_status=status,
)
raise exceptions.UnexpectedStatusException(
message="Error hosting endpoint {endpoint}: {status}. Reason: {reason}.".format(
endpoint=endpoint, status=status, reason=reason
),
message=message,
allowed_statuses=["InService"],
actual_status=status,
)
@@ -3649,6 +3687,7 @@ def logs_for_job( # noqa: C901 - suppress complexity warning for this method
completion (default: 5).

Raises:
exceptions.CapacityError: If the training job fails with CapacityError.
exceptions.UnexpectedStatusException: If waiting and the training job fails.
"""

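The PR title describes a "pluggable instance fallback mechanism": with `CapacityError` raised from the wait/poll helpers above, a caller can fall back across a list of instance types when capacity is the only problem. A hedged sketch of that idea against the public `Estimator` API follows; the instance-type list and function name are illustrative, not part of this change.

```python
from sagemaker.estimator import Estimator
from sagemaker.exceptions import CapacityError

# Illustrative candidates only; real lists would be region-specific
# (compare gpu_list() used by the integration tests below).
FALLBACK_INSTANCE_TYPES = ["ml.p3.2xlarge", "ml.p2.xlarge", "ml.g4dn.xlarge"]


def fit_with_fallback(image_uri, role, inputs, sagemaker_session=None):
    """Try each instance type in order, moving on only when capacity is the problem."""
    for i, instance_type in enumerate(FALLBACK_INSTANCE_TYPES):
        estimator = Estimator(
            image_uri=image_uri,
            role=role,
            instance_count=1,
            instance_type=instance_type,
            sagemaker_session=sagemaker_session,
        )
        try:
            estimator.fit(inputs)
            return estimator
        except CapacityError:
            # Non-capacity failures propagate immediately; the last candidate re-raises.
            if i == len(FALLBACK_INSTANCE_TYPES) - 1:
                raise
```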
56 changes: 23 additions & 33 deletions tests/integ/test_huggingface.py
@@ -15,15 +15,14 @@
import os

import pytest
import logging

from sagemaker.huggingface import HuggingFace, HuggingFaceProcessor
from sagemaker.huggingface.model import HuggingFaceModel, HuggingFacePredictor
from sagemaker.utils import unique_name_from_base
from tests import integ
from tests.integ.utils import gpu_list, retry_with_instance_list
from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
from sagemaker.exceptions import UnexpectedStatusException

ROLE = "SageMakerRole"

@@ -34,43 +33,34 @@
and integ.test_region() in integ.TRAINING_NO_P3_REGIONS,
reason="no ml.p2 or ml.p3 instances in this region",
)
@retry_with_instance_list(gpu_list(integ.test_region()))
def test_framework_processing_job_with_deps(
sagemaker_session,
gpu_instance_type_list,
huggingface_training_latest_version,
huggingface_training_pytorch_latest_version,
huggingface_pytorch_latest_training_py_version,
**kwargs,
):
for i_type in gpu_instance_type_list:
logging.info("Using the instance type: {}".format(i_type))
with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
code_path = os.path.join(DATA_DIR, "dummy_code_bundle_with_reqs")
entry_point = "main_script.py"

processor = HuggingFaceProcessor(
transformers_version=huggingface_training_latest_version,
pytorch_version=huggingface_training_pytorch_latest_version,
py_version=huggingface_pytorch_latest_training_py_version,
role=ROLE,
instance_count=1,
instance_type=i_type,
sagemaker_session=sagemaker_session,
base_job_name="test-huggingface",
)
try:
processor.run(
code=entry_point,
source_dir=code_path,
inputs=[],
wait=True,
)
except UnexpectedStatusException as e:
if "CapacityError" in str(e) and i_type != gpu_instance_type_list[-1]:
logging.warning("Failure using instance type: {}. {}".format(i_type, str(e)))
continue
else:
raise
break
with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
code_path = os.path.join(DATA_DIR, "dummy_code_bundle_with_reqs")
entry_point = "main_script.py"

processor = HuggingFaceProcessor(
transformers_version=huggingface_training_latest_version,
pytorch_version=huggingface_training_pytorch_latest_version,
py_version=huggingface_pytorch_latest_training_py_version,
role=ROLE,
instance_count=1,
instance_type=kwargs["instance_type"],
sagemaker_session=sagemaker_session,
base_job_name="test-huggingface",
)
processor.run(
code=entry_point,
source_dir=code_path,
inputs=[],
wait=True,
)


@pytest.mark.release
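The `retry_with_instance_list` decorator and `gpu_list` helper are imported from `tests/integ/utils.py`, which is not included in this diff view. Based purely on how the test above uses them (the wrapped test receives `instance_type` via `**kwargs`), a plausible sketch of the decorator's shape might look like the following; the actual implementation may differ.

```python
import logging
from functools import wraps

from sagemaker.exceptions import CapacityError


def retry_with_instance_list(instance_list):
    """Run the wrapped test once per instance type, falling through on CapacityError."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i, instance_type in enumerate(instance_list):
                try:
                    return func(*args, instance_type=instance_type, **kwargs)
                except CapacityError as e:
                    # Re-raise if the last candidate also fails; otherwise try the next one.
                    if i == len(instance_list) - 1:
                        raise
                    logging.warning(
                        "CapacityError on %s; retrying with next instance type: %s",
                        instance_type,
                        e,
                    )
        return wrapper
    return decorator
```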
54 changes: 22 additions & 32 deletions tests/integ/test_tf.py
@@ -15,7 +15,6 @@
import numpy as np
import os
import time
import logging

import pytest

@@ -25,8 +24,8 @@
import tests.integ
from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES, kms_utils, timeout
from tests.integ.retry import retries
from tests.integ.utils import gpu_list, retry_with_instance_list
from tests.integ.s3_utils import assert_s3_file_patterns_exist
from sagemaker.exceptions import UnexpectedStatusException


ROLE = "SageMakerRole"
@@ -48,41 +47,32 @@
and tests.integ.test_region() in tests.integ.TRAINING_NO_P3_REGIONS,
reason="no ml.p2 or ml.p3 instances in this region",
)
@retry_with_instance_list(gpu_list(tests.integ.test_region()))
def test_framework_processing_job_with_deps(
sagemaker_session,
gpu_instance_type_list,
tensorflow_training_latest_version,
tensorflow_training_latest_py_version,
**kwargs,
):
for i_type in gpu_instance_type_list:
logging.info("Using the instance type: {}".format(i_type))
with timeout.timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
code_path = os.path.join(DATA_DIR, "dummy_code_bundle_with_reqs")
entry_point = "main_script.py"

processor = TensorFlowProcessor(
framework_version=tensorflow_training_latest_version,
py_version=tensorflow_training_latest_py_version,
role=ROLE,
instance_count=1,
instance_type=i_type,
sagemaker_session=sagemaker_session,
base_job_name="test-tensorflow",
)
try:
processor.run(
code=entry_point,
source_dir=code_path,
inputs=[],
wait=True,
)
except UnexpectedStatusException as e:
if "CapacityError" in str(e) and i_type != gpu_instance_type_list[-1]:
logging.warning("Failure using instance type: {}. {}".format(i_type, str(e)))
continue
else:
raise
break
with timeout.timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
code_path = os.path.join(DATA_DIR, "dummy_code_bundle_with_reqs")
entry_point = "main_script.py"

processor = TensorFlowProcessor(
framework_version=tensorflow_training_latest_version,
py_version=tensorflow_training_latest_py_version,
role=ROLE,
instance_count=1,
instance_type=kwargs["instance_type"],
sagemaker_session=sagemaker_session,
base_job_name="test-tensorflow",
)
processor.run(
code=entry_point,
source_dir=code_path,
inputs=[],
wait=True,
)


def test_mnist_with_checkpoint_config(