16
16
17
17
import pytest
18
18
19
+ from sagemaker .processing import ProcessingInput
19
20
from tests .integ import DATA_DIR
20
21
from sagemaker .sklearn import SKLearnProcessor
21
22
from sagemaker .workflow .step_outputs import get_step
@@ -84,7 +85,7 @@ def sum(a, b):
84
85
region_name = region_name ,
85
86
role = role ,
86
87
no_of_steps = 2 ,
87
- last_step_name = "sum" ,
88
+ last_step_name_prefix = "sum" ,
88
89
execution_parameters = dict (),
89
90
step_status = "Succeeded" ,
90
91
step_result_type = int ,
@@ -97,7 +98,7 @@ def sum(a, b):
97
98
region_name = region_name ,
98
99
role = role ,
99
100
no_of_steps = 2 ,
100
- last_step_name = "sum" ,
101
+ last_step_name_prefix = "sum" ,
101
102
execution_parameters = dict (),
102
103
step_status = "Succeeded" ,
103
104
step_result_type = int ,
@@ -115,7 +116,7 @@ def sum(a, b):
115
116
pass
116
117
117
118
118
- def test_selective_execution_of_regular_step_depended_by_function_step (
119
+ def test_selective_execution_of_regular_step_referenced_by_function_step (
119
120
sagemaker_session ,
120
121
role ,
121
122
pipeline_name ,
@@ -168,7 +169,7 @@ def func_2(arg):
168
169
region_name = region_name ,
169
170
role = role ,
170
171
no_of_steps = 2 ,
171
- last_step_name = "func" ,
172
+ last_step_name_prefix = "func" ,
172
173
execution_parameters = dict (),
173
174
step_status = "Succeeded" ,
174
175
step_result_type = str ,
@@ -182,7 +183,7 @@ def func_2(arg):
182
183
region_name = region_name ,
183
184
role = role ,
184
185
no_of_steps = 2 ,
185
- last_step_name = "func" ,
186
+ last_step_name_prefix = "func" ,
186
187
execution_parameters = dict (),
187
188
step_status = "Succeeded" ,
188
189
step_result_type = str ,
@@ -199,3 +200,102 @@ def func_2(arg):
199
200
pipeline .delete ()
200
201
except Exception :
201
202
pass
203
+
204
+
205
def test_selective_execution_of_function_step_referenced_by_regular_step(
    pipeline_session,
    role,
    pipeline_name,
    region_name,
    dummy_container_without_error,
    sklearn_latest_version,
):
    """Selective execution where a regular step consumes a function step's output.

    Pipeline shape: function step -> [select: regular step]. The first run executes
    both steps; the second run selectively re-executes only the ProcessingStep,
    reusing the function step's result from the source execution. Finally we verify
    that the processing job really received the instance count produced by the
    function step.
    """
    # Test Selective Pipeline Execution on function step -> [select: regular step]
    os.environ["AWS_DEFAULT_REGION"] = region_name
    processing_job_instance_counts = 2

    @step(
        name="step1",
        role=role,
        image_uri=dummy_container_without_error,
        instance_type=INSTANCE_TYPE,
        keep_alive_period_in_seconds=60,
    )
    def func(var: int):
        # Second element of the tuple feeds the processor's instance_count below.
        return 1, var

    step_output = func(processing_job_instance_counts)

    script_path = os.path.join(DATA_DIR, "dummy_script.py")
    input_file_path = os.path.join(DATA_DIR, "dummy_input.txt")
    inputs = [
        ProcessingInput(source=input_file_path, destination="/opt/ml/processing/inputs/"),
    ]

    # instance_count is wired to the function step's output, creating the
    # function-step -> regular-step dependency this test exercises.
    sklearn_processor = SKLearnProcessor(
        framework_version=sklearn_latest_version,
        role=role,
        instance_type=INSTANCE_TYPE,
        instance_count=step_output[1],
        command=["python3"],
        sagemaker_session=pipeline_session,
        base_job_name="test-sklearn",
    )

    step_args = sklearn_processor.run(
        inputs=inputs,
        code=script_path,
    )
    process_step = ProcessingStep(
        name="MyProcessStep",
        step_args=step_args,
    )

    pipeline = Pipeline(
        name=pipeline_name,
        steps=[process_step],
        sagemaker_session=pipeline_session,
    )

    try:
        # First run: full execution of both steps; keep its ARN as the
        # reference execution for the selective run.
        execution, _ = create_and_execute_pipeline(
            pipeline=pipeline,
            pipeline_name=pipeline_name,
            region_name=region_name,
            role=role,
            no_of_steps=2,
            last_step_name_prefix=process_step.name,
            execution_parameters=dict(),
            step_status="Succeeded",
            wait_duration=1000,  # seconds
        )

        # Second run: selectively re-execute only the ProcessingStep,
        # sourcing the function step's result from the first execution.
        _, execution_steps2 = create_and_execute_pipeline(
            pipeline=pipeline,
            pipeline_name=pipeline_name,
            region_name=region_name,
            role=role,
            no_of_steps=2,
            last_step_name_prefix=process_step.name,
            execution_parameters=dict(),
            step_status="Succeeded",
            wait_duration=1000,  # seconds
            selective_execution_config=SelectiveExecutionConfig(
                source_pipeline_execution_arn=execution.arn,
                selected_steps=[process_step.name],
            ),
        )

        # The reused function-step output must have propagated into the
        # processing job's cluster configuration.
        execution_proc_job = pipeline_session.describe_processing_job(
            execution_steps2[0]["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
        )
        assert (
            execution_proc_job["ProcessingResources"]["ClusterConfig"]["InstanceCount"]
            == processing_job_instance_counts
        )

    finally:
        try:
            pipeline.delete()
        except Exception:
            # Best-effort cleanup; never mask the test outcome.
            pass
0 commit comments