24
24
25
25
import pandas as pd
26
26
27
+ from sagemaker .utils import retry_with_backoff
27
28
from tests .integ .sagemaker .workflow .helpers import wait_pipeline_execution
28
29
from tests .integ .s3_utils import extract_files_from_s3
29
30
from sagemaker .workflow .model_step import (
@@ -1002,7 +1003,7 @@ def test_create_and_update_with_parallelism_config(
1002
1003
assert response ["ParallelismConfiguration" ]["MaxParallelExecutionSteps" ] == 50
1003
1004
1004
1005
pipeline .parameters = [ParameterInteger (name = "InstanceCount" , default_value = 1 )]
1005
- response = pipeline .update (role , parallelism_config = {"MaxParallelExecutionSteps" : 55 })
1006
+ response = pipeline .upsert (role , parallelism_config = {"MaxParallelExecutionSteps" : 55 })
1006
1007
update_arn = response ["PipelineArn" ]
1007
1008
assert re .match (
1008
1009
rf"arn:aws:sagemaker:{ region_name } :\d{{12}}:pipeline/{ pipeline_name } " ,
@@ -1019,6 +1020,100 @@ def test_create_and_update_with_parallelism_config(
1019
1020
pass
1020
1021
1021
1022
1023
def test_create_and_start_without_parallelism_config_override(
    pipeline_session, role, pipeline_name, script_dir
):
    """Check that the create-time parallelism limit applies when start() gives no override.

    The pipeline is built from two training steps and created with
    ``MaxParallelExecutionSteps=1``. It is then started WITHOUT a
    ``parallelism_config`` argument, so the execution is expected to list
    exactly one step initially.

    Args:
        pipeline_session: Session fixture passed to the estimator and the pipeline.
        role: Execution role fixture used for the estimator and pipeline creation.
        pipeline_name: Name fixture for the pipeline under test.
        script_dir: Directory fixture that contains ``train.py``.
    """
    sklearn_train = SKLearn(
        framework_version="0.20.0",
        entry_point=os.path.join(script_dir, "train.py"),
        instance_type="ml.m5.xlarge",
        sagemaker_session=pipeline_session,
        role=role,
    )

    train_steps = [
        TrainingStep(
            name=f"my-train-{count}",
            display_name="TrainingStep",
            description="description for Training step",
            step_args=sklearn_train.fit(),
        )
        for count in range(2)
    ]
    pipeline = Pipeline(
        name=pipeline_name,
        steps=train_steps,
        sagemaker_session=pipeline_session,
    )

    try:
        pipeline.create(role, parallelism_config=dict(MaxParallelExecutionSteps=1))
        # No ParallelismConfiguration is given in pipeline.start, so it must not
        # override the one supplied in pipeline.create. Passing a config here
        # would defeat the purpose of this test.
        execution = pipeline.start()

        def validate():
            # Only one step would be scheduled initially
            assert len(execution.list_steps()) == 1

        # The step list is polled with retries because the assertion may not
        # hold immediately after start.
        retry_with_backoff(validate, num_attempts=4)

        wait_pipeline_execution(execution=execution)

    finally:
        # Best-effort cleanup: a failed delete must not mask the test outcome.
        try:
            pipeline.delete()
        except Exception:
            pass
1067
+
1068
+
1069
def test_create_and_start_with_parallelism_config_override(
    pipeline_session, role, pipeline_name, script_dir
):
    """Check that a parallelism config passed to start() overrides the create-time one.

    The pipeline holds two training steps and is created with
    ``MaxParallelExecutionSteps=1``. The execution is started with
    ``MaxParallelExecutionSteps=2``, and the test then expects both steps to
    be listed with status ``"Executing"``.

    Args:
        pipeline_session: Session fixture passed to the estimator and the pipeline.
        role: Execution role fixture used for the estimator and pipeline creation.
        pipeline_name: Name fixture for the pipeline under test.
        script_dir: Directory fixture that contains ``train.py``.
    """
    estimator = SKLearn(
        role=role,
        sagemaker_session=pipeline_session,
        instance_type="ml.m5.xlarge",
        entry_point=os.path.join(script_dir, "train.py"),
        framework_version="0.20.0",
    )

    steps = []
    for index in range(2):
        steps.append(
            TrainingStep(
                name=f"my-train-{index}",
                display_name="TrainingStep",
                description="description for Training step",
                step_args=estimator.fit(),
            )
        )

    pipeline = Pipeline(
        name=pipeline_name,
        steps=steps,
        sagemaker_session=pipeline_session,
    )

    try:
        pipeline.create(role, parallelism_config={"MaxParallelExecutionSteps": 1})
        # Override the create-time limit of 1 with a limit of 2 for this
        # execution only, so both steps may run at the same time.
        execution = pipeline.start(parallelism_config={"MaxParallelExecutionSteps": 2})

        def _both_steps_executing():
            # Both steps must be listed, and each must be running.
            assert len(execution.list_steps()) == 2
            for step_summary in execution.list_steps():
                assert step_summary["StepStatus"] == "Executing"

        retry_with_backoff(_both_steps_executing, num_attempts=4)

        wait_pipeline_execution(execution=execution)

    finally:
        # Best-effort cleanup: ignore any failure while deleting the pipeline.
        try:
            pipeline.delete()
        except Exception:
            pass
1115
+
1116
+
1022
1117
def test_model_registration_with_tuning_model (
1023
1118
pipeline_session ,
1024
1119
role ,
0 commit comments