16
16
import json
17
17
18
18
from copy import deepcopy
19
- from typing import Any , Dict , List , Sequence , Union
19
+ from typing import Any , Dict , List , Sequence , Union , Optional
20
20
21
21
import attr
22
22
import botocore
30
30
Expression ,
31
31
RequestType ,
32
32
)
33
+ from sagemaker .workflow .execution_variables import ExecutionVariables
33
34
from sagemaker .workflow .parameters import Parameter
35
+ from sagemaker .workflow .pipeline_experiment_config import PipelineExperimentConfig
34
36
from sagemaker .workflow .properties import Properties
35
37
from sagemaker .workflow .steps import Step
36
38
from sagemaker .workflow .step_collections import StepCollection
@@ -44,6 +46,12 @@ class Pipeline(Entity):
44
46
Attributes:
45
47
name (str): The name of the pipeline.
46
48
parameters (Sequence[Parameters]): The list of the parameters.
49
+ pipeline_experiment_config (Optional[PipelineExperimentConfig]): If set,
50
+ the workflow will attempt to create an experiment and trial before
51
+ executing the steps. Creation will be skipped if an experiment or a trial with
52
+ the same name already exists. By default, pipeline name is used as
53
+ experiment name and execution id is used as the trial name.
54
+ If set to None, no experiment or trial will be created automatically.
47
55
steps (Sequence[Steps]): The list of the non-conditional steps associated with the pipeline.
48
56
Any steps that are within the
49
57
`if_steps` or `else_steps` of a `ConditionStep` cannot be listed in the steps of a
@@ -57,6 +65,11 @@ class Pipeline(Entity):
57
65
58
66
name : str = attr .ib (factory = str )
59
67
parameters : Sequence [Parameter ] = attr .ib (factory = list )
68
+ pipeline_experiment_config : Optional [PipelineExperimentConfig ] = attr .ib (
69
+ default = PipelineExperimentConfig (
70
+ ExecutionVariables .PIPELINE_NAME , ExecutionVariables .PIPELINE_EXECUTION_ID
71
+ )
72
+ )
60
73
steps : Sequence [Union [Step , StepCollection ]] = attr .ib (factory = list )
61
74
sagemaker_session : Session = attr .ib (factory = Session )
62
75
@@ -69,22 +82,23 @@ def to_request(self) -> RequestType:
69
82
"Version" : self ._version ,
70
83
"Metadata" : self ._metadata ,
71
84
"Parameters" : list_to_request (self .parameters ),
85
+ "PipelineExperimentConfig" : self .pipeline_experiment_config .to_request ()
86
+ if self .pipeline_experiment_config is not None
87
+ else None ,
72
88
"Steps" : list_to_request (self .steps ),
73
89
}
74
90
75
91
def create (
76
92
self ,
77
93
role_arn : str ,
78
94
description : str = None ,
79
- experiment_name : str = None ,
80
95
tags : List [Dict [str , str ]] = None ,
81
96
) -> Dict [str , Any ]:
82
97
"""Creates a Pipeline in the Pipelines service.
83
98
84
99
Args:
85
100
role_arn (str): The role arn that is assumed by the pipeline to create step artifacts.
86
101
description (str): A description of the pipeline.
87
- experiment_name (str): The name of the experiment.
88
102
tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
89
103
tags.
90
104
@@ -96,7 +110,6 @@ def create(
96
110
kwargs = self ._create_args (role_arn , description )
97
111
update_args (
98
112
kwargs ,
99
- ExperimentName = experiment_name ,
100
113
Tags = tags ,
101
114
)
102
115
return self .sagemaker_session .sagemaker_client .create_pipeline (** kwargs )
@@ -106,7 +119,7 @@ def _create_args(self, role_arn: str, description: str):
106
119
107
120
Args:
108
121
role_arn (str): The role arn that is assumed by pipelines to create step artifacts.
109
- pipeline_description (str): A description of the pipeline.
122
+ description (str): A description of the pipeline.
110
123
111
124
Returns:
112
125
A keyword argument dict for calling create_pipeline.
@@ -147,23 +160,21 @@ def upsert(
147
160
self ,
148
161
role_arn : str ,
149
162
description : str = None ,
150
- experiment_name : str = None ,
151
163
tags : List [Dict [str , str ]] = None ,
152
164
) -> Dict [str , Any ]:
153
165
"""Creates a pipeline or updates it, if it already exists.
154
166
155
167
Args:
156
168
role_arn (str): The role arn that is assumed by workflow to create step artifacts.
157
- pipeline_description (str): A description of the pipeline.
158
- experiment_name (str): The name of the experiment.
169
+ description (str): A description of the pipeline.
159
170
tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
160
171
tags.
161
172
162
173
Returns:
163
174
response dict from service
164
175
"""
165
176
try :
166
- response = self .create (role_arn , description , experiment_name , tags )
177
+ response = self .create (role_arn , description , tags )
167
178
except ClientError as e :
168
179
error = e .response ["Error" ]
169
180
if (
@@ -224,6 +235,9 @@ def start(
224
235
def definition (self ) -> str :
225
236
"""Converts a request structure to string representation for workflow service calls."""
226
237
request_dict = self .to_request ()
238
+ request_dict ["PipelineExperimentConfig" ] = interpolate (
239
+ request_dict ["PipelineExperimentConfig" ]
240
+ )
227
241
request_dict ["Steps" ] = interpolate (request_dict ["Steps" ])
228
242
229
243
return json .dumps (request_dict )
0 commit comments