feature: support configurable retry for pipeline steps #2662

Merged
merged 80 commits into from
Oct 12, 2021
Changes from 70 commits
Commits
868db55
add helper function to generate no-op (data ingestion only) recipe
jerrypeng7773 May 11, 2021
21bedbb
Merge branch 'aws:master' into master
jerrypeng7773 May 11, 2021
854dd10
separate flow generation by source input type + move generation helpe…
jerrypeng7773 May 11, 2021
8798b65
Merge branch 'aws:master' into master
jerrypeng7773 May 11, 2021
69ae4bd
create an internal helper function to generate output node
jerrypeng7773 May 12, 2021
a6a8449
Merge branch 'master' of github.com:jerrypeng7773/sagemaker-python-sdk
jerrypeng7773 May 12, 2021
2aa256e
Merge branch 'aws:master' into master
jerrypeng7773 May 18, 2021
06557a8
add ingestion test using dw processor via pipeline execution
jerrypeng7773 May 19, 2021
dcbfd13
Merge branch 'aws:master' into master
jerrypeng7773 May 19, 2021
fc6522e
verify the fg query df
jerrypeng7773 May 19, 2021
b6f9371
Merge branch 'master' into master
ahsan-z-khan May 19, 2021
86fa47d
fix tests
jerrypeng7773 May 19, 2021
05ccfa6
Merge branch 'master' into master
ahsan-z-khan May 20, 2021
0716e9f
Merge branch 'aws:master' into master
jerrypeng7773 Jun 14, 2021
7ca5af4
add tuning step support
jerrypeng7773 Jun 24, 2021
8cf18b8
fix docstyle check
jerrypeng7773 Jun 24, 2021
1f95b82
add helper function to get tuning step top performing model s3 uri
jerrypeng7773 Jun 29, 2021
1b9d66b
Merge branch 'aws:master' into master
jerrypeng7773 Jun 30, 2021
5bc47bd
allow step depends on pass in step instance
jerrypeng7773 Jun 30, 2021
603b934
Merge branch 'aws:master' into master
jerrypeng7773 Jun 30, 2021
664f2a8
Merge branch 'master' of github.com:jerrypeng7773/sagemaker-python-sdk
jerrypeng7773 Jun 30, 2021
a8755ec
Merge branch 'master' into master
apogupta2018 Jul 1, 2021
e25d36c
Merge branch 'aws:master' into master
jerrypeng7773 Jul 1, 2021
a9cfab4
Merge branch 'master' into accept-step-object-in-dependson-list
jerrypeng7773 Jul 1, 2021
c0066ea
resolve merge conflict
jerrypeng7773 Jul 1, 2021
e9ac9fa
support passing step object to tuning step depends on list
jerrypeng7773 Jul 1, 2021
eb6a523
fix test_workflow_with_clarify
jerrypeng7773 Jul 1, 2021
c19c426
add tuning step to docs
jerrypeng7773 Jul 6, 2021
450e4a5
allow step instance in depends on list for repack and reigster model …
jerrypeng7773 Jul 6, 2021
cb7be4a
Merge branch 'master' into master
ahsan-z-khan Jul 7, 2021
2918765
add tuning step get_top_model_s3_uri to doc
jerrypeng7773 Jul 9, 2021
fe9bd70
Merge branch 'aws:master' into master
jerrypeng7773 Jul 9, 2021
378c868
Merge branch 'master' of github.com:jerrypeng7773/sagemaker-python-sdk
jerrypeng7773 Jul 9, 2021
93cdb68
remove extra new line
jerrypeng7773 Jul 9, 2021
24226f9
add callback step to doc
jerrypeng7773 Jul 9, 2021
001cac5
switch order in doc
jerrypeng7773 Jul 9, 2021
b5c00c1
Merge branch 'master' into master
ahsan-z-khan Jul 12, 2021
3b75821
Merge branch 'master' into accept-step-object-in-dependson-list
ahsan-z-khan Jul 12, 2021
e70ae34
Merge branch 'aws:master' into master
jerrypeng7773 Jul 14, 2021
0eaf41b
Merge branch 'master' of https://github.com/aws/sagemaker-python-sdk …
jerrypeng7773 Jul 14, 2021
dad08c4
fix formatting
jerrypeng7773 Jul 14, 2021
edf9cba
support parameterize tuning job parameter ranges
jerrypeng7773 Aug 3, 2021
57bd90d
Merge branch 'aws:master' into master
jerrypeng7773 Aug 3, 2021
597bb74
Merge branch 'aws:master' into master
jerrypeng7773 Aug 3, 2021
ae55619
support tuning step parameter range parameterization + support retry …
jerrypeng7773 Aug 3, 2021
5a6148a
Merge branch 'master' into master
ahsan-z-khan Aug 9, 2021
9b1d905
Merge branch 'master' into master
ahsan-z-khan Aug 11, 2021
282c9fe
Merge branch 'master' into master
ahsan-z-khan Aug 11, 2021
7279588
Merge branch 'aws:master' into master
jerrypeng7773 Sep 1, 2021
b92389b
Merge branch 'aws:master' into master
jerrypeng7773 Sep 7, 2021
3e7b04c
add configurable retry support
jerrypeng7773 Sep 7, 2021
dd1fef5
remove adding new default throttling retry policy
jerrypeng7773 Sep 7, 2021
7715357
reformatting
jerrypeng7773 Sep 7, 2021
12905cd
doc: update experiment config doc on fit method (#2609)
danabens Sep 10, 2021
962a06e
prepare release v2.59.1.post0
Sep 13, 2021
9c6c0c7
update development version to v2.59.2.dev0
Sep 13, 2021
c47e598
fix: unit tests for KIX and remove regional calls to boto (#2640)
shreyapandit Sep 15, 2021
d2b43d2
documentation: Remove Shortbread (#2610)
jkroll-aws Sep 15, 2021
7bb72e3
prepare release v2.59.2
Sep 15, 2021
0d435af
update development version to v2.59.3.dev0
Sep 15, 2021
0e496ad
Documentation: instance_type no longer hidden in instance_count docum…
daMichaelB Sep 17, 2021
ec44d50
prepare release v2.59.3
Sep 20, 2021
08d7c5b
update development version to v2.59.4.dev0
Sep 20, 2021
95e161c
documentation: Info about offline s3 bucket key when creating feature…
can-sun Sep 21, 2021
8f801f0
prepare release v2.59.3.post0
Sep 22, 2021
ba5d88d
update development version to v2.59.4.dev0
Sep 22, 2021
5e9fdba
fix: add pytorch 1.8.1 for huggingface (#2642)
jeniyat Sep 23, 2021
4fa9d18
Doc: updated pr template to check backward compatibility (#2655)
shreyapandit Sep 23, 2021
5a9e654
support pipeline step configurable retry
Sep 2, 2021
fb5140e
Merge branch 'aws:master' into master
jerrypeng7773 Sep 24, 2021
c435184
Merge branch 'master' into master
ahsan-z-khan Sep 27, 2021
1e24b0f
make default retry parameters static vars
jerrypeng7773 Sep 27, 2021
df628d7
remove unused import
jerrypeng7773 Sep 27, 2021
6647ea6
Merge branch 'master' into master
jerrypeng7773 Oct 9, 2021
02acf0c
Merge branch 'master' into master
ahsan-z-khan Oct 9, 2021
0289290
Merge branch 'master' into master
jeniyat Oct 10, 2021
c07b2e3
Merge branch 'master' into master
ahsan-z-khan Oct 11, 2021
336e293
Merge branch 'master' into master
ahsan-z-khan Oct 12, 2021
a713c6d
Merge branch 'master' into master
ahsan-z-khan Oct 12, 2021
7fee07b
Merge branch 'master' into master
ahsan-z-khan Oct 12, 2021
14 changes: 10 additions & 4 deletions src/sagemaker/workflow/_utils.py
@@ -28,13 +28,14 @@
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.entities import RequestType
from sagemaker.workflow.properties import Properties
-from sagemaker.session import get_create_model_package_request
-from sagemaker.session import get_model_package_args
+from sagemaker.session import get_create_model_package_request, get_model_package_args
from sagemaker.workflow.steps import (
StepTypeEnum,
TrainingStep,
Step,
ConfigurableRetryStep,
)
from sagemaker.workflow.retry import RetryPolicy

FRAMEWORK_VERSION = "0.23-1"
INSTANCE_TYPE = "ml.m5.large"
@@ -60,6 +61,7 @@ def __init__(
source_dir: str = None,
dependencies: List = None,
depends_on: Union[List[str], List[Step]] = None,
retry_policies: List[RetryPolicy] = None,
subnets=None,
security_group_ids=None,
**kwargs,
@@ -126,6 +128,7 @@ def __init__(
This is not supported with "local code" in Local Mode.
depends_on (List[str] or List[Step]): A list of step names or instances
this step depends on
retry_policies (List[RetryPolicy]): The list of retry policies for the current step
subnets (list[str]): List of subnet ids. If not specified, the re-packing
job will be created without VPC config.
security_group_ids (list[str]): List of security group ids. If not
@@ -171,6 +174,7 @@ def __init__(
display_name=display_name,
description=description,
depends_on=depends_on,
retry_policies=retry_policies,
estimator=repacker,
inputs=inputs,
)
@@ -252,7 +256,7 @@ def properties(self):
return self._properties


-class _RegisterModelStep(Step):
+class _RegisterModelStep(ConfigurableRetryStep):
"""Register model step in workflow that creates a model package.

Attributes:
@@ -295,6 +299,7 @@ def __init__(
display_name: str = None,
description=None,
depends_on: Union[List[str], List[Step]] = None,
retry_policies: List[RetryPolicy] = None,
tags=None,
container_def_list=None,
**kwargs,
@@ -332,10 +337,11 @@ def __init__(
description (str): Model Package description (default: None).
depends_on (List[str] or List[Step]): A list of step names or instances
this step depends on
retry_policies (List[RetryPolicy]): The list of retry policies for the current step
**kwargs: additional arguments to `create_model`.
"""
super(_RegisterModelStep, self).__init__(
-name, display_name, description, StepTypeEnum.REGISTER_MODEL, depends_on
+name, StepTypeEnum.REGISTER_MODEL, display_name, description, depends_on, retry_policies
)
self.estimator = estimator
self.model_data = model_data
201 changes: 201 additions & 0 deletions src/sagemaker/workflow/retry.py
@@ -0,0 +1,201 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Pipeline step retry policies for workflow."""
from __future__ import absolute_import

from enum import Enum
from typing import List
import attr

from sagemaker.workflow.entities import Entity, DefaultEnumMeta, RequestType

MAX_ATTEMPTS_CAP = 20
MAX_EXPIRE_AFTER_MIN = 14400


class StepExceptionTypeEnum(Enum, metaclass=DefaultEnumMeta):
    """Step ExceptionType enum."""

    SERVICE_FAULT = "Step.SERVICE_FAULT"
    THROTTLING = "Step.THROTTLING"


class SageMakerJobExceptionTypeEnum(Enum, metaclass=DefaultEnumMeta):
    """SageMaker Job ExceptionType enum."""

    INTERNAL_ERROR = "SageMaker.JOB_INTERNAL_ERROR"
    CAPACITY_ERROR = "SageMaker.CAPACITY_ERROR"
    RESOURCE_LIMIT = "SageMaker.RESOURCE_LIMIT"


@attr.s
class RetryPolicy(Entity):
    """RetryPolicy base class

    Attributes:
        backoff_rate (float): The multiplier by which the retry interval increases
            during each attempt (default: 2.0)
        interval_seconds (int): An integer that represents the number of seconds before the
            first retry attempt (default: 1)
        max_attempts (int): A positive integer that represents the maximum
            number of retry attempts. (default: None)
        expire_after_mins (int): A positive integer that represents the number of minutes
            after which no further retry is attempted (default: None)
    """

    backoff_rate: float = attr.ib(default=2.0)
    interval_seconds: int = attr.ib(default=1)
    max_attempts: int = attr.ib(default=None)
    expire_after_mins: int = attr.ib(default=None)

    @backoff_rate.validator
    def validate_backoff_rate(self, _, value):
        """Validate the input back off rate"""
        if value:
            assert value >= 0.0, "backoff_rate should be non-negative"

    @interval_seconds.validator
    def validate_interval_seconds(self, _, value):
        """Validate the input interval seconds"""
        if value:
            assert value >= 0.0, "interval_seconds should be non-negative"

    @max_attempts.validator
    def validate_max_attempts(self, _, value):
        """Validate the input max attempts"""
        if value:
            assert (
                MAX_ATTEMPTS_CAP >= value >= 1
            ), f"max_attempts must be in the range (0, {MAX_ATTEMPTS_CAP}] attempts"

    @expire_after_mins.validator
    def validate_expire_after_mins(self, _, value):
        """Validate expire after mins"""
        if value:
            assert (
                MAX_EXPIRE_AFTER_MIN >= value >= 0
            ), f"expire_after_mins must be in the range (0, {MAX_EXPIRE_AFTER_MIN}] minutes"

    def to_request(self) -> RequestType:
        """Get the request structure for workflow service calls."""
        # Exactly one of the two limits must be set. Both sides are parenthesized
        # so that Python does not chain `==` and `is` into a single comparison.
        if (self.max_attempts is None) == (self.expire_after_mins is None):
            raise ValueError(
                "Exactly one of [max_attempts] and [expire_after_mins] must be given."
            )

        request = {
            "BackoffRate": self.backoff_rate,
            "IntervalSeconds": self.interval_seconds,
        }

        if self.max_attempts:
            request["MaxAttempts"] = self.max_attempts

        if self.expire_after_mins:
            request["ExpireAfterMin"] = self.expire_after_mins

        return request
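
A note on the exactly-one-of check in `to_request`: its result depends on how the two `is None` tests are grouped. Python chains comparison operators, so `(a is None) == b is None` parses as `((a is None) == b) and (b is None)`, which is not the same as comparing two booleans. The standalone sketch below compares both spellings; the helper names `chained` and `grouped` are hypothetical and exist only for this illustration.

```python
def chained(max_attempts, expire_after_mins):
    # Parsed as: ((max_attempts is None) == expire_after_mins) and (expire_after_mins is None)
    return (max_attempts is None) == expire_after_mins is None


def grouped(max_attempts, expire_after_mins):
    # True when both limits are set or both are missing, i.e. the invalid cases
    return (max_attempts is None) == (expire_after_mins is None)


# (max_attempts, expire_after_mins) combinations: neither, one, the other, both
cases = [(None, None), (3, None), (None, 30), (3, 30)]
chained_results = [chained(a, b) for a, b in cases]
grouped_results = [grouped(a, b) for a, b in cases]
```

With these inputs the chained form is false for every case, so a guard written that way would never fire; the grouped form flags only the "neither" and "both" cases.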


class StepRetryPolicy(RetryPolicy):
    """RetryPolicy for a retryable step. The pipeline service will retry

    `sagemaker.workflow.retry.StepExceptionTypeEnum.SERVICE_FAULT` and
    `sagemaker.workflow.retry.StepExceptionTypeEnum.THROTTLING` regardless of
    pipeline step type by default. However, for a step defined as retryable, you can
    override them by specifying a StepRetryPolicy.

    Attributes:
        exception_types (List[StepExceptionTypeEnum]): The exception types to match
            for this policy
        backoff_rate (float): The multiplier by which the retry interval increases
            during each attempt (default: 2.0)
        interval_seconds (int): An integer that represents the number of seconds before the
            first retry attempt (default: 1)
        max_attempts (int): A positive integer that represents the maximum
            number of retry attempts. (default: None)
        expire_after_mins (int): A positive integer that represents the number of minutes
            after which no further retry is attempted (default: None)
    """

    def __init__(
        self,
        exception_types: List[StepExceptionTypeEnum],
        backoff_rate: float = 2.0,
        interval_seconds: int = 1,
        max_attempts: int = None,
        expire_after_mins: int = None,
    ):
        super().__init__(backoff_rate, interval_seconds, max_attempts, expire_after_mins)
        for exception_type in exception_types:
            if not isinstance(exception_type, StepExceptionTypeEnum):
                raise ValueError(f"{exception_type} is not of StepExceptionTypeEnum.")
        self.exception_types = exception_types

    def to_request(self) -> RequestType:
        """Gets the request structure for retry policy."""
        request = super().to_request()
        request["ExceptionType"] = [e.value for e in self.exception_types]
        return request
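
To make `backoff_rate` and `interval_seconds` concrete, here is a minimal sketch of the wait times they imply. It assumes the service multiplies the interval by `backoff_rate` after every retry (geometric backoff, as the attribute descriptions suggest); the actual service-side timing is not shown in this diff and may differ. The function `retry_delays` is hypothetical and not part of the SDK.

```python
from typing import List


def retry_delays(interval_seconds: float, backoff_rate: float, max_attempts: int) -> List[float]:
    """Return the assumed wait, in seconds, before each retry attempt."""
    # Attempt 0 waits interval_seconds; each later attempt multiplies by backoff_rate
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


# Defaults from the diff (interval 1 s, rate 2.0) with five attempts
delays = retry_delays(interval_seconds=1, backoff_rate=2.0, max_attempts=5)
total_wait = sum(delays)
```

Under that assumption the defaults give waits of 1, 2, 4, 8 and 16 seconds, which is one way to judge whether `max_attempts` or `expire_after_mins` is the more natural limit for a given step.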


class SageMakerJobStepRetryPolicy(RetryPolicy):
    """RetryPolicy for exception thrown by SageMaker Job.

    Attributes:
        exception_types (List[SageMakerJobExceptionTypeEnum]):
            The SageMaker exceptions to match for this policy. The SageMaker exceptions
            captured here are the exceptions thrown by synchronously
            creating the job. For instance, the resource limit exception.
        failure_reason_types (List[SageMakerJobExceptionTypeEnum]): The SageMaker
            failure reason types to match for this policy. The failure reason type
            is presented in the FailureReason field of the Describe response and
            indicates the runtime failure reason for a job.
        backoff_rate (float): The multiplier by which the retry interval increases
            during each attempt (default: 2.0)
        interval_seconds (int): An integer that represents the number of seconds before the
            first retry attempt (default: 1)
        max_attempts (int): A positive integer that represents the maximum
            number of retry attempts. (default: None)
        expire_after_mins (int): A positive integer that represents the number of minutes
            after which no further retry is attempted (default: None)
    """

    def __init__(
        self,
        exception_types: List[SageMakerJobExceptionTypeEnum] = None,
        failure_reason_types: List[SageMakerJobExceptionTypeEnum] = None,
        backoff_rate: float = 2.0,
        interval_seconds: int = 1,
        max_attempts: int = None,
        expire_after_mins: int = None,
    ):
        super().__init__(backoff_rate, interval_seconds, max_attempts, expire_after_mins)

        if not exception_types and not failure_reason_types:
            raise ValueError(
                "At least one of the [exception_types, failure_reason_types] needs to be given."
            )

        # Both argument lists are merged into a single list of exception types
        self.exception_type_list: List[SageMakerJobExceptionTypeEnum] = []
        if exception_types:
            self.exception_type_list += exception_types
        if failure_reason_types:
            self.exception_type_list += failure_reason_types

        for exception_type in self.exception_type_list:
            if not isinstance(exception_type, SageMakerJobExceptionTypeEnum):
                raise ValueError(f"{exception_type} is not of SageMakerJobExceptionTypeEnum.")

    def to_request(self) -> RequestType:
        """Gets the request structure for retry policy."""
        request = super().to_request()
        request["ExceptionType"] = [e.value for e in self.exception_type_list]
        return request
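
For orientation, the sketch below mirrors the `to_request` logic of this file in standalone form, so the resulting payload can be inspected without installing the SDK. `build_retry_request` and the local `JobException` enum are hypothetical stand-ins written for this illustration; only the dictionary keys and enum values are copied from the diff above, and the exactly-one-of rule is an assumption about the intended behavior.

```python
from enum import Enum
from typing import Dict, List, Optional


class JobException(Enum):
    """Local stand-in for SageMakerJobExceptionTypeEnum (values copied from the diff)."""

    INTERNAL_ERROR = "SageMaker.JOB_INTERNAL_ERROR"
    CAPACITY_ERROR = "SageMaker.CAPACITY_ERROR"
    RESOURCE_LIMIT = "SageMaker.RESOURCE_LIMIT"


def build_retry_request(
    exception_types: List[JobException],
    backoff_rate: float = 2.0,
    interval_seconds: int = 1,
    max_attempts: Optional[int] = None,
    expire_after_mins: Optional[int] = None,
) -> Dict:
    """Build a retry-policy dict shaped like the one produced in the diff."""
    # Assumed rule: exactly one of the two limits must be provided
    if (max_attempts is None) == (expire_after_mins is None):
        raise ValueError("Exactly one of max_attempts and expire_after_mins must be given.")
    request = {"BackoffRate": backoff_rate, "IntervalSeconds": interval_seconds}
    if max_attempts is not None:
        request["MaxAttempts"] = max_attempts
    if expire_after_mins is not None:
        request["ExpireAfterMin"] = expire_after_mins
    # Enum members are serialized by value
    request["ExceptionType"] = [e.value for e in exception_types]
    return request


policy = build_retry_request([JobException.CAPACITY_ERROR], max_attempts=3)
```

Calling the helper with neither limit, or with both, raises `ValueError`, which matches the intent stated in the error message of `RetryPolicy.to_request`.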
23 changes: 23 additions & 0 deletions src/sagemaker/workflow/step_collections.py
@@ -32,6 +32,7 @@
_RegisterModelStep,
_RepackModelStep,
)
from sagemaker.workflow.retry import RetryPolicy


@attr.s
@@ -62,6 +63,8 @@ def __init__(
estimator: EstimatorBase = None,
model_data=None,
depends_on: Union[List[str], List[Step]] = None,
repack_model_step_retry_policies: List[RetryPolicy] = None,
register_model_step_retry_policies: List[RetryPolicy] = None,
model_package_group_name=None,
model_metrics=None,
approval_status=None,
@@ -87,6 +90,10 @@ def __init__(
job can be run or on which an endpoint can be deployed (default: None).
depends_on (List[str] or List[Step]): The list of step names or step instances
the first step in the collection depends on
repack_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
for the repack model step
register_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
for register model step
model_package_group_name (str): The Model Package Group name, exclusive to
`model_package_name`, using `model_package_group_name` makes the Model Package
versioned (default: None).
@@ -130,6 +137,7 @@ def __init__(
repack_model_step = _RepackModelStep(
name=f"{name}RepackModel",
depends_on=depends_on,
retry_policies=repack_model_step_retry_policies,
sagemaker_session=estimator.sagemaker_session,
role=estimator.role,
model_data=model_data,
@@ -173,6 +181,7 @@ def __init__(
repack_model_step = _RepackModelStep(
name=f"{model_name}RepackModel",
depends_on=depends_on,
retry_policies=repack_model_step_retry_policies,
sagemaker_session=sagemaker_session,
role=role,
model_data=model_entity.model_data,
@@ -216,6 +225,7 @@ def __init__(
display_name=display_name,
tags=tags,
container_def_list=self.container_def_list,
retry_policies=register_model_step_retry_policies,
**kwargs,
)
if not repack_model:
@@ -254,6 +264,10 @@ def __init__(
tags=None,
volume_kms_key=None,
depends_on: Union[List[str], List[Step]] = None,
# step retry policies
repack_model_step_retry_policies: List[RetryPolicy] = None,
model_step_retry_policies: List[RetryPolicy] = None,
transform_step_retry_policies: List[RetryPolicy] = None,
**kwargs,
):
"""Construct steps required for a Transformer step collection:
@@ -292,6 +306,12 @@ def __init__(
transform job (default: None).
depends_on (List[str] or List[Step]): The list of step names or step instances
the first step in the collection depends on
repack_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
for the repack model step
model_step_retry_policies (List[RetryPolicy]): The list of retry policies for
model step
transform_step_retry_policies (List[RetryPolicy]): The list of retry policies for
transform step
"""
steps = []
if "entry_point" in kwargs:
@@ -301,6 +321,7 @@ def __init__(
repack_model_step = _RepackModelStep(
name=f"{name}RepackModel",
depends_on=depends_on,
retry_policies=repack_model_step_retry_policies,
sagemaker_session=estimator.sagemaker_session,
role=estimator.role,
model_data=model_data,
@@ -336,6 +357,7 @@ def predict_wrapper(endpoint, session):
inputs=model_inputs,
description=description,
display_name=display_name,
retry_policies=model_step_retry_policies,
)
if "entry_point" not in kwargs and depends_on:
# if the CreateModelStep is the first step in the collection
@@ -365,6 +387,7 @@ def predict_wrapper(endpoint, session):
inputs=transform_inputs,
description=description,
display_name=display_name,
retry_policies=transform_step_retry_policies,
)
steps.append(transform_step)
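
The pattern in this file is one keyword argument per generated sub-step, each forwarded unchanged as that step's `retry_policies`. A minimal standalone sketch of that routing follows, using plain dicts in place of `RetryPolicy` objects. `route_retry_policies`, the `needs_repack` flag and the role keys are hypothetical names used only for illustration; the three keyword names are taken from the diff.

```python
from typing import Dict, List, Optional

Policies = Optional[List[dict]]


def route_retry_policies(
    needs_repack: bool,
    repack_model_step_retry_policies: Policies = None,
    model_step_retry_policies: Policies = None,
    transform_step_retry_policies: Policies = None,
) -> Dict[str, Policies]:
    """Return the retry policies each sub-step of the collection would receive."""
    routed: Dict[str, Policies] = {}
    if needs_repack:
        # In the diff, the repack step is created only when an entry_point is supplied
        routed["repack"] = repack_model_step_retry_policies
    routed["model"] = model_step_retry_policies
    routed["transform"] = transform_step_retry_policies
    return routed


# A policy dict in the request shape, attached to the transform step only
throttle = {"ExceptionType": ["Step.THROTTLING"], "MaxAttempts": 3}
routed = route_retry_policies(needs_repack=False, transform_step_retry_policies=[throttle])
```

Keeping a separate argument per sub-step lets a caller retry a flaky transform job without also retrying model creation, at the cost of a longer constructor signature.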
