diff --git a/CHANGELOG.md b/CHANGELOG.md
index d211500561..3436c26fc8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,16 @@
 
 ## v2.77.1 (2022-02-25)
 
+### Features
+
+ * default repack encryption
+ * support large pipeline
+ * add support for pytorch 1.10.0
+
+### Documentation Changes
+
+ * SageMaker model parallel library 1.6.0 API doc
+
 ### Bug Fixes and Other Changes
 
  * jumpstart model table
diff --git a/src/sagemaker/workflow/_utils.py b/src/sagemaker/workflow/_utils.py
index fbbb6acba9..3454efe09c 100644
--- a/src/sagemaker/workflow/_utils.py
+++ b/src/sagemaker/workflow/_utils.py
@@ -59,6 +59,7 @@ def __init__(
         display_name: str = None,
         description: str = None,
         source_dir: str = None,
+        repack_output_path=None,
         dependencies: List = None,
         depends_on: Union[List[str], List[Step]] = None,
         retry_policies: List[RetryPolicy] = None,
@@ -101,6 +102,9 @@ def __init__(
                 or model hosting source code dependencies aside from the entry
                 point file in the Git repo (default: None). Structure within this
                 directory are preserved when training on Amazon SageMaker.
+            repack_output_path (str): The S3 prefix URI where the repacked model will be
+                uploaded (default: None). Do not include a trailing slash.
+                If not specified, the default location is s3://default-bucket/job-name.
             dependencies (list[str]): A list of paths to directories (absolute
                 or relative) with any additional libraries that will be exported
                 to the container (default: []). The library folders will be
@@ -170,6 +174,8 @@ def __init__(
             },
             subnets=subnets,
             security_group_ids=security_group_ids,
+            output_path=repack_output_path,
+            code_location=repack_output_path,
             **kwargs,
         )
         repacker.disable_profiler = True
diff --git a/src/sagemaker/workflow/lambda_step.py b/src/sagemaker/workflow/lambda_step.py
index 5240ae60b9..ef8e60d55d 100644
--- a/src/sagemaker/workflow/lambda_step.py
+++ b/src/sagemaker/workflow/lambda_step.py
@@ -163,6 +163,7 @@ def _get_function_arn(self):
         if self.lambda_func.function_arn is None:
             account_id = self.lambda_func.session.account_id()
             try:
+                account_id = self.lambda_func.session.account_id()
                 response = self.lambda_func.create()
                 return response["FunctionArn"]
             except ValueError as error:
diff --git a/src/sagemaker/workflow/step_collections.py b/src/sagemaker/workflow/step_collections.py
index 1280637006..f808fad360 100644
--- a/src/sagemaker/workflow/step_collections.py
+++ b/src/sagemaker/workflow/step_collections.py
@@ -63,6 +63,7 @@ def __init__(
         estimator: EstimatorBase = None,
         model_data=None,
         depends_on: Union[List[str], List[Step]] = None,
+        repack_output_path=None,
         repack_model_step_retry_policies: List[RetryPolicy] = None,
         register_model_step_retry_policies: List[RetryPolicy] = None,
         model_package_group_name=None,
@@ -92,6 +93,9 @@ def __init__(
                 job can be run or on which an endpoint can be deployed (default: None).
             depends_on (List[str] or List[Step]): The list of step names or step instances
                 the first step in the collection depends on
+            repack_output_path (str): The S3 prefix URI where the repacked model will be
+                uploaded (default: None). Do not include a trailing slash.
+                If not specified, the default location is s3://default-bucket/job-name.
             repack_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
                 for the repack model step
             register_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
@@ -155,6 +159,7 @@ def __init__(
                 security_group_ids=security_group_ids,
                 description=description,
                 display_name=display_name,
+                repack_output_path=repack_output_path,
                 **kwargs,
             )
             steps.append(repack_model_step)
@@ -199,6 +204,7 @@ def __init__(
                     security_group_ids=security_group_ids,
                     description=description,
                     display_name=display_name,
+                    repack_output_path=repack_output_path,
                     **kwargs,
                 )
                 steps.append(repack_model_step)
@@ -261,6 +267,7 @@ def __init__(
         image_uri=None,
         predictor_cls=None,
         env=None,
+        repack_output_path=None,
         # transformer arguments
         strategy=None,
         assemble_with=None,
@@ -282,8 +289,8 @@ def __init__(
 
         An estimator-centric step collection. It models what happens in workflows
         when invoking the `transform()` method on an estimator instance:
-        First, if custom
-        model artifacts are required, a `_RepackModelStep` is included.
+        First, if a custom
+        entry point script is required, a `_RepackModelStep` is included.
         Second, a `CreateModelStep` with the model data passed in from a training
         step or other training job output.
 
@@ -312,6 +319,9 @@ def __init__(
                 it will be the format of the batch transform output.
             env (dict): The Environment variables to be set for use during the
                 transform job (default: None).
+            repack_output_path (str): The S3 prefix URI where the repacked model will be
+                uploaded (default: None). Do not include a trailing slash.
+                If not specified, the default location is s3://default-bucket/job-name.
             depends_on (List[str] or List[Step]): The list of step names or step instances
                 the first step in the collection depends on
             repack_model_step_retry_policies (List[RetryPolicy]): The list of retry policies
@@ -322,10 +332,13 @@ def __init__(
                 transform step
         """
         steps = []
+        repack_model = False
+
         if "entry_point" in kwargs:
-            entry_point = kwargs.get("entry_point", None)
-            source_dir = kwargs.get("source_dir", None)
-            dependencies = kwargs.get("dependencies", None)
+            repack_model = True
+            entry_point = kwargs.pop("entry_point", None)
+            source_dir = kwargs.pop("source_dir", None)
+            dependencies = kwargs.pop("dependencies", None)
             repack_model_step = _RepackModelStep(
                 name=f"{name}RepackModel",
                 depends_on=depends_on,
@@ -341,6 +354,8 @@ def __init__(
                 security_group_ids=estimator.security_group_ids,
                 description=description,
                 display_name=display_name,
+                repack_output_path=repack_output_path,
+                **kwargs,
             )
             steps.append(repack_model_step)
             model_data = repack_model_step.properties.ModelArtifacts.S3ModelArtifacts
@@ -371,7 +386,7 @@ def predict_wrapper(endpoint, session):
             display_name=display_name,
             retry_policies=model_step_retry_policies,
         )
-        if "entry_point" not in kwargs and depends_on:
+        if not repack_model and depends_on:
             # if the CreateModelStep is the first step in the collection
             model_step.add_depends_on(depends_on)
         steps.append(model_step)
diff --git a/tests/integ/sagemaker/lineage/conftest.py b/tests/integ/sagemaker/lineage/conftest.py
index 8922c011f7..cad36e065a 100644
--- a/tests/integ/sagemaker/lineage/conftest.py
+++ b/tests/integ/sagemaker/lineage/conftest.py
@@ -25,6 +25,13 @@
     association,
     artifact,
 )
+from sagemaker.lineage.query import (
+    LineageFilter,
+    LineageEntityEnum,
+    LineageSourceEnum,
+    LineageQuery,
+    LineageQueryDirectionEnum,
+)
 from sagemaker.model import ModelPackage
 from tests.integ.sagemaker.workflow.test_workflow import (
     test_end_to_end_pipeline_successful_execution,
diff --git a/tests/unit/sagemaker/workflow/test_step_collections.py b/tests/unit/sagemaker/workflow/test_step_collections.py
index ea810796f4..cb879b1149 100644
--- a/tests/unit/sagemaker/workflow/test_step_collections.py
+++ b/tests/unit/sagemaker/workflow/test_step_collections.py
@@ -808,6 +808,32 @@ def test_register_model_with_model_repack_with_pipeline_model(
             raise Exception("A step exists in the collection of an invalid type.")
 
 
+def test_register_model_with_model_repack_with_repack_output_path(model):
+    repack_output_path = f"s3://{BUCKET}/repack_output"
+    register_model = RegisterModel(
+        name="RegisterModelStep",
+        model=model,
+        content_types=["content_type"],
+        response_types=["response_type"],
+        inference_instances=["inference_instance"],
+        transform_instances=["transform_instance"],
+        model_package_group_name="mpg",
+        approval_status="Approved",
+        description="description",
+        depends_on=["TestStep"],
+        tags=[{"Key": "myKey", "Value": "myValue"}],
+        repack_output_path=repack_output_path,
+    )
+
+    request_dicts = register_model.request_dicts()
+
+    for request_dict in request_dicts:
+        if request_dict["Type"] == "Training":
+            arguments = request_dict["Arguments"]
+            assert arguments["DebugHookConfig"]["S3OutputPath"] == repack_output_path
+            assert arguments["OutputDataConfig"]["S3OutputPath"] == repack_output_path
+
+
 def test_estimator_transformer(estimator):
     model_data = f"s3://{BUCKET}/model.tar.gz"
     model_inputs = CreateModelInput(
diff --git a/tests/unit/sagemaker/workflow/test_steps.py b/tests/unit/sagemaker/workflow/test_steps.py
index fd3bd7d0b9..01f80df38b 100644
--- a/tests/unit/sagemaker/workflow/test_steps.py
+++ b/tests/unit/sagemaker/workflow/test_steps.py
@@ -715,6 +715,41 @@ def test_processing_step_normalizes_args_with_code(mock_normalize_args, scrip
     )
 
 
+@patch("sagemaker.processing.ScriptProcessor._normalize_args")
+def test_processing_step_normalizes_args_with_no_code(mock_normalize_args, script_processor):
+    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
+    inputs = [
+        ProcessingInput(
+            source=f"s3://{BUCKET}/processing_manifest",
+            destination="processing_manifest",
+        )
+    ]
+    outputs = [
+        ProcessingOutput(
+            source=f"s3://{BUCKET}/processing_manifest",
+            destination="processing_manifest",
+        )
+    ]
+    step = ProcessingStep(
+        name="MyProcessingStep",
+        processor=script_processor,
+        inputs=inputs,
+        outputs=outputs,
+        job_arguments=["arg1", "arg2"],
+        cache_config=cache_config,
+    )
+    mock_normalize_args.return_value = [step.inputs, step.outputs]
+    step.to_request()
+    mock_normalize_args.assert_called_with(
+        job_name=None,
+        arguments=step.job_arguments,
+        inputs=step.inputs,
+        outputs=step.outputs,
+        code=None,
+        kms_key=None,
+    )
+
+
 def test_create_model_step(sagemaker_session):
     model = Model(
         image_uri=IMAGE_URI,
diff --git a/tests/unit/sagemaker/workflow/test_utils.py b/tests/unit/sagemaker/workflow/test_utils.py
index e534aa531e..cb4ff45653 100644
--- a/tests/unit/sagemaker/workflow/test_utils.py
+++ b/tests/unit/sagemaker/workflow/test_utils.py
@@ -208,3 +208,21 @@ def test_repack_model_step_with_source_dir(estimator, source_dir):
     assert step.properties.TrainingJobName.expr == {
         "Get": "Steps.MyRepackModelStep.TrainingJobName"
     }
+
+
+def test_repack_model_step_with_output_path(estimator):
+    repack_output_path = f"s3://{BUCKET}/repack_output"
+    model_data = f"s3://{BUCKET}/model.tar.gz"
+    entry_point = f"{DATA_DIR}/dummy_script.py"
+    step = _RepackModelStep(
+        name="MyRepackModelStep",
+        sagemaker_session=estimator.sagemaker_session,
+        role=estimator.role,
+        model_data=model_data,
+        entry_point=entry_point,
+        repack_output_path=repack_output_path,
+    )
+    request_dict = step.to_request()
+
+    assert request_dict["Arguments"]["DebugHookConfig"]["S3OutputPath"] == repack_output_path
+    assert request_dict["Arguments"]["OutputDataConfig"]["S3OutputPath"] == repack_output_path
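
A minimal usage sketch of the new repack_output_path parameter, mirroring the unit tests above. Only the repack_output_path keyword itself comes from this diff; the image URI, bucket, role, entry point, instance types, and content types are illustrative placeholders.

    from sagemaker.model import Model
    from sagemaker.workflow.step_collections import RegisterModel

    # Placeholders for this sketch: image URI, model data, role, and entry point
    # are not real resources. Passing an entry_point is what makes RegisterModel
    # insert a _RepackModelStep ahead of the registration step.
    model = Model(
        image_uri="<inference-image-uri>",
        model_data="s3://my-bucket/model.tar.gz",
        role="<execution-role-arn>",
        entry_point="inference.py",
    )

    register_model = RegisterModel(
        name="RegisterModelStep",
        model=model,
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.m5.xlarge"],
        transform_instances=["ml.m5.xlarge"],
        model_package_group_name="my-model-group",
        # Added by this change: the repacked model artifacts are written under this
        # S3 prefix (no trailing slash) instead of the default s3://default-bucket/job-name.
        repack_output_path="s3://my-bucket/repack-output",
    )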