Skip to content

Commit cede5fa

Browse files
authored
feature: Add FailStep Support for Sagemaker Pipeline (#2872)
1 parent 9d84e2e commit cede5fa

File tree

5 files changed

+634
-113
lines changed

5 files changed

+634
-113
lines changed

doc/workflows/pipelines/sagemaker.workflow.pipelines.rst

+2
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,5 @@ Steps
147147
.. autoclass:: sagemaker.workflow.clarify_check_step.ClarifyCheckConfig
148148

149149
.. autoclass:: sagemaker.workflow.clarify_check_step.ClarifyCheckStep
150+
151+
.. autoclass:: sagemaker.workflow.fail_step.FailStep

src/sagemaker/workflow/fail_step.py

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
"""The `Step` definitions for SageMaker Pipelines Workflows."""
14+
from __future__ import absolute_import
15+
16+
from typing import List, Union
17+
18+
from sagemaker.workflow import PipelineNonPrimitiveInputTypes
19+
from sagemaker.workflow.entities import (
20+
RequestType,
21+
)
22+
from sagemaker.workflow.steps import Step, StepTypeEnum
23+
24+
25+
class FailStep(Step):
26+
"""`FailStep` for SageMaker Pipelines Workflows."""
27+
28+
def __init__(
29+
self,
30+
name: str,
31+
error_message: Union[str, PipelineNonPrimitiveInputTypes] = None,
32+
display_name: str = None,
33+
description: str = None,
34+
depends_on: Union[List[str], List[Step]] = None,
35+
):
36+
"""Constructs a `FailStep`.
37+
38+
Args:
39+
name (str): The name of the `FailStep`. A name is required and must be
40+
unique within a pipeline.
41+
error_message (str or PipelineNonPrimitiveInputTypes):
42+
An error message defined by the user.
43+
Once the `FailStep` is reached, the execution fails and the
44+
error message is set as the failure reason (default: None).
45+
display_name (str): The display name of the `FailStep`.
46+
The display name provides better UI readability. (default: None).
47+
description (str): The description of the `FailStep` (default: None).
48+
depends_on (List[str] or List[Step]): A list of `Step` names or `Step` instances
49+
that this `FailStep` depends on.
50+
If a listed `Step` name does not exist, an error is returned (default: None).
51+
"""
52+
super(FailStep, self).__init__(
53+
name, display_name, description, StepTypeEnum.FAIL, depends_on
54+
)
55+
self.error_message = error_message if error_message is not None else ""
56+
57+
@property
58+
def arguments(self) -> RequestType:
59+
"""The arguments dictionary that is used to define the `FailStep`."""
60+
return dict(ErrorMessage=self.error_message)
61+
62+
@property
63+
def properties(self):
64+
"""A `Properties` object is not available for the `FailStep`.
65+
66+
Executing a `FailStep` will terminate the pipeline.
67+
`FailStep` properties should not be referenced.
68+
"""
69+
raise RuntimeError(
70+
"FailStep is a terminal step and the Properties object is not available for it."
71+
)

0 commit comments

Comments
 (0)