
Commit e04e2e8

Brock Wade (brockwade633) authored and Mufaddal Rohawala (mufaddal-rohawala) committed
fix: FrameworkProcessor S3 uploads (aws#3493)
Co-authored-by: Brock Wade <[email protected]>
Co-authored-by: Mufaddal Rohawala <[email protected]>
1 parent 6bc6136 commit e04e2e8

File tree

6 files changed, +322 -11 lines changed
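
For orientation before the diffs: the fix concerns FrameworkProcessor steps run under a SageMaker Pipelines session, where the SDK generates a runproc.sh entrypoint script and uploads it to S3 alongside the packed source code. Below is a minimal sketch of that usage, condensed from the integ tests added in this commit; the role ARN, pipeline name, and local source directory are illustrative placeholders, not values taken from the commit.

# Minimal sketch of a FrameworkProcessor step in a pipeline session.
# Placeholders (assumptions): role ARN, pipeline name, local source_dir.
from sagemaker.processing import FrameworkProcessor, ProcessingOutput
from sagemaker.sklearn import SKLearn
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

pipeline_session = PipelineSession()

processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="1.0-1",
    role="arn:aws:iam::123456789012:role/SageMakerRole",  # assumed role ARN
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=pipeline_session,
)

step = ProcessingStep(
    name="Step-1",
    step_args=processor.run(
        code="script_1.py",
        source_dir="my_source_dir",  # assumed local directory
        outputs=[ProcessingOutput(output_name="test", source="/opt/ml/processing/test")],
    ),
)

pipeline = Pipeline(name="my-pipeline", steps=[step], sagemaker_session=pipeline_session)
# pipeline.upsert(role_arn=...) / pipeline.start() uploads sourcedir.tar.gz and the
# generated runproc.sh under the pipeline's code/ prefix in the session's default bucket.

With this change, when a pipeline session is active, runproc.sh is uploaded to a content-hashed key under the pipeline's code/ prefix, as the processing.py diff below shows.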

src/sagemaker/processing.py

+40-7
@@ -1741,13 +1741,7 @@ def _pack_and_upload_code(
             raise RuntimeError("S3 source_dir file must be named `sourcedir.tar.gz.`")

         script = estimator.uploaded_code.script_name
-        s3_runproc_sh = S3Uploader.upload_string_as_file_body(
-            self._generate_framework_script(script),
-            desired_s3_uri=entrypoint_s3_uri,
-            kms_key=kms_key,
-            sagemaker_session=self.sagemaker_session,
-        )
-        logger.info("runproc.sh uploaded to %s", s3_runproc_sh)
+        s3_runproc_sh = self._create_and_upload_runproc(script, kms_key, entrypoint_s3_uri)

         return s3_runproc_sh, inputs, job_name

@@ -1857,3 +1851,42 @@ def _set_entrypoint(self, command, user_script_name):
             )
         )
         self.entrypoint = self.framework_entrypoint_command + [user_script_location]
+
+    def _create_and_upload_runproc(self, user_script, kms_key, entrypoint_s3_uri):
+        """Create runproc shell script and upload to S3 bucket.
+
+        If leveraging a pipeline session with optimized S3 artifact paths,
+        the runproc.sh file is hashed and uploaded to a separate S3 location.
+
+
+        Args:
+            user_script (str): Relative path to ``code`` in the source bundle
+                - e.g. 'process.py'.
+            kms_key (str): The KMS key used for encryption.
+            entrypoint_s3_uri (str): The S3 upload path for the runproc script.
+        """
+        from sagemaker.workflow.utilities import _pipeline_config, hash_object
+
+        if _pipeline_config and _pipeline_config.pipeline_name:
+            runproc_file_str = self._generate_framework_script(user_script)
+            runproc_file_hash = hash_object(runproc_file_str)
+            s3_uri = (
+                f"s3://{self.sagemaker_session.default_bucket()}/{_pipeline_config.pipeline_name}/"
+                f"code/{runproc_file_hash}/runproc.sh"
+            )
+            s3_runproc_sh = S3Uploader.upload_string_as_file_body(
+                runproc_file_str,
+                desired_s3_uri=s3_uri,
+                kms_key=kms_key,
+                sagemaker_session=self.sagemaker_session,
+            )
+        else:
+            s3_runproc_sh = S3Uploader.upload_string_as_file_body(
+                self._generate_framework_script(user_script),
+                desired_s3_uri=entrypoint_s3_uri,
+                kms_key=kms_key,
+                sagemaker_session=self.sagemaker_session,
+            )
+        logger.info("runproc.sh uploaded to %s", s3_runproc_sh)
+
+        return s3_runproc_sh
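
The branch added above derives a deterministic, content-addressed S3 key for runproc.sh when a pipeline session is active, so steps that generate different runproc scripts no longer share one upload location. A rough standalone sketch of that layout follows; it uses hashlib directly, whereas the SDK uses sagemaker.workflow.utilities.hash_object, so the exact hash format here is an assumption.

import hashlib

def runproc_s3_uri(bucket: str, pipeline_name: str, runproc_file_str: str) -> str:
    # Content-addressed key: the same generated script always maps to the same key,
    # while different generated scripts land under different prefixes.
    runproc_file_hash = hashlib.sha256(runproc_file_str.encode("utf-8")).hexdigest()
    return f"s3://{bucket}/{pipeline_name}/code/{runproc_file_hash}/runproc.sh"

uri_a = runproc_s3_uri("my-bucket", "my-pipeline", "#!/bin/bash\npython script_1.py\n")
uri_b = runproc_s3_uri("my-bucket", "my-pipeline", "#!/bin/bash\npython script_2.py\n")
assert uri_a != uri_b  # distinct entrypoint scripts get distinct runproc.sh objects

Outside a pipeline session, the else branch keeps the previous behavior: the generated script is still uploaded to the entrypoint_s3_uri passed in by _pack_and_upload_code.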
tests/data/pipeline/test_source_dir/script_1.py

+11-0

@@ -0,0 +1,11 @@
+"""
+Integ test file script_1.py
+"""
+import pathlib
+
+if __name__ == "__main__":
+
+    print("writing file to /opt/ml/processing/test/test.py...")
+    pathlib.Path("/opt/ml/processing/test").mkdir(parents=True, exist_ok=True)
+    with open("/opt/ml/processing/test/test.py", "w") as f:
+        f.write('print("test...")')
tests/data/pipeline/test_source_dir/script_2.py

+9-0

@@ -0,0 +1,9 @@
+"""
+Integ test file script_2.py
+"""
+
+if __name__ == "__main__":
+
+    print("reading file: /opt/ml/processing/test/test.py")
+    with open("/opt/ml/processing/test/test.py", "r") as f:
+        print(f.read())
tests/data/pipeline/test_source_dir_2/script_2.py

+9-0

@@ -0,0 +1,9 @@
+"""
+Integ test file script_2.py
+"""
+
+if __name__ == "__main__":
+
+    print("reading file: /opt/ml/processing/test/test.py")
+    with open("/opt/ml/processing/test/test.py", "r") as f:
+        print(f.read())

tests/integ/sagemaker/workflow/test_processing_steps.py

+246-3
@@ -17,15 +17,18 @@
 import re
 import subprocess
 from datetime import datetime
+from pathlib import Path

 import pytest
 from botocore.exceptions import WaiterError
+from sagemaker.workflow.utilities import hash_files_or_dirs, hash_object

 from sagemaker import image_uris, get_execution_role, utils
 from sagemaker.dataset_definition import DatasetDefinition, AthenaDatasetDefinition
-from sagemaker.processing import ProcessingInput, ProcessingOutput
-from sagemaker.s3 import S3Uploader
-from sagemaker.sklearn import SKLearnProcessor
+from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor
+from sagemaker.s3 import S3Uploader, S3Downloader
+from sagemaker.sklearn import SKLearnProcessor, SKLearn
+from sagemaker.tensorflow import TensorFlow
 from sagemaker.workflow.parameters import ParameterInteger, ParameterString
 from sagemaker.workflow.pipeline import Pipeline
 from sagemaker.workflow.steps import (
@@ -379,6 +382,203 @@ def test_one_step_framework_processing_pipeline(
         pass


+def test_multi_step_framework_processing_pipeline_same_source_dir(
+    pipeline_session, role, pipeline_name
+):
+    default_bucket = pipeline_session.default_bucket()
+    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
+
+    SOURCE_DIR = "/pipeline/test_source_dir"
+
+    framework_processor_tf = FrameworkProcessor(
+        role=role,
+        instance_type="ml.m5.xlarge",
+        instance_count=1,
+        estimator_cls=TensorFlow,
+        framework_version="2.9",
+        py_version="py39",
+        sagemaker_session=pipeline_session,
+    )
+
+    framework_processor_sk = FrameworkProcessor(
+        framework_version="1.0-1",
+        instance_type="ml.m5.xlarge",
+        instance_count=1,
+        base_job_name="my-job",
+        role=role,
+        estimator_cls=SKLearn,
+        sagemaker_session=pipeline_session,
+    )
+
+    step_1 = ProcessingStep(
+        name="Step-1",
+        step_args=framework_processor_tf.run(
+            code="script_1.py",
+            source_dir=DATA_DIR + SOURCE_DIR,
+            outputs=[ProcessingOutput(output_name="test", source="/opt/ml/processing/test")],
+        ),
+        cache_config=cache_config,
+    )
+
+    step_2 = ProcessingStep(
+        name="Step-2",
+        step_args=framework_processor_sk.run(
+            code="script_2.py",
+            source_dir=DATA_DIR + SOURCE_DIR,
+            inputs=[
+                ProcessingInput(
+                    source=step_1.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
+                    destination="/opt/ml/processing/test",
+                ),
+            ],
+        ),
+        cache_config=cache_config,
+    )
+
+    pipeline = Pipeline(
+        name=pipeline_name, steps=[step_1, step_2], sagemaker_session=pipeline_session
+    )
+    try:
+        pipeline.create(role)
+        definition = json.loads(pipeline.definition())
+
+        source_dir_1_s3_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
+            pipeline_session,
+            framework_processor_tf,
+            default_bucket,
+            pipeline_name,
+            definition["Steps"][0],
+            SOURCE_DIR,
+            "script_1.py",
+        )
+        source_dir_2_s3_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
+            pipeline_session,
+            framework_processor_sk,
+            default_bucket,
+            pipeline_name,
+            definition["Steps"][1],
+            SOURCE_DIR,
+            "script_2.py",
+        )
+
+        # the same local source_dirs should have the same s3 paths
+        assert source_dir_1_s3_uri == source_dir_2_s3_uri
+
+        # verify different entry_point paths
+        assert entry_point_1 != entry_point_2
+
+        execution = pipeline.start(parameters={})
+        try:
+            execution.wait(delay=540, max_attempts=3)
+        except WaiterError:
+            pass
+
+        execution_steps = execution.list_steps()
+        assert len(execution_steps) == 2
+        for step in execution_steps:
+            assert step["StepStatus"] == "Succeeded"
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
+def test_multi_step_framework_processing_pipeline_different_source_dir(
+    pipeline_session, role, pipeline_name
+):
+    default_bucket = pipeline_session.default_bucket()
+    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
+
+    SOURCE_DIR_1 = "/pipeline/test_source_dir"
+    SOURCE_DIR_2 = "/pipeline/test_source_dir_2"
+
+    framework_processor_tf = FrameworkProcessor(
+        role=role,
+        instance_type="ml.m5.xlarge",
+        instance_count=1,
+        estimator_cls=TensorFlow,
+        framework_version="2.9",
+        py_version="py39",
+        sagemaker_session=pipeline_session,
+    )
+
+    step_1 = ProcessingStep(
+        name="Step-1",
+        step_args=framework_processor_tf.run(
+            code="script_1.py",
+            source_dir=DATA_DIR + SOURCE_DIR_1,
+            outputs=[ProcessingOutput(output_name="test", source="/opt/ml/processing/test")],
+        ),
+        cache_config=cache_config,
+    )
+
+    step_2 = ProcessingStep(
+        name="Step-2",
+        step_args=framework_processor_tf.run(
+            code="script_2.py",
+            source_dir=DATA_DIR + SOURCE_DIR_2,
+            inputs=[
+                ProcessingInput(
+                    source=step_1.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
+                    destination="/opt/ml/processing/test",
+                ),
+            ],
+        ),
+        cache_config=cache_config,
+    )
+
+    pipeline = Pipeline(
+        name=pipeline_name, steps=[step_1, step_2], sagemaker_session=pipeline_session
+    )
+    try:
+        pipeline.create(role)
+        definition = json.loads(pipeline.definition())
+
+        source_dir_1_s3_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
+            pipeline_session,
+            framework_processor_tf,
+            default_bucket,
+            pipeline_name,
+            definition["Steps"][0],
+            SOURCE_DIR_1,
+            "script_1.py",
+        )
+        source_dir_2_s3_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
+            pipeline_session,
+            framework_processor_tf,
+            default_bucket,
+            pipeline_name,
+            definition["Steps"][1],
+            SOURCE_DIR_2,
+            "script_2.py",
+        )
+
+        # different local source_dirs should have different s3 paths
+        assert source_dir_1_s3_uri != source_dir_2_s3_uri
+
+        # verify different entry_point paths
+        assert entry_point_1 != entry_point_2
+
+        execution = pipeline.start(parameters={})
+        try:
+            execution.wait(delay=540, max_attempts=3)
+        except WaiterError:
+            pass
+
+        execution_steps = execution.list_steps()
+        assert len(execution_steps) == 2
+        for step in execution_steps:
+            assert step["StepStatus"] == "Succeeded"
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
 def test_one_step_pyspark_processing_pipeline(
     sagemaker_session,
     role,
@@ -796,3 +996,46 @@ def test_two_processing_job_depends_on(
             pipeline.delete()
         except Exception:
             pass
+
+
+def _verify_code_artifacts_of_framework_processing_step(
+    pipeline_session, processor, bucket, pipeline_name, step_definition, source_dir, entry_point
+):
+
+    source_dir_s3_uri = (
+        f"s3://{bucket}/{pipeline_name}" f"/code/{hash_files_or_dirs([f'{DATA_DIR}/{source_dir}'])}"
+    )
+
+    # verify runproc.sh prefix is different from code artifact prefix
+    runprocs = []
+    for input_obj in step_definition["Arguments"]["ProcessingInputs"]:
+        if input_obj["InputName"] == "entrypoint":
+            s3_uri = input_obj["S3Input"]["S3Uri"]
+            runprocs.append(s3_uri)
+
+            assert Path(s3_uri).parent != source_dir_s3_uri
+
+    # verify only one entrypoint generated per step
+    assert len(runprocs) == 1
+
+    expected_source_dir_tar = (
+        f"{pipeline_name}"
+        f"/code/{hash_files_or_dirs([DATA_DIR + '/pipeline/test_source_dir'])}/sourcedir.tar.gz"
+    )
+
+    step_script = processor._generate_framework_script(entry_point)
+    expected_step_artifact = f"{pipeline_name}/code/{hash_object(step_script)}/runproc.sh"
+
+    expected_prefix = f"{pipeline_name}/code"
+    s3_code_objects = pipeline_session.list_s3_files(bucket=bucket, key_prefix=expected_prefix)
+
+    # verify all distinct artifacts were uploaded
+    assert expected_source_dir_tar in s3_code_objects
+    assert expected_step_artifact in s3_code_objects
+
+    # verify runprocs contain the correct commands
+    step_runproc = S3Downloader.read_file(
+        f"s3://{bucket}/{expected_step_artifact}", pipeline_session
+    )
+    assert f"python {entry_point}" in step_runproc
+    return source_dir, expected_step_artifact
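
The helper above asserts that each step's sourcedir.tar.gz and runproc.sh land as distinct, content-hashed objects under the pipeline's code/ prefix. For a manual check of that layout, a hedged sketch using boto3 directly (bucket and pipeline name are placeholders; the tests instead use pipeline_session.list_s3_files):

import boto3

bucket = "my-default-bucket"   # placeholder: the pipeline session's default bucket
pipeline_name = "my-pipeline"  # placeholder

s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket=bucket, Prefix=f"{pipeline_name}/code/")
for obj in response.get("Contents", []):
    # Expected key shapes, per the assertions above:
    #   <pipeline_name>/code/<hash>/sourcedir.tar.gz
    #   <pipeline_name>/code/<hash>/runproc.sh
    print(obj["Key"])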

tests/integ/sagemaker/workflow/test_workflow.py

+7-1
@@ -1168,7 +1168,13 @@ def walk():


 def test_caching_behavior(
-    pipeline_session, role, cpu_instance_type, pipeline_name, script_dir, athena_dataset_definition
+    pipeline_session,
+    role,
+    cpu_instance_type,
+    pipeline_name,
+    script_dir,
+    athena_dataset_definition,
+    region_name,
 ):
     default_bucket = pipeline_session.default_bucket()
     data_path = os.path.join(DATA_DIR, "workflow")