16
16
import json
17
17
18
18
from copy import deepcopy
19
- from typing import Any , Dict , List , Sequence , Union
19
+ from typing import Any , Dict , List , Sequence , Union , Optional
20
20
21
21
import attr
22
22
import botocore
30
30
Expression ,
31
31
RequestType ,
32
32
)
33
+ from sagemaker .workflow .execution_variables import ExecutionVariables
33
34
from sagemaker .workflow .parameters import Parameter
35
+ from sagemaker .workflow .pipeline_experiment_config import PipelineExperimentConfig
34
36
from sagemaker .workflow .properties import Properties
35
37
from sagemaker .workflow .steps import Step
36
38
from sagemaker .workflow .step_collections import StepCollection
@@ -44,6 +46,11 @@ class Pipeline(Entity):
44
46
Attributes:
45
47
name (str): The name of the pipeline.
46
48
parameters (Sequence[Parameters]): The list of the parameters.
49
+ pipeline_experiment_config (Optional[PipelineExperimentConfig]): If set,
50
+ the workflow will attempt to create an experiment and trial before
51
+ executing the steps. Creation will be skipped if an experiment or a trial with
52
+ the same name already exists.
53
+ If set to None, no experiment or trial will be created automatically.
47
54
steps (Sequence[Steps]): The list of the non-conditional steps associated with the pipeline.
48
55
Any steps that are within the
49
56
`if_steps` or `else_steps` of a `ConditionStep` cannot be listed in the steps of a
@@ -57,6 +64,11 @@ class Pipeline(Entity):
57
64
58
65
name : str = attr .ib (factory = str )
59
66
parameters : Sequence [Parameter ] = attr .ib (factory = list )
67
+ pipeline_experiment_config : Optional [PipelineExperimentConfig ] = attr .ib (
68
+ default = PipelineExperimentConfig (
69
+ ExecutionVariables .PIPELINE_NAME , ExecutionVariables .PIPELINE_EXECUTION_ID
70
+ )
71
+ )
60
72
steps : Sequence [Union [Step , StepCollection ]] = attr .ib (factory = list )
61
73
sagemaker_session : Session = attr .ib (factory = Session )
62
74
@@ -69,22 +81,22 @@ def to_request(self) -> RequestType:
69
81
"Version" : self ._version ,
70
82
"Metadata" : self ._metadata ,
71
83
"Parameters" : list_to_request (self .parameters ),
84
+ "PipelineExperimentConfig" :
85
+ self .pipeline_experiment_config .to_request if self .pipeline_experiment_config is not None else None ,
72
86
"Steps" : list_to_request (self .steps ),
73
87
}
74
88
75
89
def create (
76
90
self ,
77
91
role_arn : str ,
78
92
description : str = None ,
79
- experiment_name : str = None ,
80
93
tags : List [Dict [str , str ]] = None ,
81
94
) -> Dict [str , Any ]:
82
95
"""Creates a Pipeline in the Pipelines service.
83
96
84
97
Args:
85
98
role_arn (str): The role arn that is assumed by the pipeline to create step artifacts.
86
99
description (str): A description of the pipeline.
87
- experiment_name (str): The name of the experiment.
88
100
tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
89
101
tags.
90
102
@@ -96,7 +108,6 @@ def create(
96
108
kwargs = self ._create_args (role_arn , description )
97
109
update_args (
98
110
kwargs ,
99
- ExperimentName = experiment_name ,
100
111
Tags = tags ,
101
112
)
102
113
return self .sagemaker_session .sagemaker_client .create_pipeline (** kwargs )
@@ -106,7 +117,7 @@ def _create_args(self, role_arn: str, description: str):
106
117
107
118
Args:
108
119
role_arn (str): The role arn that is assumed by pipelines to create step artifacts.
109
- pipeline_description (str): A description of the pipeline.
120
+ description (str): A description of the pipeline.
110
121
111
122
Returns:
112
123
A keyword argument dict for calling create_pipeline.
@@ -147,23 +158,21 @@ def upsert(
147
158
self ,
148
159
role_arn : str ,
149
160
description : str = None ,
150
- experiment_name : str = None ,
151
161
tags : List [Dict [str , str ]] = None ,
152
162
) -> Dict [str , Any ]:
153
163
"""Creates a pipeline or updates it, if it already exists.
154
164
155
165
Args:
156
166
role_arn (str): The role arn that is assumed by workflow to create step artifacts.
157
- pipeline_description (str): A description of the pipeline.
158
- experiment_name (str): The name of the experiment.
167
+ description (str): A description of the pipeline.
159
168
tags (List[Dict[str, str]]): A list of {"Key": "string", "Value": "string"} dicts as
160
169
tags.
161
170
162
171
Returns:
163
172
response dict from service
164
173
"""
165
174
try :
166
- response = self .create (role_arn , description , experiment_name , tags )
175
+ response = self .create (role_arn , description , tags )
167
176
except ClientError as e :
168
177
error = e .response ["Error" ]
169
178
if (
@@ -224,6 +233,7 @@ def start(
224
233
def definition (self ) -> str :
225
234
"""Converts a request structure to string representation for workflow service calls."""
226
235
request_dict = self .to_request ()
236
+ request_dict ["PipelineExperimentConfig" ] = interpolate (request_dict ["PipelineExperimentConfig" ])
227
237
request_dict ["Steps" ] = interpolate (request_dict ["Steps" ])
228
238
229
239
return json .dumps (request_dict )
0 commit comments