feature: Pipelines cache keys update #3428

Merged · 6 commits · Oct 27, 2022
Changes from all commits

doc/amazon_sagemaker_model_building_pipeline.rst: 163 additions & 7 deletions

Example:

.. code-block:: python

    bucket = "my-bucket"
    model_prefix = "my-model"

    step_tune = TuningStep(...)
    # tuning step can launch multiple training jobs, thus producing multiple model artifacts
    # we can create a model with the best performance
    best_model = Model(
        model_data=Join(
            on="/",
            values=[
                f"s3://{bucket}/{model_prefix}",
                # from DescribeHyperParameterTuningJob
                step_tune.properties.BestTrainingJob.TrainingJobName,
                "output/model.tar.gz",
            ],
        )
    )
    # we can also access any top-k best as we wish
    second_best_model = Model(
        model_data=Join(
            on="/",
            values=[
                f"s3://{bucket}/{model_prefix}",
                # from ListTrainingJobsForHyperParameterTuningJob
                step_tune.properties.TrainingJobSummaries[1].TrainingJobName,
                "output/model.tar.gz",
            ],
        )
    )

:class:`sagemaker.workflow.steps.TuningStep` also has a helper function to easily generate the model data URI for any :code:`top-k` model:

.. code-block:: python

    model_data = step_tune.get_top_model_s3_uri(
        top_k=0,  # best model
        s3_bucket=bucket,
        prefix=model_prefix
    )


Caching Configuration
==============================
Executing a step again without changing its configuration, inputs, or outputs can be a waste. Thus, we can enable caching for pipeline steps. When you use step signature caching, SageMaker Pipelines tries to use a previous run of your current pipeline step instead of running the step again. When previous runs are considered for reuse, certain arguments from the step are evaluated to see if any have changed. If any of these arguments have been updated, the step will execute again with the new configuration.

When you turn on caching, you supply an expiration time (in `ISO8601 duration string format <https://en.wikipedia.org/wiki/ISO_8601#Durations>`__). The expiration time indicates how old a previous execution can be to be considered for reuse.

.. code-block:: python

    cache_config = CacheConfig(
        enable_caching=True,
        expire_after="P30d"  # 30 days
    )

You can format your ISO8601 duration strings like the following examples:

- :code:`p30d`: 30 days
- :code:`P4DT12H`: 4 days and 12 hours
- :code:`T12H`: 12 hours
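
As a minimal sketch, each of these duration strings can be passed as :code:`expire_after` when constructing a :class:`sagemaker.workflow.steps.CacheConfig` (the variable names are illustrative):

.. code-block:: python

    from sagemaker.workflow.steps import CacheConfig

    # Each string below matches one of the ISO8601 duration examples above.
    cache_30_days = CacheConfig(enable_caching=True, expire_after="p30d")
    cache_4d_12h = CacheConfig(enable_caching=True, expire_after="P4DT12H")
    cache_12_hours = CacheConfig(enable_caching=True, expire_after="T12H")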

Caching is supported for the following step types:

- :class:`sagemaker.workflow.steps.TrainingStep`
- :class:`sagemaker.workflow.steps.ProcessingStep`
- :class:`sagemaker.workflow.clarify_check_step.ClarifyCheckStep`
- :class:`sagemaker.workflow.emr_step.EMRStep`
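
The same :code:`cache_config` object can be attached to any of these step types. As one example, here is a minimal sketch for a transform step; the model name, S3 locations, and instance settings are assumptions used only for illustration:

.. code-block:: python

    from sagemaker.inputs import TransformInput
    from sagemaker.transformer import Transformer
    from sagemaker.workflow.steps import TransformStep

    # Assumed, pre-existing SageMaker model and S3 locations.
    transformer = Transformer(
        model_name="my-existing-model",
        instance_count=1,
        instance_type="ml.m5.xlarge",
        output_path="s3://my-bucket/transform-output",
    )

    transform_step = TransformStep(
        name="Transform",
        transformer=transformer,
        inputs=TransformInput(data="s3://my-bucket/transform-input"),
        cache_config=cache_config,  # reuse the CacheConfig defined above
    )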

In order to create pipeline steps and eventually construct a SageMaker pipeline, you provide parameters within a Python script or notebook. The SageMaker Python SDK creates a pipeline definition by translating these parameters into SageMaker job attributes. Some of these attributes, when changed, cause the step to re-run (see `Caching Pipeline Steps <https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html>`__ for a detailed list). Therefore, if you update an SDK parameter that is used to create such an attribute, the step will re-run. See the following discussion for examples of this in processing and training steps, which are commonly used steps in Pipelines.

The following example creates a processing step:

.. code-block:: python

    from sagemaker.workflow.pipeline_context import PipelineSession
    from sagemaker.sklearn.processing import SKLearnProcessor
    from sagemaker.workflow.steps import ProcessingStep
    from sagemaker.dataset_definition.inputs import S3Input
    from sagemaker.processing import ProcessingInput, ProcessingOutput

    pipeline_session = PipelineSession()

    framework_version = "0.23-1"

    sklearn_processor = SKLearnProcessor(
        framework_version=framework_version,
        instance_type="ml.m5.xlarge",
        instance_count=processing_instance_count,
        role=role,
        sagemaker_session=pipeline_session
    )

    processor_args = sklearn_processor.run(
        inputs=[
            ProcessingInput(
                source="artifacts/data/abalone-dataset.csv",
                input_name="abalone-dataset",
                s3_input=S3Input(
                    local_path="/opt/ml/processing/input",
                    s3_uri="artifacts/data/abalone-dataset.csv",
                    s3_data_type="S3Prefix",
                    s3_input_mode="File",
                    s3_data_distribution_type="FullyReplicated",
                    s3_compression_type="None",
                )
            )
        ],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ],
        code="artifacts/code/process/preprocessing.py",
    )

    processing_step = ProcessingStep(
        name="Process",
        step_args=processor_args,
        cache_config=cache_config
    )

The following parameters from the example cause additional processing step iterations when you change them:

- :code:`framework_version`: This parameter is used to construct the :code:`image_uri` for the `AppSpecification <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AppSpecification.html>`__ attribute of the processing job.
- :code:`inputs`: Any :class:`ProcessingInput` objects are passed through directly as job `ProcessingInputs <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ProcessingInput.html>`__. Input :code:`source` files that exist on the local file system are uploaded to S3 and given a new :code:`S3_Uri`. If the S3 path changes, a new processing job is initiated. For examples of S3 paths, see the **S3 Artifact Folder Structure** section.
- :code:`code`: The code parameter is also packaged as a job `ProcessingInput <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ProcessingInput.html>`__. For local files, a unique hash is created from the file. The file is then uploaded to S3 with the hash included in the path. When a different local file is used, a new hash is created and the S3 path for that `ProcessingInput <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ProcessingInput.html>`__ changes, initiating a new step run. For examples of S3 paths, see the **S3 Artifact Folder Structure** section.
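
To see the :code:`framework_version` dependency concretely, the following minimal sketch resolves the SKLearn image for two framework versions. The region string is an assumption, and this is not the SDK's internal cache-key logic; it only shows that a different version resolves to a different image URI, which changes the job's `AppSpecification <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AppSpecification.html>`__ and therefore prevents reuse of the cached step.

.. code-block:: python

    import sagemaker

    # Resolve the SKLearn processing image for two framework versions.
    uri_old = sagemaker.image_uris.retrieve(
        framework="sklearn",
        region="us-west-2",  # assumed region, for illustration only
        version="0.23-1",
        py_version="py3",
        instance_type="ml.m5.xlarge",
    )
    uri_new = sagemaker.image_uris.retrieve(
        framework="sklearn",
        region="us-west-2",
        version="1.0-1",
        py_version="py3",
        instance_type="ml.m5.xlarge",
    )

    # Different versions resolve to different image URIs, so the cached
    # processing step is not reused.
    assert uri_old != uri_new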

The following example creates a training step:

.. code-block:: python

    import sagemaker
    from sagemaker.inputs import TrainingInput
    from sagemaker.sklearn.estimator import SKLearn
    from sagemaker.workflow.pipeline_context import PipelineSession
    from sagemaker.workflow.steps import TrainingStep

    pipeline_session = PipelineSession()

    image_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.0-1",
        py_version="py3",
        instance_type="ml.m5.xlarge",
    )

    hyperparameters = {
        "dataset_frequency": "H",
        "timestamp_format": "yyyy-MM-dd hh:mm:ss",
        "number_of_backtest_windows": "1",
        "role_arn": role_arn,
        "region": region,
    }

    sklearn_estimator = SKLearn(
        entry_point="train.py",
        role=role_arn,
        image_uri=image_uri,
        instance_type=training_instance_type,
        sagemaker_session=pipeline_session,
        base_job_name="training-job",
        hyperparameters=hyperparameters,
        enable_sagemaker_metrics=True,
    )

    train_args = sklearn_estimator.fit(
        inputs={
            "train": TrainingInput(
                s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        }
    )

    training_step = TrainingStep(
        name="Train",
        step_args=train_args,
        cache_config=cache_config
    )

The following parameters from the example cause additional training step iterations when you change them:

- :code:`image_uri`: The :code:`image_uri` parameter defines the image used for training, and is used directly in the `AlgorithmSpecification <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AlgorithmSpecification.html>`__ attribute of the training job.
- :code:`hyperparameters`: All of the hyperparameters are used directly in the `HyperParameters <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html#API_DescribeTrainingJob_ResponseSyntax>`__ attribute for the training job.
- :code:`entry_point`: The entry point file is included in the training job’s `InputDataConfig Channel <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_Channel.html>`__ array. A unique hash is created from the file (and any other dependencies), and then the file is uploaded to S3 with the hash included in the path. When a different entry point file is used, a new hash is created and the S3 path for that `InputDataConfig Channel <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_Channel.html>`__ object changes, initiating a new step run. For examples of what the S3 paths look like, see the **S3 Artifact Folder Structure** section.
- :code:`inputs`: The inputs are also included in the training job’s `InputDataConfig <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_Channel.html>`__. Local inputs are uploaded to S3. If the S3 path changes, a new training job is initiated. For examples of S3 paths, see the **S3 Artifact Folder Structure** section.
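
The hashing behavior described for :code:`entry_point` can be pictured with a short sketch. This is purely illustrative and is not the SDK's internal implementation; the helper function, bucket, and pipeline names below are assumptions, following the path layout described in the next section.

.. code-block:: python

    import hashlib

    # Illustrative only: a content hash of the local entry point determines the
    # S3 upload path, so editing the file changes the hash, the upload path, and
    # therefore the training step's cache key.
    def content_hash(path: str) -> str:
        with open(path, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()

    code_hash = content_hash("train.py")

    # Assumed bucket, code location, and pipeline name.
    s3_code_uri = f"s3://my-bucket/code-location/my-pipeline/code/{code_hash}/code.tar.gz"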

S3 Artifact Folder Structure
----------------------------

The SDK uses the following S3 paths when uploading local input and code artifacts, and when saving output artifacts.

*Processing*

- Code: :code:`s3://bucket_name/pipeline_name/code/<code_hash>/file.py`. The file could also be a tar.gz of source_dir and dependencies.
- Input Data: :code:`s3://bucket_name/pipeline_name/step_name/input/input_name/file.csv`
- Configuration: :code:`s3://bucket_name/pipeline_name/step_name/input/conf/<configuration_hash>/configuration.json`
- Output: :code:`s3://bucket_name/pipeline_name/<execution_id>/step_name/output/output_name`

*Training*

- Code: :code:`s3://bucket_name/code_location/pipeline_name/code/<code_hash>/code.tar.gz`
- Output: The output paths for Training jobs can vary. The default output path is the root of the S3 bucket: :code:`s3://bucket_name`. For Training jobs created from a Tuning job, the default path includes the Training job name created by the Training platform, formatted as :code:`s3://bucket_name/<training_job_name>/output/model.tar.gz`.

*Transform*

- Output: :code:`s3://bucket_name/pipeline_name/<execution_id>/step_name`
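
As a concrete, purely illustrative reading of the patterns above (every name below is an assumption, not an SDK output):

.. code-block:: python

    # Illustrative only: composing the documented path patterns by hand.
    bucket_name = "my-bucket"
    pipeline_name = "MyPipeline"
    step_name = "Process"
    execution_id = "abcd1234"
    code_hash = "0f1e2d3c"

    processing_code = f"s3://{bucket_name}/{pipeline_name}/code/{code_hash}/preprocessing.py"
    processing_input = f"s3://{bucket_name}/{pipeline_name}/{step_name}/input/abalone-dataset/abalone-dataset.csv"
    processing_output = f"s3://{bucket_name}/{pipeline_name}/{execution_id}/{step_name}/output/train"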

.. warning::
    For input artifacts such as data or code files, the actual content of the artifacts is not tracked, only the S3 path. This means that if a file in S3 is updated and re-uploaded directly with an identical name and path, then the step does NOT run again.


Retry Policy
===============
