@@ -437,27 +437,28 @@ def test_multi_step_framework_processing_pipeline_same_source_dir(
    pipeline.create(role)
    definition = json.loads(pipeline.definition())

-    source_dir_1_s3_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
+    source_dir_1_tar_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
        pipeline_session,
        framework_processor_tf,
        default_bucket,
        pipeline_name,
        definition["Steps"][0],
-        SOURCE_DIR,
+        DATA_DIR + SOURCE_DIR,
        "script_1.py",
    )
-    source_dir_2_s3_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
+
+    source_dir_2_tar_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
        pipeline_session,
        framework_processor_sk,
        default_bucket,
        pipeline_name,
        definition["Steps"][1],
-        SOURCE_DIR,
+        DATA_DIR + SOURCE_DIR,
        "script_2.py",
    )

-    # the same local source_dirs should have the same s3 paths
-    assert source_dir_1_s3_uri == source_dir_2_s3_uri
+    # the tarred source dirs should have a different s3 uri since the entry_point code is different
+    assert source_dir_1_tar_uri != source_dir_2_tar_uri

    # verify different entry_point paths
    assert entry_point_1 != entry_point_2
@@ -528,31 +529,49 @@ def test_multi_step_framework_processing_pipeline_different_source_dir(
    pipeline.create(role)
    definition = json.loads(pipeline.definition())

-    source_dir_1_s3_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
+    source_dir_1_tar_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
        pipeline_session,
        framework_processor_tf,
        default_bucket,
        pipeline_name,
        definition["Steps"][0],
-        SOURCE_DIR_1,
+        DATA_DIR + SOURCE_DIR_1,
        "script_1.py",
    )
-    source_dir_2_s3_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
+
+    source_dir_2_tar_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
        pipeline_session,
        framework_processor_tf,
        default_bucket,
        pipeline_name,
        definition["Steps"][1],
-        SOURCE_DIR_2,
+        DATA_DIR + SOURCE_DIR_2,
        "script_2.py",
    )

-    # different local source_dirs should have different s3 paths
-    assert source_dir_1_s3_uri != source_dir_2_s3_uri
+    # the tarred source dirs should have a different s3 uri since the source_dirs and entry_point code are different
+    assert source_dir_1_tar_uri != source_dir_2_tar_uri

    # verify different entry_point paths
    assert entry_point_1 != entry_point_2

+    # define another step with the same source_dir and entry_point as the second step
+    source_dir_3_tar_uri, entry_point_3 = _verify_code_artifacts_of_framework_processing_step(
+        pipeline_session,
+        framework_processor_tf,
+        default_bucket,
+        pipeline_name,
+        definition["Steps"][1],
+        DATA_DIR + SOURCE_DIR_2,
+        "script_2.py",
+    )
+
+    # verify the same entry_point paths
+    assert entry_point_2 == entry_point_3
+
+    # the tarred source dirs should now be the same since the source_dirs and entry_point are the same
+    assert source_dir_2_tar_uri == source_dir_3_tar_uri
+
    execution = pipeline.start(parameters={})
    wait_pipeline_execution(execution=execution, delay=540, max_attempts=3)

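Note on the assertions in the two tests above: the code-artifact prefix for a FrameworkProcessor step is derived by hashing the local source_dir together with the entry_point script, which is why identical source dirs with different entry points still get different sourcedir.tar.gz URIs, and identical source dirs with identical entry points get the same URI. A minimal, self-contained sketch of that behaviour, assuming only that hash_files_or_dirs from sagemaker.workflow.utilities hashes the combined contents of the paths it is given (the temporary files below are illustrative stand-ins for the test's DATA_DIR fixtures):

import tempfile
from pathlib import Path

from sagemaker.workflow.utilities import hash_files_or_dirs

with tempfile.TemporaryDirectory() as tmp:
    # stand-in for the shared local source dir used by both steps
    source_dir = Path(tmp) / "test_source_dir"
    source_dir.mkdir()
    (source_dir / "helper.py").write_text("HELPER = 1\n")

    # stand-ins for the two entry-point scripts
    script_1 = Path(tmp) / "script_1.py"
    script_2 = Path(tmp) / "script_2.py"
    script_1.write_text("print('step 1')\n")
    script_2.write_text("print('step 2')\n")

    # same source dir, different entry points -> different code-artifact prefixes
    hash_1 = hash_files_or_dirs([str(source_dir), str(script_1)])
    hash_2 = hash_files_or_dirs([str(source_dir), str(script_2)])
    assert hash_1 != hash_2

    # identical source dir and entry point -> identical prefix (the hash is deterministic)
    assert hash_files_or_dirs([str(source_dir), str(script_2)]) == hash_2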
@@ -975,13 +994,19 @@ def test_two_processing_job_depends_on(
            pass


+# Verifies that the processing step artifacts are created as expected.
+# Requires that source_dir and entry_point are exactly those passed to the processing step.
def _verify_code_artifacts_of_framework_processing_step(
    pipeline_session, processor, bucket, pipeline_name, step_definition, source_dir, entry_point
):

-    source_dir_s3_uri = (
-        f"s3://{bucket}/{pipeline_name}" f"/code/{hash_files_or_dirs([f'{DATA_DIR}/{source_dir}'])}"
-    )
+    files_to_hash = []
+    if entry_point is not None:
+        files_to_hash.append(source_dir)
+        files_to_hash.append(entry_point)
+    file_hash = hash_files_or_dirs(files_to_hash)
+
+    source_dir_s3_uri = f"s3://{bucket}/{pipeline_name}/code/{file_hash}"

    # verify runproc.sh prefix is different from code artifact prefix
    runprocs = []
@@ -995,10 +1020,7 @@ def _verify_code_artifacts_of_framework_processing_step(
    # verify only one entrypoint generated per step
    assert len(runprocs) == 1

-    expected_source_dir_tar = (
-        f"{pipeline_name}"
-        f"/code/{hash_files_or_dirs([DATA_DIR + '/pipeline/test_source_dir'])}/sourcedir.tar.gz"
-    )
+    expected_source_dir_tar = f"{pipeline_name}/code/{file_hash}/sourcedir.tar.gz"

    step_script = processor._generate_framework_script(entry_point)
    expected_step_artifact = f"{pipeline_name}/code/{hash_object(step_script)}/runproc.sh"
@@ -1015,4 +1037,4 @@ def _verify_code_artifacts_of_framework_processing_step(
        f"s3://{bucket}/{expected_step_artifact}", pipeline_session
    )
    assert f"python {entry_point}" in step_runproc
-    return source_dir, expected_step_artifact
+    return expected_source_dir_tar, expected_step_artifact
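For reference, the helper above reconstructs two independent S3 keys: sourcedir.tar.gz under a prefix hashed from the submitted source files, and runproc.sh under a prefix hashed from the generated wrapper script. A minimal sketch of that layout, assuming hash_object from sagemaker.workflow.utilities hashes the string it is given; the step_script below is only a hypothetical stand-in for processor._generate_framework_script(entry_point), and the pipeline name is illustrative:

import tempfile
from pathlib import Path

from sagemaker.workflow.utilities import hash_files_or_dirs, hash_object

pipeline_name = "example-pipeline"  # hypothetical pipeline name, for illustration only

with tempfile.TemporaryDirectory() as tmp:
    entry_point = Path(tmp) / "script_1.py"
    entry_point.write_text("print('hello')\n")

    # sourcedir.tar.gz is keyed by the hash of the submitted source files ...
    file_hash = hash_files_or_dirs([str(entry_point)])
    expected_source_dir_tar = f"{pipeline_name}/code/{file_hash}/sourcedir.tar.gz"

    # ... while runproc.sh is keyed by the hash of the generated wrapper script,
    # so the two /code/<hash>/ prefixes are independent of each other.
    step_script = f"#!/bin/bash\npython {entry_point.name}\n"  # stand-in wrapper script
    expected_step_artifact = f"{pipeline_name}/code/{hash_object(step_script)}/runproc.sh"

    assert file_hash != hash_object(step_script)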