16
16
import json
17
17
18
18
from copy import deepcopy
19
- from typing import Any , Dict , List , Sequence , Union
19
+ from typing import Any , Dict , List , Sequence , Union , Optional
20
20
21
21
import attr
22
22
import botocore
30
30
Expression ,
31
31
RequestType ,
32
32
)
33
+ from sagemaker .workflow .execution_variables import ExecutionVariables
33
34
from sagemaker .workflow .parameters import Parameter
35
+ from sagemaker .workflow .pipeline_experiment_config import PipelineExperimentConfig
34
36
from sagemaker .workflow .properties import Properties
35
37
from sagemaker .workflow .steps import Step
36
38
from sagemaker .workflow .step_collections import StepCollection
@@ -44,6 +46,11 @@ class Pipeline(Entity):
44
46
Attributes:
45
47
name (str): The name of the pipeline.
46
48
parameters (Sequence[Parameters]): The list of the parameters.
49
+ pipeline_experiment_config (Optional[PipelineExperimentConfig]): If set,
50
+ the workflow will attempt to create an experiment and trial before
51
+ executing the steps. Creation will be skipped if an experiment or a trial with
52
+ the same name already exists.
53
+ If set to None, no experiment or trial will be created automatically.
47
54
steps (Sequence[Steps]): The list of the non-conditional steps associated with the pipeline.
48
55
Any steps that are within the
49
56
`if_steps` or `else_steps` of a `ConditionStep` cannot be listed in the steps of a
@@ -57,6 +64,11 @@ class Pipeline(Entity):
57
64
58
65
name : str = attr .ib (factory = str )
59
66
parameters : Sequence [Parameter ] = attr .ib (factory = list )
67
+ pipeline_experiment_config : Optional [PipelineExperimentConfig ] = attr .ib (
68
+ default = PipelineExperimentConfig (
69
+ ExecutionVariables .PIPELINE_NAME , ExecutionVariables .PIPELINE_EXECUTION_ID
70
+ )
71
+ )
60
72
steps : Sequence [Union [Step , StepCollection ]] = attr .ib (factory = list )
61
73
sagemaker_session : Session = attr .ib (factory = Session )
62
74
@@ -69,22 +81,23 @@ def to_request(self) -> RequestType:
69
81
"Version" : self ._version ,
70
82
"Metadata" : self ._metadata ,
71
83
"Parameters" : list_to_request (self .parameters ),
84
+ "PipelineExperimentConfig" : self .pipeline_experiment_config .to_request ()
85
+ if self .pipeline_experiment_config is not None
86
+ else None ,
72
87
"Steps" : list_to_request (self .steps ),
73
88
}
74
89
75
90
def create (
76
91
self ,
77
92
role_arn : str ,
78
93
description : str = None ,
79
- experiment_name : str = None ,
80
94
tags : List [Dict [str , str ]] = None ,
81
95
) -> Dict [str , Any ]:
82
96
"""Creates a Pipeline in the Pipelines service.
83
97
84
98
Args:
85
99
role_arn (str): The role arn that is assumed by the pipeline to create step artifacts.
86
100
description (str): A description of the pipeline.
87
- experiment_name (str): The name of the experiment.
88
101
tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
89
102
tags.
90
103
@@ -96,7 +109,6 @@ def create(
96
109
kwargs = self ._create_args (role_arn , description )
97
110
update_args (
98
111
kwargs ,
99
- ExperimentName = experiment_name ,
100
112
Tags = tags ,
101
113
)
102
114
return self .sagemaker_session .sagemaker_client .create_pipeline (** kwargs )
@@ -106,7 +118,7 @@ def _create_args(self, role_arn: str, description: str):
106
118
107
119
Args:
108
120
role_arn (str): The role arn that is assumed by pipelines to create step artifacts.
109
- pipeline_description (str): A description of the pipeline.
121
+ description (str): A description of the pipeline.
110
122
111
123
Returns:
112
124
A keyword argument dict for calling create_pipeline.
@@ -147,23 +159,21 @@ def upsert(
147
159
self ,
148
160
role_arn : str ,
149
161
description : str = None ,
150
- experiment_name : str = None ,
151
162
tags : List [Dict [str , str ]] = None ,
152
163
) -> Dict [str , Any ]:
153
164
"""Creates a pipeline or updates it, if it already exists.
154
165
155
166
Args:
156
167
role_arn (str): The role arn that is assumed by workflow to create step artifacts.
157
- pipeline_description (str): A description of the pipeline.
158
- experiment_name (str): The name of the experiment.
168
+ description (str): A description of the pipeline.
159
169
tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
160
170
tags.
161
171
162
172
Returns:
163
173
response dict from service
164
174
"""
165
175
try :
166
- response = self .create (role_arn , description , experiment_name , tags )
176
+ response = self .create (role_arn , description , tags )
167
177
except ClientError as e :
168
178
error = e .response ["Error" ]
169
179
if (
@@ -224,6 +234,9 @@ def start(
224
234
def definition (self ) -> str :
225
235
"""Converts a request structure to string representation for workflow service calls."""
226
236
request_dict = self .to_request ()
237
+ request_dict ["PipelineExperimentConfig" ] = interpolate (
238
+ request_dict ["PipelineExperimentConfig" ]
239
+ )
227
240
request_dict ["Steps" ] = interpolate (request_dict ["Steps" ])
228
241
229
242
return json .dumps (request_dict )
0 commit comments