From b7d6a5936dd16cc07a0fe8223a853deeda7d3d25 Mon Sep 17 00:00:00 2001 From: Lauren Yu <6631887+laurenyu@users.noreply.github.com> Date: Thu, 25 Jun 2020 14:11:29 -0700 Subject: [PATCH 1/3] feature: add Predictor.update_endpoint() --- src/sagemaker/predictor.py | 102 ++++++++++++++++- src/sagemaker/session.py | 11 +- tests/integ/test_inference_pipeline.py | 25 ++-- tests/integ/test_multidatamodel.py | 22 ++-- tests/integ/test_mxnet_train.py | 25 ++-- tests/unit/test_predictor.py | 153 ++++++++++++++++++++++++- 6 files changed, 288 insertions(+), 50 deletions(-) diff --git a/src/sagemaker/predictor.py b/src/sagemaker/predictor.py index 1de0c2d663..af491fd55f 100644 --- a/src/sagemaker/predictor.py +++ b/src/sagemaker/predictor.py @@ -22,7 +22,7 @@ from sagemaker.content_types import CONTENT_TYPE_JSON, CONTENT_TYPE_CSV, CONTENT_TYPE_NPY from sagemaker.model_monitor import DataCaptureConfig -from sagemaker.session import Session +from sagemaker.session import production_variant, Session from sagemaker.utils import name_from_base from sagemaker.model_monitor.model_monitoring import ( @@ -157,6 +157,106 @@ def _create_request_args(self, data, initial_args=None, target_model=None, targe args["Body"] = data return args + def update_endpoint( + self, + initial_instance_count=None, + instance_type=None, + accelerator_type=None, + model_name=None, + tags=None, + kms_key=None, + data_capture_config_dict=None, + wait=True, + ): + """Update the existing endpoint with the provided attributes. + + This creates a new EndpointConfig in the process. If ``initial_instance_count``, + ``instance_type``, ``accelerator_type``, or ``model_name`` is specified, then a new + ``ProductionVariant`` configuration is created; values from the existing configuration + are not preserved if any of those parameters are specified. + + Args: + initial_instance_count (int): The initial number of instances to run in the endpoint. + This is required if ``instance_type``, ``accelerator_type``, or ``model_name`` is + specified. Otherwise, the values from the existing endpoint configuration's + ``ProductionVariant``s are used. + instance_type (str): The EC2 instance type to deploy the endpoint to. + This is required if ``initial_instance_count`` or ``accelerator_type`` is specified. + Otherwise, the values from the existing endpoint configuration's + ``ProductionVariant``s are used. + accelerator_type (str): The type of Elastic Inference accelerator to attach to + the endpoint, e.g. 'ml.eia1.medium'. If not specified, and + ``initial_instance_count``, ``instance_type``, and ``model_name`` are also ``None``, + the values from the existing endpoint configuration's ``ProductionVariant``s are + used. Otherwise, no Elastic Inference accelerator is attached to the endpoint. + model_name (str): The name of the model to be associated with the endpoint. + This is required if ``initial_instance_count``, ``instance_type``, or + ``accelerator_type`` is specified and if there is more than one model associated + with the endpoint. Otherwise, the existing model for the endpoint is used. + tags (list[dict[str, str]]): The list of tags to add to the endpoint + config. If not specified, the tags of the existing endpoint configuration are used. + If any of the existing tags are reserved AWS ones (i.e. begin with "aws"), + they are not carried over to the new endpoint configuration. 
+ kms_key (str): The KMS key that is used to encrypt the data on the storage volume + attached to the instance hosting the endpoint If not specified, + the KMS key of the existing endpoint configuration is used. + data_capture_config_dict (dict): The endpoint data capture configuration + for use with Amazon SageMaker Model Monitoring. If not specified, + the data capture configuration of the existing endpoint configuration is used. + + Raises: + ValueError: If there is not enough information to create a new ``ProductionVariant``: + + - If ``initial_instance_count``, ``accelerator_type``, or ``model_name`` is + specified, but ``instance_type`` is ``None``. + - If ``initial_instance_count``, ``instance_type``, or ``accelerator_type`` is + specified and either ``model_name`` is ``None`` or there are multiple models + associated with the endpoint. + """ + production_variants = None + + if initial_instance_count or instance_type or accelerator_type or model_name: + if instance_type is None or initial_instance_count is None: + raise ValueError( + "Missing initial_instance_count and/or instance_type. Provided values: " + "initial_instance_count={}, instance_type={}, accelerator_type={}, " + "model_name={}.".format( + initial_instance_count, instance_type, accelerator_type, model_name + ) + ) + + if model_name is None: + if len(self._model_names) > 1: + raise ValueError( + "Unable to choose a default model for a new EndpointConfig because " + "the endpoint has multiple models: {}".format(", ".join(self._model_names)) + ) + model_name = self._model_names[0] + else: + self._model_names = [model_name] + + production_variant_config = production_variant( + model_name, + instance_type, + initial_instance_count=initial_instance_count, + accelerator_type=accelerator_type, + ) + production_variants = [production_variant_config] + + new_endpoint_config_name = name_from_base(self._endpoint_config_name) + self.sagemaker_session.create_endpoint_config_from_existing( + self._endpoint_config_name, + new_endpoint_config_name, + new_tags=tags, + new_kms_key=kms_key, + new_data_capture_config_dict=data_capture_config_dict, + new_production_variants=production_variants, + ) + self.sagemaker_session.update_endpoint( + self.endpoint_name, new_endpoint_config_name, wait=wait + ) + self._endpoint_config_name = new_endpoint_config_name + def _delete_endpoint_config(self): """Delete the Amazon SageMaker endpoint configuration""" self.sagemaker_session.delete_endpoint_config(self._endpoint_config_name) diff --git a/src/sagemaker/session.py b/src/sagemaker/session.py index ac51f5a37a..3a58ac974a 100644 --- a/src/sagemaker/session.py +++ b/src/sagemaker/session.py @@ -2333,6 +2333,7 @@ def create_endpoint_config_from_existing( new_tags=None, new_kms_key=None, new_data_capture_config_dict=None, + new_production_variants=None, ): """Create an Amazon SageMaker endpoint configuration from an existing one. Updating any values that were passed in. @@ -2346,7 +2347,7 @@ def create_endpoint_config_from_existing( new_config_name (str): Name of the Amazon SageMaker endpoint configuration to create. existing_config_name (str): Name of the existing Amazon SageMaker endpoint configuration. - new_tags(List[dict[str, str]]): Optional. The list of tags to add to the endpoint + new_tags (list[dict[str, str]]): Optional. The list of tags to add to the endpoint config. If not specified, the tags of the existing endpoint configuration are used. If any of the existing tags are reserved AWS ones (i.e. 
begin with "aws"), they are not carried over to the new endpoint configuration. @@ -2357,6 +2358,9 @@ def create_endpoint_config_from_existing( capture for use with Amazon SageMaker Model Monitoring (default: None). If not specified, the data capture configuration of the existing endpoint configuration is used. + new_production_variants (list[dict]): The configuration for which model(s) to host and + the resources to deploy for hosting the model(s). If not specified, + the ``ProductionVariants`` of the existing endpoint configuration is used. Returns: str: Name of the endpoint point configuration created. @@ -2370,9 +2374,12 @@ def create_endpoint_config_from_existing( request = { "EndpointConfigName": new_config_name, - "ProductionVariants": existing_endpoint_config_desc["ProductionVariants"], } + request["ProductionVariants"] = ( + new_production_variants or existing_endpoint_config_desc["ProductionVariants"] + ) + request_tags = new_tags or self.list_tags( existing_endpoint_config_desc["EndpointConfigArn"] ) diff --git a/tests/integ/test_inference_pipeline.py b/tests/integ/test_inference_pipeline.py index ecaab043e1..cfac47ce12 100644 --- a/tests/integ/test_inference_pipeline.py +++ b/tests/integ/test_inference_pipeline.py @@ -29,7 +29,6 @@ from sagemaker.predictor import Predictor, json_serializer from sagemaker.sparkml.model import SparkMLModel from sagemaker.utils import sagemaker_timestamp -from tests.integ.retry import retries SPARKML_DATA_PATH = os.path.join(DATA_DIR, "sparkml_model") XGBOOST_DATA_PATH = os.path.join(DATA_DIR, "xgboost_model") @@ -151,7 +150,7 @@ def test_inference_pipeline_model_deploy(sagemaker_session, cpu_instance_type): assert "Could not find model" in str(exception.value) -def test_inference_pipeline_model_deploy_with_update_endpoint( +def test_inference_pipeline_model_deploy_and_update_endpoint( sagemaker_session, cpu_instance_type, alternative_cpu_instance_type ): sparkml_data_path = os.path.join(DATA_DIR, "sparkml_model") @@ -181,24 +180,18 @@ def test_inference_pipeline_model_deploy_with_update_endpoint( role="SageMakerRole", sagemaker_session=sagemaker_session, ) - model.deploy(1, alternative_cpu_instance_type, endpoint_name=endpoint_name) - old_endpoint = sagemaker_session.sagemaker_client.describe_endpoint( + predictor = model.deploy(1, alternative_cpu_instance_type, endpoint_name=endpoint_name) + endpoint_desc = sagemaker_session.sagemaker_client.describe_endpoint( EndpointName=endpoint_name ) - old_config_name = old_endpoint["EndpointConfigName"] + old_config_name = endpoint_desc["EndpointConfigName"] - model.deploy(1, cpu_instance_type, update_endpoint=True, endpoint_name=endpoint_name) + predictor.update_endpoint(initial_instance_count=1, instance_type=cpu_instance_type) - # Wait for endpoint to finish updating - # Endpoint update takes ~7min. 
40 retries * 30s sleeps = 20min timeout - for _ in retries(40, "Waiting for 'InService' endpoint status", seconds_to_sleep=30): - new_endpoint = sagemaker_session.sagemaker_client.describe_endpoint( - EndpointName=endpoint_name - ) - if new_endpoint["EndpointStatus"] == "InService": - break - - new_config_name = new_endpoint["EndpointConfigName"] + endpoint_desc = sagemaker_session.sagemaker_client.describe_endpoint( + EndpointName=endpoint_name + ) + new_config_name = endpoint_desc["EndpointConfigName"] new_config = sagemaker_session.sagemaker_client.describe_endpoint_config( EndpointConfigName=new_config_name ) diff --git a/tests/integ/test_multidatamodel.py b/tests/integ/test_multidatamodel.py index 2c82e919fc..0588c8c74d 100644 --- a/tests/integ/test_multidatamodel.py +++ b/tests/integ/test_multidatamodel.py @@ -496,25 +496,20 @@ def test_multi_data_model_deploy_pretrained_models_update_endpoint( result = predictor.predict(data, target_model=PRETRAINED_MODEL_PATH_2) assert result == "Invoked model: {}".format(PRETRAINED_MODEL_PATH_2) - old_endpoint = sagemaker_session.sagemaker_client.describe_endpoint( + endpoint_desc = sagemaker_session.sagemaker_client.describe_endpoint( EndpointName=endpoint_name ) - old_config_name = old_endpoint["EndpointConfigName"] + old_config_name = endpoint_desc["EndpointConfigName"] # Update endpoint - multi_data_model.deploy( - 1, alternative_cpu_instance_type, endpoint_name=endpoint_name, update_endpoint=True + predictor.update_endpoint( + initial_instance_count=1, instance_type=alternative_cpu_instance_type ) - # Wait for endpoint to finish updating - for _ in retries(40, "Waiting for 'InService' endpoint status", seconds_to_sleep=30): - new_endpoint = sagemaker_session.sagemaker_client.describe_endpoint( - EndpointName=endpoint_name - ) - if new_endpoint["EndpointStatus"] == "InService": - break - - new_config_name = new_endpoint["EndpointConfigName"] + endpoint_desc = sagemaker_session.sagemaker_client.describe_endpoint( + EndpointName=endpoint_name + ) + new_config_name = endpoint_desc["EndpointConfigName"] new_config = sagemaker_session.sagemaker_client.describe_endpoint_config( EndpointConfigName=new_config_name @@ -531,6 +526,7 @@ def test_multi_data_model_deploy_pretrained_models_update_endpoint( EndpointConfigName=new_config_name ) multi_data_model.delete_model() + with pytest.raises(Exception) as exception: sagemaker_session.sagemaker_client.describe_model(ModelName=model_name) assert "Could not find model" in str(exception.value) diff --git a/tests/integ/test_mxnet_train.py b/tests/integ/test_mxnet_train.py index 2476133ac8..51ca132c56 100644 --- a/tests/integ/test_mxnet_train.py +++ b/tests/integ/test_mxnet_train.py @@ -24,7 +24,6 @@ from sagemaker.utils import sagemaker_timestamp from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES from tests.integ.kms_utils import get_or_create_kms_key -from tests.integ.retry import retries from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name @@ -203,7 +202,7 @@ def test_deploy_model_with_tags_and_kms( assert endpoint_config["KmsKeyId"] == kms_key_arn -def test_deploy_model_with_update_endpoint( +def test_deploy_model_and_update_endpoint( mxnet_training_job, sagemaker_session, mxnet_full_version, @@ -227,24 +226,18 @@ def test_deploy_model_with_update_endpoint( sagemaker_session=sagemaker_session, framework_version=mxnet_full_version, ) - model.deploy(1, alternative_cpu_instance_type, endpoint_name=endpoint_name) - old_endpoint = 
sagemaker_session.sagemaker_client.describe_endpoint( + predictor = model.deploy(1, alternative_cpu_instance_type, endpoint_name=endpoint_name) + endpoint_desc = sagemaker_session.sagemaker_client.describe_endpoint( EndpointName=endpoint_name ) - old_config_name = old_endpoint["EndpointConfigName"] - - model.deploy(1, cpu_instance_type, update_endpoint=True, endpoint_name=endpoint_name) + old_config_name = endpoint_desc["EndpointConfigName"] - # Wait for endpoint to finish updating - # Endpoint update takes ~7min. 40 retries * 30s sleeps = 20min timeout - for _ in retries(40, "Waiting for 'InService' endpoint status", seconds_to_sleep=30): - new_endpoint = sagemaker_session.sagemaker_client.describe_endpoint( - EndpointName=endpoint_name - ) - if new_endpoint["EndpointStatus"] == "InService": - break + predictor.update_endpoint(initial_instance_count=1, instance_type=cpu_instance_type) - new_config_name = new_endpoint["EndpointConfigName"] + endpoint_desc = sagemaker_session.sagemaker_client.describe_endpoint( + EndpointName=endpoint_name + ) + new_config_name = endpoint_desc["EndpointConfigName"] new_config = sagemaker_session.sagemaker_client.describe_endpoint_config( EndpointConfigName=new_config_name ) diff --git a/tests/unit/test_predictor.py b/tests/unit/test_predictor.py index db4c5d3ee9..7c879e0508 100644 --- a/tests/unit/test_predictor.py +++ b/tests/unit/test_predictor.py @@ -15,10 +15,10 @@ import io import json import os -import pytest -from mock import Mock, call import numpy as np +import pytest +from mock import Mock, call, patch from sagemaker.predictor import Predictor from sagemaker.predictor import ( @@ -547,6 +547,155 @@ def test_predict_call_with_headers_and_csv(): assert result == CSV_RETURN_VALUE +@patch("sagemaker.predictor.name_from_base") +def test_update_endpoint_no_args(name_from_base): + new_endpoint_config_name = "new-endpoint-config" + name_from_base.return_value = new_endpoint_config_name + + sagemaker_session = empty_sagemaker_session() + existing_endpoint_config_name = "existing-endpoint-config" + + predictor = Predictor(ENDPOINT, sagemaker_session=sagemaker_session) + predictor._endpoint_config_name = existing_endpoint_config_name + + predictor.update_endpoint() + + assert ["model-1", "model-2"] == predictor._model_names + assert new_endpoint_config_name == predictor._endpoint_config_name + + name_from_base.assert_called_with(existing_endpoint_config_name) + sagemaker_session.create_endpoint_config_from_existing.assert_called_with( + existing_endpoint_config_name, + new_endpoint_config_name, + new_tags=None, + new_kms_key=None, + new_data_capture_config_dict=None, + new_production_variants=None, + ) + sagemaker_session.update_endpoint.assert_called_with( + ENDPOINT, new_endpoint_config_name, wait=True + ) + + +@patch("sagemaker.predictor.production_variant") +@patch("sagemaker.predictor.name_from_base") +def test_update_endpoint_all_args(name_from_base, production_variant): + new_endpoint_config_name = "new-endpoint-config" + name_from_base.return_value = new_endpoint_config_name + + sagemaker_session = empty_sagemaker_session() + existing_endpoint_config_name = "existing-endpoint-config" + + predictor = Predictor(ENDPOINT, sagemaker_session=sagemaker_session) + predictor._endpoint_config_name = existing_endpoint_config_name + + new_instance_count = 2 + new_instance_type = "ml.c4.xlarge" + new_accelerator_type = "ml.eia1.medium" + new_model_name = "new-model" + new_tags = {"Key": "foo", "Value": "bar"} + new_kms_key = "new-key" + 
new_data_capture_config_dict = {} + + predictor.update_endpoint( + initial_instance_count=new_instance_count, + instance_type=new_instance_type, + accelerator_type=new_accelerator_type, + model_name=new_model_name, + tags=new_tags, + kms_key=new_kms_key, + data_capture_config_dict=new_data_capture_config_dict, + wait=False, + ) + + assert [new_model_name] == predictor._model_names + assert new_endpoint_config_name == predictor._endpoint_config_name + + production_variant.assert_called_with( + new_model_name, + new_instance_type, + initial_instance_count=new_instance_count, + accelerator_type=new_accelerator_type, + ) + sagemaker_session.create_endpoint_config_from_existing.assert_called_with( + existing_endpoint_config_name, + new_endpoint_config_name, + new_tags=new_tags, + new_kms_key=new_kms_key, + new_data_capture_config_dict=new_data_capture_config_dict, + new_production_variants=[production_variant.return_value], + ) + sagemaker_session.update_endpoint.assert_called_with( + ENDPOINT, new_endpoint_config_name, wait=False + ) + + +@patch("sagemaker.predictor.production_variant") +@patch("sagemaker.predictor.name_from_base") +def test_update_endpoint_instance_type_and_count(name_from_base, production_variant): + new_endpoint_config_name = "new-endpoint-config" + name_from_base.return_value = new_endpoint_config_name + + sagemaker_session = empty_sagemaker_session() + existing_endpoint_config_name = "existing-endpoint-config" + existing_model_name = "existing-model" + + predictor = Predictor(ENDPOINT, sagemaker_session=sagemaker_session) + predictor._endpoint_config_name = existing_endpoint_config_name + predictor._model_names = [existing_model_name] + + new_instance_count = 2 + new_instance_type = "ml.c4.xlarge" + + predictor.update_endpoint( + initial_instance_count=new_instance_count, instance_type=new_instance_type, + ) + + assert [existing_model_name] == predictor._model_names + assert new_endpoint_config_name == predictor._endpoint_config_name + + production_variant.assert_called_with( + existing_model_name, + new_instance_type, + initial_instance_count=new_instance_count, + accelerator_type=None, + ) + sagemaker_session.create_endpoint_config_from_existing.assert_called_with( + existing_endpoint_config_name, + new_endpoint_config_name, + new_tags=None, + new_kms_key=None, + new_data_capture_config_dict=None, + new_production_variants=[production_variant.return_value], + ) + sagemaker_session.update_endpoint.assert_called_with( + ENDPOINT, new_endpoint_config_name, wait=True + ) + + +def test_update_endpoint_no_instance_type_or_no_instance_count(): + sagemaker_session = empty_sagemaker_session() + predictor = Predictor(ENDPOINT, sagemaker_session=sagemaker_session) + + bad_args = ({"instance_type": "ml.c4.xlarge"}, {"initial_instance_count": 2}) + for args in bad_args: + with pytest.raises(ValueError) as exception: + predictor.update_endpoint(**args) + + expected_msg = "Missing initial_instance_count and/or instance_type." 
+ assert expected_msg in str(exception.value) + + +def test_update_endpoint_no_one_default_model_name_with_instance_type_and_count(): + sagemaker_session = empty_sagemaker_session() + predictor = Predictor(ENDPOINT, sagemaker_session=sagemaker_session) + + with pytest.raises(ValueError) as exception: + predictor.update_endpoint(initial_instance_count=2, instance_type="ml.c4.xlarge") + + assert "Unable to choose a default model for a new EndpointConfig" in str(exception.value) + + def test_delete_endpoint_with_config(): sagemaker_session = empty_sagemaker_session() sagemaker_session.sagemaker_client.describe_endpoint = Mock( From 7b88b6dd6da80fe066b6946228e53df48024ac94 Mon Sep 17 00:00:00 2001 From: Lauren Yu <6631887+laurenyu@users.noreply.github.com> Date: Wed, 1 Jul 2020 08:51:38 -0700 Subject: [PATCH 2/3] fix docstring --- src/sagemaker/predictor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/sagemaker/predictor.py b/src/sagemaker/predictor.py index af491fd55f..ef05acf418 100644 --- a/src/sagemaker/predictor.py +++ b/src/sagemaker/predictor.py @@ -172,22 +172,22 @@ def update_endpoint( This creates a new EndpointConfig in the process. If ``initial_instance_count``, ``instance_type``, ``accelerator_type``, or ``model_name`` is specified, then a new - ``ProductionVariant`` configuration is created; values from the existing configuration + ProductionVariant configuration is created; values from the existing configuration are not preserved if any of those parameters are specified. Args: initial_instance_count (int): The initial number of instances to run in the endpoint. This is required if ``instance_type``, ``accelerator_type``, or ``model_name`` is specified. Otherwise, the values from the existing endpoint configuration's - ``ProductionVariant``s are used. + ProductionVariants are used. instance_type (str): The EC2 instance type to deploy the endpoint to. This is required if ``initial_instance_count`` or ``accelerator_type`` is specified. Otherwise, the values from the existing endpoint configuration's - ``ProductionVariant``s are used. + ``ProductionVariants`` are used. accelerator_type (str): The type of Elastic Inference accelerator to attach to - the endpoint, e.g. 'ml.eia1.medium'. If not specified, and + the endpoint, e.g. "ml.eia1.medium". If not specified, and ``initial_instance_count``, ``instance_type``, and ``model_name`` are also ``None``, - the values from the existing endpoint configuration's ``ProductionVariant``s are + the values from the existing endpoint configuration's ``ProductionVariants`` are used. Otherwise, no Elastic Inference accelerator is attached to the endpoint. model_name (str): The name of the model to be associated with the endpoint. 
This is required if ``initial_instance_count``, ``instance_type``, or From c5a19b898801f17400807121e4b5d02aeda56301 Mon Sep 17 00:00:00 2001 From: Lauren Yu <6631887+laurenyu@users.noreply.github.com> Date: Wed, 1 Jul 2020 09:03:03 -0700 Subject: [PATCH 3/3] fix integ test --- tests/integ/test_inference_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integ/test_inference_pipeline.py b/tests/integ/test_inference_pipeline.py index cfac47ce12..71bfd0258f 100644 --- a/tests/integ/test_inference_pipeline.py +++ b/tests/integ/test_inference_pipeline.py @@ -178,6 +178,7 @@ def test_inference_pipeline_model_deploy_and_update_endpoint( model = PipelineModel( models=[sparkml_model, xgb_model], role="SageMakerRole", + predictor_cls=Predictor, sagemaker_session=sagemaker_session, ) predictor = model.deploy(1, alternative_cpu_instance_type, endpoint_name=endpoint_name)
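
For reviewers, a minimal usage sketch of the new API (illustrative only, not part of the patch; the endpoint name, instance type, tag, and KMS key values below are placeholders, and the example assumes the endpoint hosts a single model — otherwise ``model_name`` must be passed):

    from sagemaker.predictor import Predictor
    from sagemaker.session import Session

    # Attach to an existing endpoint (placeholder name).
    predictor = Predictor("my-existing-endpoint", sagemaker_session=Session())

    # Scale to two ml.c4.xlarge instances. Because instance arguments are passed,
    # a new ProductionVariant is built and the endpoint's existing model is reused.
    predictor.update_endpoint(initial_instance_count=2, instance_type="ml.c4.xlarge")

    # Change only the tags and KMS key: the existing ProductionVariants are kept,
    # and wait=False returns without blocking on the endpoint update.
    predictor.update_endpoint(
        tags=[{"Key": "team", "Value": "ml-platform"}],
        kms_key="arn:aws:kms:us-west-2:123456789012:key/placeholder",
        wait=False,
    )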