fix: FrameworkProcessor S3 uploads #3493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged 27 commits on Dec 8, 2022
47 changes: 40 additions & 7 deletions src/sagemaker/processing.py
@@ -1741,13 +1741,7 @@ def _pack_and_upload_code(
raise RuntimeError("S3 source_dir file must be named `sourcedir.tar.gz.`")

script = estimator.uploaded_code.script_name
s3_runproc_sh = S3Uploader.upload_string_as_file_body(
self._generate_framework_script(script),
desired_s3_uri=entrypoint_s3_uri,
kms_key=kms_key,
sagemaker_session=self.sagemaker_session,
)
logger.info("runproc.sh uploaded to %s", s3_runproc_sh)
s3_runproc_sh = self._create_and_upload_runproc(script, kms_key, entrypoint_s3_uri)

return s3_runproc_sh, inputs, job_name

@@ -1857,3 +1851,42 @@ def _set_entrypoint(self, command, user_script_name):
)
)
self.entrypoint = self.framework_entrypoint_command + [user_script_location]

def _create_and_upload_runproc(self, user_script, kms_key, entrypoint_s3_uri):
"""Create runproc shell script and upload to S3 bucket.

If leveraging a pipeline session with optimized S3 artifact paths,
the runproc.sh file is hashed and uploaded to a separate S3 location.


Args:
user_script (str): Relative path to ```code``` in the source bundle
- e.g. 'process.py'.
kms_key (str): THe kms key used for encryption.
entrypoint_s3_uri (str): The S3 upload path for the runproc script.
"""
from sagemaker.workflow.utilities import _pipeline_config, hash_object

if _pipeline_config and _pipeline_config.pipeline_name:
runproc_file_str = self._generate_framework_script(user_script)
runproc_file_hash = hash_object(runproc_file_str)
s3_uri = (
f"s3://{self.sagemaker_session.default_bucket()}/{_pipeline_config.pipeline_name}/"
f"code/{runproc_file_hash}/runproc.sh"
)
s3_runproc_sh = S3Uploader.upload_string_as_file_body(
runproc_file_str,
desired_s3_uri=s3_uri,
kms_key=kms_key,
sagemaker_session=self.sagemaker_session,
)
else:
s3_runproc_sh = S3Uploader.upload_string_as_file_body(
self._generate_framework_script(user_script),
desired_s3_uri=entrypoint_s3_uri,
kms_key=kms_key,
sagemaker_session=self.sagemaker_session,
)
logger.info("runproc.sh uploaded to %s", s3_runproc_sh)

return s3_runproc_sh
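
For illustration, here is a minimal, self-contained sketch of the keying scheme used in the pipeline branch above. The bucket name, pipeline name, and script content are placeholders, and a plain SHA-256 of the script text stands in for hash_object, so treat it as an approximation rather than the library's exact behavior.

import hashlib

def runproc_s3_uri(bucket: str, pipeline_name: str, runproc_file_str: str) -> str:
    """Build a content-addressed S3 URI for runproc.sh, mirroring the branch above."""
    # Stand-in for sagemaker.workflow.utilities.hash_object (assumed, not identical).
    runproc_file_hash = hashlib.sha256(runproc_file_str.encode("utf-8")).hexdigest()
    return f"s3://{bucket}/{pipeline_name}/code/{runproc_file_hash}/runproc.sh"

# Hypothetical usage: the same script text always maps to the same S3 key, so
# re-running an unchanged pipeline re-uses the previously uploaded runproc.sh.
print(runproc_s3_uri("my-default-bucket", "my-pipeline", "#!/bin/bash\npython process.py\n"))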
11 changes: 11 additions & 0 deletions tests/data/pipeline/test_source_dir/script_1.py
@@ -0,0 +1,11 @@
"""
Integ test file script_1.py
"""
import pathlib

if __name__ == "__main__":

print("writing file to /opt/ml/processing/test/test.py...")
pathlib.Path("/opt/ml/processing/test").mkdir(parents=True, exist_ok=True)
with open("/opt/ml/processing/test/test.py", "w") as f:
f.write('print("test...")')
9 changes: 9 additions & 0 deletions tests/data/pipeline/test_source_dir/script_2.py
@@ -0,0 +1,9 @@
"""
Integ test file script_2.py
"""

if __name__ == "__main__":

print("reading file: /opt/ml/procesing/test/test.py")
with open("/opt/ml/processing/test/test.py", "r") as f:
print(f.read())
9 changes: 9 additions & 0 deletions tests/data/pipeline/test_source_dir_2/script_2.py
@@ -0,0 +1,9 @@
"""
Integ test file script_2.py
"""

if __name__ == "__main__":

print("reading file: /opt/ml/procesing/test/test.py")
with open("/opt/ml/processing/test/test.py", "r") as f:
print(f.read())
241 changes: 238 additions & 3 deletions tests/integ/sagemaker/workflow/test_processing_steps.py
@@ -17,15 +17,18 @@
import re
import subprocess
from datetime import datetime
from pathlib import Path

import pytest
from botocore.exceptions import WaiterError
from sagemaker.workflow.utilities import hash_files_or_dirs, hash_object

from sagemaker import image_uris, get_execution_role, utils
from sagemaker.dataset_definition import DatasetDefinition, AthenaDatasetDefinition
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.s3 import S3Uploader
from sagemaker.sklearn import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor
from sagemaker.s3 import S3Uploader, S3Downloader
from sagemaker.sklearn import SKLearnProcessor, SKLearn
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
@@ -379,6 +382,197 @@ def test_one_step_framework_processing_pipeline(
pass


def test_multi_step_framework_processing_pipeline_same_source_dir(
pipeline_session, role, pipeline_name, region_name
):
default_bucket = pipeline_session.default_bucket()
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

SOURCE_DIR = "/pipeline/test_source_dir"

framework_processor_tf = FrameworkProcessor(
role=role,
instance_type="ml.m5.xlarge",
instance_count=1,
estimator_cls=TensorFlow,
framework_version="2.9",
py_version="py39",
sagemaker_session=pipeline_session,
)

framework_processor_sk = FrameworkProcessor(
framework_version="1.0-1",
instance_type="ml.m5.xlarge",
instance_count=1,
base_job_name="my-job",
role=role,
estimator_cls=SKLearn,
sagemaker_session=pipeline_session,
)

step_1 = ProcessingStep(
name="Step-1",
step_args=framework_processor_tf.run(
code="script_1.py",
source_dir=DATA_DIR + SOURCE_DIR,
outputs=[ProcessingOutput(output_name="test", source="/opt/ml/processing/test")],
),
cache_config=cache_config,
)

step_2 = ProcessingStep(
name="Step-2",
step_args=framework_processor_sk.run(
code="script_2.py",
source_dir=DATA_DIR + SOURCE_DIR,
inputs=[
ProcessingInput(
source=step_1.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
destination="/opt/ml/processing/test",
),
],
),
cache_config=cache_config,
)

pipeline = Pipeline(name=pipeline_name, steps=[step_1, step_2])
try:
pipeline.create(role)
definition = json.loads(pipeline.definition())

source_dir_1_s3_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
pipeline_session,
framework_processor_tf,
default_bucket,
pipeline_name,
definition["Steps"][0],
SOURCE_DIR,
"script_1.py",
)
source_dir_2_s3_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
pipeline_session,
framework_processor_sk,
default_bucket,
pipeline_name,
definition["Steps"][1],
SOURCE_DIR,
"script_2.py",
)

# the same local source_dirs should have the same s3 paths
assert source_dir_1_s3_uri == source_dir_2_s3_uri

# verify different entry_point paths
assert entry_point_1 != entry_point_2

execution = pipeline.start(parameters={})
try:
execution.wait(delay=540, max_attempts=3)
except WaiterError:
pass
execution_steps = execution.list_steps()
assert len(execution_steps) == 2
for step in execution_steps:
assert step["StepStatus"] == "Succeeded"

finally:
try:
pipeline.delete()
except Exception:
pass


def test_multi_step_framework_processing_pipeline_different_source_dir(
pipeline_session, role, pipeline_name, region_name
):
default_bucket = pipeline_session.default_bucket()
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

SOURCE_DIR_1 = "/pipeline/test_source_dir"
SOURCE_DIR_2 = "/pipeline/test_source_dir_2"

framework_processor_tf = FrameworkProcessor(
role=role,
instance_type="ml.m5.xlarge",
instance_count=1,
estimator_cls=TensorFlow,
framework_version="2.9",
py_version="py39",
sagemaker_session=pipeline_session,
)

step_1 = ProcessingStep(
name="Step-1",
step_args=framework_processor_tf.run(
code="script_1.py",
source_dir=DATA_DIR + SOURCE_DIR_1,
outputs=[ProcessingOutput(output_name="test", source="/opt/ml/processing/test")],
),
cache_config=cache_config,
)

step_2 = ProcessingStep(
name="Step-2",
step_args=framework_processor_tf.run(
code="script_2.py",
source_dir=DATA_DIR + SOURCE_DIR_2,
inputs=[
ProcessingInput(
source=step_1.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
destination="/opt/ml/processing/test",
),
],
),
cache_config=cache_config,
)

pipeline = Pipeline(name=pipeline_name, steps=[step_1, step_2])
try:
pipeline.create(role)
definition = json.loads(pipeline.definition())

source_dir_1_s3_uri, entry_point_1 = _verify_code_artifacts_of_framework_processing_step(
pipeline_session,
framework_processor_tf,
default_bucket,
pipeline_name,
definition["Steps"][0],
SOURCE_DIR_1,
"script_1.py",
)
source_dir_2_s3_uri, entry_point_2 = _verify_code_artifacts_of_framework_processing_step(
pipeline_session,
framework_processor_tf,
default_bucket,
pipeline_name,
definition["Steps"][1],
SOURCE_DIR_2,
"script_2.py",
)

# different local source_dirs should have different s3 paths
assert source_dir_1_s3_uri != source_dir_2_s3_uri

# verify different entry_point paths
assert entry_point_1 != entry_point_2

execution = pipeline.start(parameters={})
try:
execution.wait(delay=540, max_attempts=3)
except WaiterError:
pass
execution_steps = execution.list_steps()
assert len(execution_steps) == 2
for step in execution_steps:
assert step["StepStatus"] == "Succeeded"

finally:
try:
pipeline.delete()
except Exception:
pass


def test_one_step_pyspark_processing_pipeline(
sagemaker_session,
role,
@@ -796,3 +990,44 @@ def test_two_processing_job_depends_on(
pipeline.delete()
except Exception:
pass


def _verify_code_artifacts_of_framework_processing_step(
pipeline_session, processor, bucket, pipeline_name, step_definition, source_dir, entry_point
):

source_dir_s3_uri = (
f"s3://{bucket}/{pipeline_name}" f"/code/{hash_files_or_dirs([f'{DATA_DIR}/{source_dir}'])}"
)

# verify runproc.sh prefix is different from code artifact prefix
runprocs = []
for input_obj in step_definition["Arguments"]["ProcessingInputs"]:
if input_obj["InputName"] == "entrypoint":
s3_uri = input_obj["S3Input"]["S3Uri"]
runprocs.append(s3_uri)

assert Path(s3_uri).parent != source_dir_s3_uri

# verify only one entrypoint generated per step
assert len(runprocs) == 1

expected_source_dir_tar = (
f"{pipeline_name}"
f"/code/{hash_files_or_dirs([DATA_DIR + source_dir])}/sourcedir.tar.gz"
)

step_script = processor._generate_framework_script(entry_point)
expected_step_artifact = f"{pipeline_name}/code/{hash_object(step_script)}/runproc.sh"

expected_prefix = f"{pipeline_name}/code"
s3_code_objects = pipeline_session.list_s3_files(bucket=bucket, key_prefix=expected_prefix)

# verify all distinct artifacts were uploaded
assert expected_source_dir_tar in s3_code_objects
assert expected_step_artifact in s3_code_objects

# verify runprocs contain the correct commands
step_runproc = S3Downloader.read_file(f"s3://{bucket}/{expected_step_artifact}")
assert f"python {entry_point}" in step_runproc
return source_dir, expected_step_artifact
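
The helper above asserts a specific layout of code artifacts under the pipeline's code prefix. A rough sketch of that layout with hypothetical bucket, pipeline, and hash values (the real hashes come from hash_files_or_dirs and hash_object at run time):

# Hypothetical values; only the shape of the keys matters here.
bucket, pipeline_name = "my-default-bucket", "my-pipeline"
source_dir_hash = "aaaa1111"   # stands in for hash_files_or_dirs(source_dir)
runproc_hash = "bbbb2222"      # stands in for hash_object(runproc script)

expected_source_dir_tar = f"s3://{bucket}/{pipeline_name}/code/{source_dir_hash}/sourcedir.tar.gz"
expected_runproc_sh = f"s3://{bucket}/{pipeline_name}/code/{runproc_hash}/runproc.sh"

# Both prefixes are content-addressed: steps that share a source_dir re-use one
# sourcedir.tar.gz, while each distinct entry point still gets its own runproc.sh.
print(expected_source_dir_tar)
print(expected_runproc_sh)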
8 changes: 7 additions & 1 deletion tests/integ/sagemaker/workflow/test_workflow.py
@@ -1168,7 +1168,13 @@ def walk():


def test_caching_behavior(
pipeline_session, role, cpu_instance_type, pipeline_name, script_dir, athena_dataset_definition
pipeline_session,
role,
cpu_instance_type,
pipeline_name,
script_dir,
athena_dataset_definition,
region_name,
):
default_bucket = pipeline_session.default_bucket()
data_path = os.path.join(DATA_DIR, "workflow")