Skip to content

feature: Allow custom output for RepackModelStep #2804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
dde8d00
fix: Set ProcessingStep upload locations deterministically to avoid c…
staubhp Dec 8, 2021
0f72907
fix: Prevent repack_model script from referencing nonexistent directo…
staubhp Dec 9, 2021
0bae071
fix: S3Input - add support for instance attributes (#2754)
mufaddal-rohawala Dec 15, 2021
17fe93e
fix: typos and broken link (#2765)
mohamed-ali Dec 16, 2021
f0efd27
feature: Add output path parameter for _RepackModelStep
tuliocasagrande Dec 17, 2021
7a1f4f8
fix: Fix role parameter for _RepackModelStep
tuliocasagrande Dec 17, 2021
ee6afcf
fix: Remove entry_point before calling Model on EstimatorTransformer
tuliocasagrande Jan 4, 2022
faf4ad5
feature: Add tests for RegisterModel with repack output
tuliocasagrande Jan 4, 2022
8210375
fix: fixes unnecessary session call while generating pipeline definit…
xchen909 Jan 10, 2022
972a6d2
feature: Add models_v2 under lineage context (#2800)
yzhu0 Jan 10, 2022
7206b9e
feature: enable python 3.9 (#2802)
mufaddal-rohawala Jan 10, 2022
127c964
change: Update CHANGELOG.md (#2842)
shreyapandit Jan 11, 2022
554d735
fix: update pricing link (#2805)
ahsan-z-khan Jan 11, 2022
88e4d68
doc: Document the available ExecutionVariables (#2807)
tuliocasagrande Jan 12, 2022
b3c19d8
fix: Remove duplicate vertex/edge in query lineage (#2784)
yzhu0 Jan 12, 2022
fd7a335
feature: Support model pipelines in CreateModelStep (#2845)
staubhp Jan 12, 2022
ccfcbe7
feature: support JsonGet/Join parameterization in tuning step Hyperpa…
jerrypeng7773 Jan 13, 2022
71c5617
doc: Enhance smddp 1.2.2 doc (#2852)
mchoi8739 Jan 13, 2022
975e031
feature: support checkpoint to be passed from estimator (#2849)
marckarp Jan 13, 2022
b377b52
fix: allow kms_key to be passed for processing step (#2779)
jayatalr Jan 13, 2022
9d259b3
feature: Adds support for Serverless inference (#2831)
bhaoz Jan 14, 2022
b82fb8a
feature: Add support for SageMaker lineage queries in action (#2853)
yzhu0 Jan 14, 2022
0489b59
Merge branch 'dev' into repack_output
shreyapandit Jan 14, 2022
ed9131b
Merge remote-tracking branch 'upstream/dev' into repack_output
May 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

## v2.77.1 (2022-02-25)

### Features

* default repack encryption
* support large pipeline
* add support for pytorch 1.10.0

### Documentation Changes

* SageMaker model parallel library 1.6.0 API doc

### Bug Fixes and Other Changes

* jumpstart model table
Expand Down
6 changes: 6 additions & 0 deletions src/sagemaker/workflow/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
display_name: str = None,
description: str = None,
source_dir: str = None,
repack_output_path=None,
dependencies: List = None,
depends_on: Union[List[str], List[Step]] = None,
retry_policies: List[RetryPolicy] = None,
Expand Down Expand Up @@ -101,6 +102,9 @@ def __init__(
or model hosting source code dependencies aside from the entry point
file in the Git repo (default: None). Structure within this
directory are preserved when training on Amazon SageMaker.
repack_output_path (str): The S3 prefix URI where the repacked model will be
uploaded (default: None) - don't include a trailing slash.
If not specified, the default location is s3://default-bucket/job-name.
dependencies (list[str]): A list of paths to directories (absolute
or relative) with any additional libraries that will be exported
to the container (default: []). The library folders will be
Expand Down Expand Up @@ -170,6 +174,8 @@ def __init__(
},
subnets=subnets,
security_group_ids=security_group_ids,
output_path=repack_output_path,
code_location=repack_output_path,
**kwargs,
)
repacker.disable_profiler = True
Expand Down
1 change: 1 addition & 0 deletions src/sagemaker/workflow/lambda_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def _get_function_arn(self):
if self.lambda_func.function_arn is None:
account_id = self.lambda_func.session.account_id()
try:
account_id = self.lambda_func.session.account_id()
response = self.lambda_func.create()
return response["FunctionArn"]
except ValueError as error:
Expand Down
27 changes: 21 additions & 6 deletions src/sagemaker/workflow/step_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
estimator: EstimatorBase = None,
model_data=None,
depends_on: Union[List[str], List[Step]] = None,
repack_output_path=None,
repack_model_step_retry_policies: List[RetryPolicy] = None,
register_model_step_retry_policies: List[RetryPolicy] = None,
model_package_group_name=None,
Expand Down Expand Up @@ -92,6 +93,9 @@ def __init__(
job can be run or on which an endpoint can be deployed (default: None).
depends_on (List[str] or List[Step]): The list of step names or step instances
the first step in the collection depends on
repack_output_path (str): The S3 prefix URI where the repacked model will be
uploaded (default: None) - don't include a trailing slash.
If not specified, the default location is s3://default-bucket/job-name.
repack_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
for the repack model step
register_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
Expand Down Expand Up @@ -155,6 +159,7 @@ def __init__(
security_group_ids=security_group_ids,
description=description,
display_name=display_name,
repack_output_path=repack_output_path,
**kwargs,
)
steps.append(repack_model_step)
Expand Down Expand Up @@ -199,6 +204,7 @@ def __init__(
security_group_ids=security_group_ids,
description=description,
display_name=display_name,
repack_output_path=repack_output_path,
**kwargs,
)
steps.append(repack_model_step)
Expand Down Expand Up @@ -261,6 +267,7 @@ def __init__(
image_uri=None,
predictor_cls=None,
env=None,
repack_output_path=None,
# transformer arguments
strategy=None,
assemble_with=None,
Expand All @@ -282,8 +289,8 @@ def __init__(

An estimator-centric step collection. It models what happens in workflows
when invoking the `transform()` method on an estimator instance:
First, if custom
model artifacts are required, a `_RepackModelStep` is included.
First, if a custom
entry point script is required, a `_RepackModelStep` is included.
Second, a
`CreateModelStep` with the model data passed in from a training step or other
training job output.
Expand Down Expand Up @@ -312,6 +319,9 @@ def __init__(
it will be the format of the batch transform output.
env (dict): The Environment variables to be set for use during the
transform job (default: None).
repack_output_path (str): The S3 prefix URI where the repacked model will be
uploaded (default: None) - don't include a trailing slash.
If not specified, the default location is s3://default-bucket/job-name.
depends_on (List[str] or List[Step]): The list of step names or step instances
the first step in the collection depends on
repack_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
Expand All @@ -322,10 +332,13 @@ def __init__(
transform step
"""
steps = []
repack_model = False

if "entry_point" in kwargs:
entry_point = kwargs.get("entry_point", None)
source_dir = kwargs.get("source_dir", None)
dependencies = kwargs.get("dependencies", None)
repack_model = True
entry_point = kwargs.pop("entry_point", None)
source_dir = kwargs.pop("source_dir", None)
dependencies = kwargs.pop("dependencies", None)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dependencies = kwargs.pop("dependencies", None)
dependencies = kwargs.pop("dependencies", None)
code_location = kwargs.pop("code_location", None)

repack_model_step = _RepackModelStep(
name=f"{name}RepackModel",
depends_on=depends_on,
Expand All @@ -341,6 +354,8 @@ def __init__(
security_group_ids=estimator.security_group_ids,
description=description,
display_name=display_name,
repack_output_path=repack_output_path,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a `repack_image_uri` argument? My customer is facing an issue where the Python version of the SKLearn estimator (3.7) is different from that of the training image URI. That caused dependency errors when trying to install packages that are available on (for example) Python 3.8. Another option would be setting it to `estimator.training_image_uri()`.

Suggested change
repack_output_path=repack_output_path,
repack_output_path=repack_output_path,
image_uri=estimator.training_image_uri(),

**kwargs,
)
steps.append(repack_model_step)
model_data = repack_model_step.properties.ModelArtifacts.S3ModelArtifacts
Expand Down Expand Up @@ -371,7 +386,7 @@ def predict_wrapper(endpoint, session):
display_name=display_name,
retry_policies=model_step_retry_policies,
)
if "entry_point" not in kwargs and depends_on:
if not repack_model and depends_on:
# if the CreateModelStep is the first step in the collection
model_step.add_depends_on(depends_on)
steps.append(model_step)
Expand Down
7 changes: 7 additions & 0 deletions tests/integ/sagemaker/lineage/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
association,
artifact,
)
from sagemaker.lineage.query import (
LineageFilter,
LineageEntityEnum,
LineageSourceEnum,
LineageQuery,
LineageQueryDirectionEnum,
)
from sagemaker.model import ModelPackage
from tests.integ.sagemaker.workflow.test_workflow import (
test_end_to_end_pipeline_successful_execution,
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/sagemaker/workflow/test_step_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,32 @@ def test_register_model_with_model_repack_with_pipeline_model(
raise Exception("A step exists in the collection of an invalid type.")


def test_register_model_with_model_repack_with_repack_output_path(model):
    """Check that ``RegisterModel`` forwards ``repack_output_path`` to the repack job.

    The repack step is emitted as a ``Training`` request; both its debugger hook
    output and its model output location must equal the custom S3 prefix.
    """
    # f-string so BUCKET is interpolated; a plain literal would leave the
    # placeholder text "{BUCKET}" in the URI.
    repack_output_path = f"s3://{BUCKET}/repack_output"
    register_model = RegisterModel(
        name="RegisterModelStep",
        model=model,
        content_types=["content_type"],
        response_types=["response_type"],
        inference_instances=["inference_instance"],
        transform_instances=["transform_instance"],
        model_package_group_name="mpg",
        approval_status="Approved",
        description="description",
        depends_on=["TestStep"],
        tags=[{"Key": "myKey", "Value": "myValue"}],
        repack_output_path=repack_output_path,
    )

    request_dicts = register_model.request_dicts()
    training_requests = [
        request_dict for request_dict in request_dicts if request_dict["Type"] == "Training"
    ]

    # Without this guard the test would pass vacuously when no repack
    # (Training) step is generated at all.
    assert training_requests, "expected a repack Training step in the collection"

    for request_dict in training_requests:
        arguments = request_dict["Arguments"]
        assert arguments["DebugHookConfig"]["S3OutputPath"] == repack_output_path
        assert arguments["OutputDataConfig"]["S3OutputPath"] == repack_output_path


def test_estimator_transformer(estimator):
model_data = f"s3://{BUCKET}/model.tar.gz"
model_inputs = CreateModelInput(
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/sagemaker/workflow/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,41 @@ def test_processing_step_normalizes_args_with_no_code(mock_normalize_args, scrip
)


@patch("sagemaker.processing.ScriptProcessor._normalize_args")
def test_processing_step_normalizes_args_with_no_code(mock_normalize_args, script_processor):
    """Check the arguments a code-less ``ProcessingStep`` passes to ``_normalize_args``.

    ``ScriptProcessor._normalize_args`` is patched; after ``to_request()`` it must
    have been called with the step's own inputs/outputs/arguments, ``code=None``,
    ``job_name=None`` and ``kms_key=None``.
    """
    # NOTE(review): a test with this exact name appears to be defined earlier in
    # this module; if so, this definition shadows it and only one of the two is
    # collected. Confirm and rename or remove one of them.
    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
    manifest_uri = f"s3://{BUCKET}/processing_manifest"
    processing_step = ProcessingStep(
        name="MyProcessingStep",
        processor=script_processor,
        inputs=[ProcessingInput(source=manifest_uri, destination="processing_manifest")],
        outputs=[ProcessingOutput(source=manifest_uri, destination="processing_manifest")],
        job_arguments=["arg1", "arg2"],
        cache_config=cache_config,
    )

    # The mocked normalizer hands the step's own inputs/outputs straight back.
    mock_normalize_args.return_value = [processing_step.inputs, processing_step.outputs]
    processing_step.to_request()

    expected_call = dict(
        job_name=None,
        arguments=processing_step.job_arguments,
        inputs=processing_step.inputs,
        outputs=processing_step.outputs,
        code=None,
        kms_key=None,
    )
    mock_normalize_args.assert_called_with(**expected_call)


def test_create_model_step(sagemaker_session):
model = Model(
image_uri=IMAGE_URI,
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/sagemaker/workflow/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,21 @@ def test_repack_model_step_with_source_dir(estimator, source_dir):
assert step.properties.TrainingJobName.expr == {
"Get": "Steps.MyRepackModelStep.TrainingJobName"
}


def test_repack_model_step_with_output_path(estimator):
    """Check that ``_RepackModelStep`` applies a custom ``repack_output_path``.

    The generated request must use the custom S3 prefix for both the debugger
    hook output and the training job's output data location.
    """
    # f-string so BUCKET is interpolated, consistent with model_data below;
    # a plain literal would leave the placeholder text "{BUCKET}" in the URI.
    repack_output_path = f"s3://{BUCKET}/repack_output"
    model_data = f"s3://{BUCKET}/model.tar.gz"
    entry_point = f"{DATA_DIR}/dummy_script.py"
    step = _RepackModelStep(
        name="MyRepackModelStep",
        sagemaker_session=estimator.sagemaker_session,
        role=estimator.role,
        model_data=model_data,
        entry_point=entry_point,
        repack_output_path=repack_output_path,
    )
    request_dict = step.to_request()

    arguments = request_dict["Arguments"]
    assert arguments["DebugHookConfig"]["S3OutputPath"] == repack_output_path
    assert arguments["OutputDataConfig"]["S3OutputPath"] == repack_output_path