 )
 from sagemaker.workflow.step_collections import RegisterModel
 from sagemaker.workflow.pipeline import Pipeline
-from sagemaker.workflow.utilities import hash_files_or_dirs
+from sagemaker.workflow.utilities import hash_files_or_dirs, hash_object
 from sagemaker.feature_store.feature_group import (
     FeatureGroup,
     FeatureDefinition,
@@ -1425,7 +1425,6 @@ def test_multi_step_framework_processing_pipeline_uploads(
     )
     try:
         pipeline.create(role)
-
         definition = json.loads(pipeline.definition())

         source_dir_tar_prefix = (
@@ -1434,7 +1433,6 @@ def test_multi_step_framework_processing_pipeline_uploads(
         )

         run_procs = []
-
         for step in definition["Steps"]:
             for input_obj in step["Arguments"]["ProcessingInputs"]:
                 if input_obj["InputName"] == "entrypoint":
@@ -1447,6 +1445,43 @@ def test_multi_step_framework_processing_pipeline_uploads(
         # verify all the run_proc.sh artifact paths are distinct
         assert len(run_procs) == len(set(run_procs))

+        expected_source_dir_tar = (
+            f"{pipeline_name}"
+            f"/code/{hash_files_or_dirs([DATA_DIR + '/framework_processor_data'])}/sourcedir.tar.gz"
+        )
+        expected_query_step_artifact = (
+            f"{pipeline_name}/"
+            f"code/{hash_files_or_dirs([DATA_DIR + '/framework_processor_data/query_data.py'])}/"
+            f"query_data.py"
+        )
+
+        prepare_step_script = data_processor._generate_framework_script("preprocess.py")
+        expected_prepare_step_artifact = (
+            f"{pipeline_name}/" f"code/{hash_object(prepare_step_script)}/runproc.sh"
+        )
+
+        split_step_script = data_processor._generate_framework_script("train_test_split.py")
+        expected_split_step_artifact = (
+            f"{pipeline_name}/" f"code/{hash_object(split_step_script)}/runproc.sh"
+        )
+
+        eval_step_script = sk_processor._generate_framework_script("evaluate.py")
+        expected_eval_step_artifact = (
+            f"{pipeline_name}/" f"code/{hash_object(eval_step_script)}/runproc.sh"
+        )
+
+        expected_prefix = f"{pipeline_name}/code"
+        s3_code_objects = pipeline_session.list_s3_files(
+            bucket=default_bucket, key_prefix=expected_prefix
+        )
+
+        # verify
+        assert expected_source_dir_tar in s3_code_objects
+        assert expected_query_step_artifact in s3_code_objects
+        assert expected_prepare_step_artifact in s3_code_objects
+        assert expected_split_step_artifact in s3_code_objects
+        assert expected_eval_step_artifact in s3_code_objects
+
     finally:
         try:
             pipeline.delete()