diff --git a/src/sagemaker/image_uri_config/sagemaker-distribution.json b/src/sagemaker/image_uri_config/sagemaker-distribution.json new file mode 100644 index 0000000000..d9ffca5d7b --- /dev/null +++ b/src/sagemaker/image_uri_config/sagemaker-distribution.json @@ -0,0 +1,37 @@ +{ + "processors": ["cpu", "gpu"], + "scope": ["inference"], + "version_aliases": { + "3.0": "3.0.0" + }, + "versions": { + "3.0.0": { + "registries": { + "us-east-1": "885854791233", + "us-east-2": "137914896644", + "us-west-1": "053634841547", + "us-west-2": "542918446943", + "af-south-1": "238384257742", + "ap-east-1": "523751269255", + "ap-south-1": "245090515133", + "ap-northeast-2": "064688005998", + "ap-southeast-1": "022667117163", + "ap-southeast-2": "648430277019", + "ap-northeast-1": "010972774902", + "ca-central-1": "481561238223", + "eu-central-1": "545423591354", + "eu-west-1": "819792524951", + "eu-west-2": "021081402939", + "eu-west-3": "856416204555", + "eu-north-1": "175620155138", + "eu-south-1": "810671768855", + "sa-east-1": "567556641782", + "ap-northeast-3": "564864627153", + "ap-southeast-3": "370607712162", + "me-south-1": "523774347010", + "me-central-1": "358593528301" + }, + "repository": "sagemaker-distribution-prod" + } + } +} diff --git a/src/sagemaker/serve/builder/model_builder.py b/src/sagemaker/serve/builder/model_builder.py index 9122f22e44..ed5455daec 100644 --- a/src/sagemaker/serve/builder/model_builder.py +++ b/src/sagemaker/serve/builder/model_builder.py @@ -11,7 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. 
"""Holds the ModelBuilder class and the ModelServer enum.""" -from __future__ import absolute_import +from __future__ import absolute_import, annotations import importlib.util import json @@ -24,6 +24,7 @@ from pathlib import Path +from botocore.exceptions import ClientError from sagemaker_core.main.resources import TrainingJob from sagemaker.transformer import Transformer @@ -37,6 +38,7 @@ from sagemaker.s3 import S3Downloader from sagemaker import Session from sagemaker.model import Model +from sagemaker.jumpstart.model import JumpStartModel from sagemaker.base_predictor import PredictorBase from sagemaker.serializers import NumpySerializer, TorchTensorSerializer from sagemaker.deserializers import JSONDeserializer, TorchTensorDeserializer @@ -75,6 +77,7 @@ ) from sagemaker.serve.save_retrive.version_1_0_0.metadata.metadata import Metadata from sagemaker.serve.spec.inference_spec import InferenceSpec +from sagemaker.serve.spec.inference_base import CustomOrchestrator, AsyncCustomOrchestrator from sagemaker.serve.utils import task from sagemaker.serve.utils.exceptions import TaskNotFoundException from sagemaker.serve.utils.lineage_utils import _maintain_lineage_tracking_for_mlflow_model @@ -102,6 +105,7 @@ _get_model_base, ) from sagemaker.serve.model_server.torchserve.prepare import prepare_for_torchserve +from sagemaker.serve.model_server.smd.prepare import prepare_for_smd from sagemaker.serve.model_server.triton.triton_builder import Triton from sagemaker.serve.utils.telemetry_logger import _capture_telemetry from sagemaker.serve.utils.types import ModelServer, ModelHub @@ -131,6 +135,7 @@ ModelServer.MMS, ModelServer.TGI, ModelServer.TEI, + ModelServer.SMD, } @@ -220,6 +225,18 @@ class ModelBuilder(Triton, DJL, JumpStart, TGI, Transformers, TensorflowServing, available for providing s3 path to fine-tuned model artifacts. ``FINE_TUNING_JOB_NAME`` is available for providing fine-tuned job name. 
Both ``FINE_TUNING_MODEL_PATH`` and ``FINE_TUNING_JOB_NAME`` are mutually exclusive. + inference_component_name (Optional[str]): The name for an inference component + created from this ModelBuilder instance. This or ``resource_requirements`` must be set + to denote that this instance refers to an inference component. + modelbuilder_list: Optional[List[ModelBuilder]] = List of ModelBuilder objects which + can be built in bulk and subsequently deployed in bulk. Currently only supports + deployments for inference components. + resource_requirements: Optional[ResourceRequirements] = Defines the compute resources + allocated to run the model assigned to the inference component. This or + ``inference_component_name`` must be set to denote that this instance refers + to an inference component. If ``inference_component_name`` is set but this is not and a + JumpStart model ID is specified, pre-benchmarked deployment configs will attempt to be + retrieved for the model. """ model_path: Optional[str] = field( @@ -233,7 +250,7 @@ class ModelBuilder(Triton, DJL, JumpStart, TGI, Transformers, TensorflowServing, default=None, metadata={"help": "Define sagemaker session for execution"} ) name: Optional[str] = field( - default="model-name-" + uuid.uuid1().hex, + default_factory=lambda: "model-name-" + uuid.uuid1().hex, metadata={"help": "Define the model name"}, ) mode: Optional[Mode] = field( @@ -320,6 +337,23 @@ class ModelBuilder(Triton, DJL, JumpStart, TGI, Transformers, TensorflowServing, "in the Hub, Adding unsupported task types will throw an exception." }, ) + inference_component_name: Optional[str] = field( + default=None, + metadata={ + "help": "Defines the name for an Inference Component created from this ModelBuilder." 
+ }, + ) + modelbuilder_list: Optional[List[ModelBuilder]] = field( + default=None, + metadata={"help": "Defines a list of ModelBuilder objects."}, + ) + resource_requirements: Optional[ResourceRequirements] = field( + default=None, + metadata={ + "help": "Defines the compute resources allocated to run the model assigned" + " to the inference component." + }, + ) def _save_model_inference_spec(self): """Placeholder docstring""" @@ -465,7 +499,7 @@ def _get_client_translators(self): elif self.schema_builder: serializer = self.schema_builder.input_serializer else: - raise Exception("Cannot serialize") + raise Exception("Cannot serialize. Try providing a SchemaBuilder if not present.") deserializer = None if self.accept_type == "application/json": @@ -477,7 +511,7 @@ def _get_client_translators(self): elif self.schema_builder: deserializer = self.schema_builder.output_deserializer else: - raise Exception("Cannot deserialize") + raise Exception("Cannot deserialize. Try providing a SchemaBuilder if not present.") return serializer, deserializer @@ -562,6 +596,83 @@ def _model_builder_deploy_model_package_wrapper(self, *args, **kwargs): self.pysdk_model.model_package_arn = None return predictor + def _deploy_for_ic( + self, + *args, + ic_data: Dict[str, Any], + container_timeout_in_seconds: int = 300, + model_data_download_timeout: int = 3600, + instance_type: Optional[str] = None, + initial_instance_count: Optional[int] = None, + endpoint_name: Optional[str] = None, + **kwargs, + ) -> Predictor: + """Creates an Inference Component from a ModelBuilder.""" + ic_name = ic_data.get("Name", None) + model = ic_data.get("Model", None) + resource_requirements = ic_data.get("ResourceRequirements", {}) + + # Ensure resource requirements are set for non-JumpStart models + if not resource_requirements: + raise ValueError( + f"Cannot create/update inference component {ic_name} without resource requirements." 
+ ) + + # Check if the Inference Component exists + if ic_name and self._does_ic_exist(ic_name=ic_name): + logger.info("Updating Inference Component %s as it already exists.", ic_name) + + # Create spec for updating the IC + startup_parameters = {} + if model_data_download_timeout is not None: + startup_parameters["ModelDataDownloadTimeoutInSeconds"] = ( + model_data_download_timeout + ) + if container_timeout_in_seconds is not None: + startup_parameters["ContainerStartupHealthCheckTimeoutInSeconds"] = ( + container_timeout_in_seconds + ) + compute_rr = resource_requirements.get_compute_resource_requirements() + inference_component_spec = { + "ModelName": self.name, + "StartupParameters": startup_parameters, + "ComputeResourceRequirements": compute_rr, + } + runtime_config = {"CopyCount": resource_requirements.copy_count} + response = self.sagemaker_session.update_inference_component( + inference_component_name=ic_name, + specification=inference_component_spec, + runtime_config=runtime_config, + ) + return Predictor(endpoint_name=response.get("EndpointName"), component_name=ic_name) + else: + kwargs.update( + { + "resources": resource_requirements, + "endpoint_type": EndpointType.INFERENCE_COMPONENT_BASED, + "inference_component_name": ic_name, + "endpoint_logging": False, + } + ) + return model.deploy( + *args, + container_startup_health_check_timeout=container_timeout_in_seconds, + initial_instance_count=initial_instance_count, + instance_type=instance_type, + mode=Mode.SAGEMAKER_ENDPOINT, + endpoint_name=endpoint_name, + **kwargs, + ) + + def _does_ic_exist(self, ic_name: str) -> bool: + """Returns true if an Inference Component exists with the given name.""" + try: + self.sagemaker_session.describe_inference_component(inference_component_name=ic_name) + return True + except ClientError as e: + msg = e.response["Error"]["Message"] + return "Could not find inference component" not in msg + @_capture_telemetry("torchserve.deploy") def 
_model_builder_deploy_wrapper( self, @@ -615,6 +726,13 @@ def _model_builder_deploy_wrapper( if "endpoint_logging" not in kwargs: kwargs["endpoint_logging"] = True + + if "inference_component_name" not in kwargs and self.inference_component_name: + kwargs["inference_component_name"] = self.inference_component_name + + if "resources" not in kwargs and self.resource_requirements: + kwargs["resources"] = self.resource_requirements + kwargs.pop("mode", None) self.pysdk_model.role = kwargs.pop("role", self.pysdk_model.role) predictor = self._original_deploy( @@ -673,6 +791,24 @@ def _build_for_torchserve(self) -> Type[Model]: self.model = self._create_model() return self.model + def _build_for_smd(self) -> Type[Model]: + """Build the model for SageMaker Distribution""" + self._save_model_inference_spec() + + if self.mode != Mode.IN_PROCESS: + self._auto_detect_container() + + self.secret_key = prepare_for_smd( + model_path=self.model_path, + shared_libs=self.shared_libs, + dependencies=self.dependencies, + inference_spec=self.inference_spec, + ) + + self._prepare_for_mode() + self.model = self._create_model() + return self.model + def _user_agent_decorator(self, func): """Placeholder docstring""" @@ -854,13 +990,225 @@ def _collect_estimator_model_telemetry(self): """Dummy method to collect telemetry for estimator handshake""" return + def build( + self, + mode: Type[Mode] = None, + role_arn: str = None, + sagemaker_session: Optional[Session] = None, + ) -> Union[ModelBuilder, Type[Model]]: + """Creates deployable ``Model`` instances with all provided ``ModelBuilder`` objects. + + Args: + mode (Type[Mode], optional): The mode. Defaults to ``None``. + role_arn (str, optional): The IAM role arn. Defaults to ``None``. + sagemaker_session (Optional[Session]): Session object which manages interactions + with Amazon SageMaker APIs and any other AWS services needed. If not specified, the + function creates one using the default AWS configuration chain. 
+ + Returns: + Union[ModelBuilder, Type[Model]]: A deployable ``ModelBuilder`` object if multiple + ``ModelBuilders`` were built, or a deployable ``Model`` object. + """ + if role_arn: + self.role_arn = role_arn + self.sagemaker_session = sagemaker_session or self.sagemaker_session or Session() + + deployables = {} + + if not self.modelbuilder_list and not isinstance( + self.inference_spec, (CustomOrchestrator, AsyncCustomOrchestrator) + ): + self.serve_settings = self._get_serve_setting() + return self._build_single_modelbuilder( + mode=mode, + role_arn=self.role_arn, + sagemaker_session=sagemaker_session, + ) + + # Multi-ModelBuilder case: deploy + built_ic_models = [] + if self.modelbuilder_list: + logger.info("Detected ModelBuilders in modelbuilder_list.") + for mb in self.modelbuilder_list: + if mb.mode == Mode.IN_PROCESS or mb.mode == Mode.LOCAL_CONTAINER: + raise ValueError( + "Bulk ModelBuilder building is only supported for SageMaker Endpoint Mode." + ) + + if (not mb.resource_requirements and not mb.inference_component_name) and ( + not mb.inference_spec + or not isinstance( + mb.inference_spec, (CustomOrchestrator, AsyncCustomOrchestrator) + ) + ): + raise ValueError( + "Bulk ModelBuilder building is only supported for Inference Components " + + "and custom orchestrators." 
+ ) + + for mb in self.modelbuilder_list: + # Custom orchestrator definition found in inference_spec + mb.serve_settings = mb._get_serve_setting() + # Build for Inference Component + logger.info("Building ModelBuilder %s.", mb.name) + # Get JS deployment configs if ResourceRequirements not set + + mb = mb._get_ic_resource_requirements(mb=mb) + + built_model = mb._build_single_modelbuilder( + role_arn=self.role_arn, sagemaker_session=self.sagemaker_session + ) + built_ic_models.append( + { + "Name": mb.inference_component_name, + "ResourceRequirements": mb.resource_requirements, + "Model": built_model, + } + ) + logger.info( + "=====================Build for %s complete.===================", + mb.model, + ) + deployables["InferenceComponents"] = built_ic_models + + if isinstance(self.inference_spec, (CustomOrchestrator, AsyncCustomOrchestrator)): + logger.info("Building custom orchestrator.") + if self.mode == Mode.IN_PROCESS or self.mode == Mode.LOCAL_CONTAINER: + raise ValueError( + "Custom orchestrator deployment is only supported for" + "SageMaker Endpoint Mode." + ) + self.serve_settings = self._get_serve_setting() + cpu_or_gpu_instance = self._get_processing_unit() + self.image_uri = self._get_smd_image_uri(processing_unit=cpu_or_gpu_instance) + self.model_server = ModelServer.SMD + built_orchestrator = self._build_single_modelbuilder( + mode=Mode.SAGEMAKER_ENDPOINT, + role_arn=role_arn, + sagemaker_session=sagemaker_session, + ) + if not self.resource_requirements: + logger.info( + "Custom orchestrator resource_requirements not found. " + "Building as a SageMaker Endpoint instead of Inference Component." 
+ ) + deployables["CustomOrchestrator"] = { + "Mode": "Endpoint", + "Model": built_orchestrator, + } + else: + # Network isolation of ICs on an endpoint must be consistent + if built_ic_models: + if ( + self.dependencies["auto"] + or "requirements" in self.dependencies + or "custom" in self.dependencies + ): + logger.warning( + "Custom orchestrator network isolation must be False when dependencies " + "are specified or using autocapture. To enable network isolation, " + "package all dependencies in the container or model artifacts " + "ahead of time." + ) + built_orchestrator._enable_network_isolation = False + for model in built_ic_models: + model["Model"]._enable_network_isolation = False + deployables["CustomOrchestrator"] = { + "Name": self.inference_component_name, + "Mode": "InferenceComponent", + "ResourceRequirements": self.resource_requirements, + "Model": built_orchestrator, + } + + logger.info( + "=====================Custom orchestrator build complete.===================", + ) + + self._deployables = deployables + return self + + def _get_processing_unit(self): + """Detects if the resource requirements are intended for a CPU or GPU instance.""" + # Assume custom orchestrator will be deployed as an endpoint to a CPU instance + if not self.resource_requirements or not self.resource_requirements.num_accelerators: + return "cpu" + for ic in self.modelbuilder_list or []: + if ic.resource_requirements.num_accelerators > 0: + return "gpu" + if self.resource_requirements.num_accelerators > 0: + return "gpu" + + return "cpu" + + def _get_ic_resource_requirements(self, mb: ModelBuilder = None) -> ModelBuilder: + """Attempts fetching pre-benchmarked resource requirements for the MB from JumpStart.""" + if mb._is_jumpstart_model_id() and not mb.resource_requirements: + js_model = JumpStartModel(model_id=mb.model) + deployment_configs = js_model.list_deployment_configs() + if not deployment_configs: + raise ValueError( + "No resource requirements were provided for 
Inference Component " + f"{mb.inference_component_name} and no default deployment" + " configs were found in JumpStart." + ) + compute_requirements = ( + deployment_configs[0].get("DeploymentArgs").get("ComputeResourceRequirements") + ) + logger.info("Retrieved pre-benchmarked deployment configurations from JumpStart.") + mb.resource_requirements = ResourceRequirements( + requests={ + "memory": compute_requirements["MinMemoryRequiredInMb"], + "num_accelerators": compute_requirements.get( + "NumberOfAcceleratorDevicesRequired", None + ), + "copies": 1, + "num_cpus": compute_requirements.get("NumberOfCpuCoresRequired", None), + }, + limits={"memory": compute_requirements.get("MaxMemoryRequiredInMb", None)}, + ) + + return mb + + @_capture_telemetry("build_custom_orchestrator") + def _get_smd_image_uri(self, processing_unit: str = None) -> str: + """Gets the SMD Inference Image URI. + + Returns: + str: SMD Inference Image URI. + """ + from sagemaker import image_uris + import sys + + self.sagemaker_session = self.sagemaker_session or Session() + from packaging.version import Version + + formatted_py_version = f"py{sys.version_info.major}{sys.version_info.minor}" + if Version(f"{sys.version_info.major}{sys.version_info.minor}") < Version("3.12"): + raise ValueError( + f"Found Python version {formatted_py_version} but" + f"Custom orchestrator deployment requires Python version >= 3.12." + ) + + INSTANCE_TYPES = {"cpu": "ml.c5.xlarge", "gpu": "ml.g5.4xlarge"} + + logger.info("Finding SMD inference image URI for a %s instance.", processing_unit) + + smd_uri = image_uris.retrieve( + framework="sagemaker-distribution", + image_scope="inference", + instance_type=INSTANCE_TYPES[processing_unit], + region=self.sagemaker_session.boto_region_name, + ) + logger.info("Found compatible image %s", smd_uri) + return smd_uri + # Model Builder is a class to build the model for deployment. 
# It supports three modes of deployment # 1/ SageMaker Endpoint # 2/ Local launch with container # 3/ In process mode with Transformers server in beta release @_capture_telemetry("ModelBuilder.build") - def build( # pylint: disable=R0911 + def _build_single_modelbuilder( # pylint: disable=R0911 self, mode: Type[Mode] = None, role_arn: str = None, @@ -1039,6 +1387,9 @@ def _build_for_model_server(self): # pylint: disable=R0911, R1710 if self.model_server == ModelServer.MMS: return self._build_for_transformers() + if self.model_server == ModelServer.SMD: + return self._build_for_smd() + @_capture_telemetry("ModelBuilder.save") def save( self, @@ -1593,6 +1944,8 @@ def _optimize_prepare_for_hf(self): def deploy( self, endpoint_name: str = None, + container_timeout_in_second: int = 300, + instance_type: str = None, initial_instance_count: Optional[int] = 1, inference_config: Optional[ Union[ @@ -1603,7 +1956,10 @@ def deploy( ] ] = None, update_endpoint: Optional[bool] = False, - ) -> Union[Predictor, Transformer]: + custom_orchestrator_instance_type: str = None, + custom_orchestrator_initial_instance_count: int = None, + **kwargs, + ) -> Union[Predictor, Transformer, List[Predictor]]: """Deploys the built Model. Depending on the type of config provided, this function will call deployment accordingly. 
@@ -1625,42 +1981,43 @@ def deploy( Transformer for Batch Deployments Predictors for all others """ - if not hasattr(self, "built_model"): - raise ValueError("Model Needs to be built before deploying") + if not hasattr(self, "built_model") and not hasattr(self, "_deployables"): + raise ValueError("Model needs to be built before deploying") if not update_endpoint: endpoint_name = unique_name_from_base(endpoint_name) - if not inference_config: # Real-time Deployment - return self.built_model.deploy( - instance_type=self.instance_type, - initial_instance_count=initial_instance_count, - endpoint_name=endpoint_name, - update_endpoint=update_endpoint, - ) + if not hasattr(self, "_deployables"): + if not inference_config: # Real-time Deployment + return self.built_model.deploy( + instance_type=self.instance_type, + initial_instance_count=initial_instance_count, + endpoint_name=endpoint_name, + update_endpoint=update_endpoint, + ) - if isinstance(inference_config, ServerlessInferenceConfig): - return self.built_model.deploy( - serverless_inference_config=inference_config, - endpoint_name=endpoint_name, - update_endpoint=update_endpoint, - ) + if isinstance(inference_config, ServerlessInferenceConfig): + return self.built_model.deploy( + serverless_inference_config=inference_config, + endpoint_name=endpoint_name, + update_endpoint=update_endpoint, + ) - if isinstance(inference_config, AsyncInferenceConfig): - return self.built_model.deploy( - instance_type=self.instance_type, - initial_instance_count=initial_instance_count, - async_inference_config=inference_config, - endpoint_name=endpoint_name, - update_endpoint=update_endpoint, - ) + if isinstance(inference_config, AsyncInferenceConfig): + return self.built_model.deploy( + instance_type=self.instance_type, + initial_instance_count=initial_instance_count, + async_inference_config=inference_config, + endpoint_name=endpoint_name, + update_endpoint=update_endpoint, + ) - if isinstance(inference_config, 
BatchTransformInferenceConfig): - transformer = self.built_model.transformer( - instance_type=inference_config.instance_type, - output_path=inference_config.output_path, - instance_count=inference_config.instance_count, - ) - return transformer + if isinstance(inference_config, BatchTransformInferenceConfig): + transformer = self.built_model.transformer( + instance_type=inference_config.instance_type, + output_path=inference_config.output_path, + instance_count=inference_config.instance_count, + ) + return transformer if isinstance(inference_config, ResourceRequirements): if update_endpoint: @@ -1678,7 +2035,61 @@ def deploy( update_endpoint=update_endpoint, ) - raise ValueError("Deployment Options not supported") + raise ValueError("Deployment Options not supported") + + # Iterate through deployables for a custom orchestrator deployment. + # Create all Inference Components first before deploying custom orchestrator if present. + predictors = [] + for inference_component in self._deployables.get("InferenceComponents", []): + predictors.append( + self._deploy_for_ic( + ic_data=inference_component, + container_timeout_in_seconds=container_timeout_in_second, + instance_type=instance_type, + initial_instance_count=initial_instance_count, + endpoint_name=endpoint_name, + **kwargs, + ) + ) + if self._deployables.get("CustomOrchestrator", None): + custom_orchestrator = self._deployables.get("CustomOrchestrator") + if not custom_orchestrator_instance_type and not instance_type: + logger.warning( + "Deploying custom orchestrator as an endpoint but no instance type was " + "set. Defaulting to `ml.c5.xlarge`." 
+ ) + custom_orchestrator_instance_type = "ml.c5.xlarge" + custom_orchestrator_initial_instance_count = 1 + if custom_orchestrator["Mode"] == "Endpoint": + logger.info( + "Deploying custom orchestrator on instance type %s.", + custom_orchestrator_instance_type, + ) + predictors.append( + custom_orchestrator["Model"].deploy( + instance_type=custom_orchestrator_instance_type, + initial_instance_count=custom_orchestrator_initial_instance_count, + **kwargs, + ) + ) + elif custom_orchestrator["Mode"] == "InferenceComponent": + logger.info( + "Deploying custom orchestrator as an inference component " + f"to endpoint {endpoint_name}" + ) + predictors.append( + self._deploy_for_ic( + ic_data=custom_orchestrator, + container_timeout_in_seconds=container_timeout_in_second, + instance_type=custom_orchestrator_instance_type or instance_type, + initial_instance_count=custom_orchestrator_initial_instance_count + or initial_instance_count, + endpoint_name=endpoint_name, + **kwargs, + ) + ) + + return predictors def display_benchmark_metrics(self, **kwargs): """Display Markdown Benchmark Metrics for deployment configs.""" diff --git a/src/sagemaker/serve/mode/sagemaker_endpoint_mode.py b/src/sagemaker/serve/mode/sagemaker_endpoint_mode.py index 2f09d3d572..2b4473a706 100644 --- a/src/sagemaker/serve/mode/sagemaker_endpoint_mode.py +++ b/src/sagemaker/serve/mode/sagemaker_endpoint_mode.py @@ -16,10 +16,13 @@ from sagemaker.serve.model_server.djl_serving.server import SageMakerDjlServing from sagemaker.serve.model_server.tgi.server import SageMakerTgiServing from sagemaker.serve.model_server.multi_model_server.server import SageMakerMultiModelServer +from sagemaker.serve.model_server.smd.server import SageMakerSmdServer + logger = logging.getLogger(__name__) +# pylint: disable=R0901 class SageMakerEndpointMode( SageMakerTorchServe, SageMakerTritonServer, @@ -27,6 +30,7 @@ class SageMakerEndpointMode( SageMakerTgiServing, SageMakerMultiModelServer, SageMakerTensorflowServing, + 
SageMakerSmdServer, ): """Holds the required method to deploy a model to a SageMaker Endpoint""" @@ -144,6 +148,16 @@ def prepare( should_upload_artifacts=should_upload_artifacts, ) + if self.model_server == ModelServer.SMD: + upload_artifacts = self._upload_smd_artifacts( + model_path=model_path, + sagemaker_session=sagemaker_session, + secret_key=secret_key, + s3_model_data_url=s3_model_data_url, + image=image, + should_upload_artifacts=True, + ) + if upload_artifacts or isinstance(self.model_server, ModelServer): return upload_artifacts diff --git a/src/sagemaker/serve/model_server/smd/custom_execution_inference.py b/src/sagemaker/serve/model_server/smd/custom_execution_inference.py new file mode 100644 index 0000000000..f53677fc69 --- /dev/null +++ b/src/sagemaker/serve/model_server/smd/custom_execution_inference.py @@ -0,0 +1,72 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+"""This module is for SageMaker inference.py.""" + +from __future__ import absolute_import +import asyncio +import os +import platform +import cloudpickle +import logging +from pathlib import Path +from sagemaker.serve.validations.check_integrity import perform_integrity_check + +logger = LOGGER = logging.getLogger("sagemaker") + + +def initialize_custom_orchestrator(): + """Initializes the custom orchestrator.""" + code_dir = os.getenv("SAGEMAKER_INFERENCE_CODE_DIRECTORY", None) + serve_path = Path(code_dir).joinpath("serve.pkl") + with open(str(serve_path), mode="rb") as pkl_file: + return cloudpickle.load(pkl_file) + + +def _run_preflight_diagnostics(): + _py_vs_parity_check() + _pickle_file_integrity_check() + + +def _py_vs_parity_check(): + container_py_vs = platform.python_version() + local_py_vs = os.getenv("LOCAL_PYTHON") + + if not local_py_vs or container_py_vs.split(".")[1] != local_py_vs.split(".")[1]: + logger.warning( + f"The local python version {local_py_vs} differs from the python version " + f"{container_py_vs} on the container. Please align the two to avoid unexpected behavior" + ) + + +def _pickle_file_integrity_check(): + with open("/opt/ml/model/code/serve.pkl", "rb") as f: + buffer = f.read() + + metadata_path = Path("/opt/ml/model/code/metadata.json") + perform_integrity_check(buffer=buffer, metadata_path=metadata_path) + + +_run_preflight_diagnostics() +custom_orchestrator, _ = initialize_custom_orchestrator() + + +async def handler(request): + """Custom service entry point function. 
+ + :param request: raw input from request + :return: outputs to be send back to client + """ + if asyncio.iscoroutinefunction(custom_orchestrator.handle): + return await custom_orchestrator.handle(request.body) + else: + return custom_orchestrator.handle(request.body) diff --git a/src/sagemaker/serve/model_server/smd/prepare.py b/src/sagemaker/serve/model_server/smd/prepare.py new file mode 100644 index 0000000000..6461e4023f --- /dev/null +++ b/src/sagemaker/serve/model_server/smd/prepare.py @@ -0,0 +1,74 @@ +"""Summary of MyModule. + +Extended discussion of my module. +""" + +from __future__ import absolute_import +import os +from pathlib import Path +import shutil +from typing import List + +from sagemaker.serve.spec.inference_spec import InferenceSpec +from sagemaker.serve.detector.dependency_manager import capture_dependencies +from sagemaker.serve.validations.check_integrity import ( + generate_secret_key, + compute_hash, +) +from sagemaker.remote_function.core.serialization import _MetaData +from sagemaker.serve.spec.inference_base import CustomOrchestrator, AsyncCustomOrchestrator + + +def prepare_for_smd( + model_path: str, + shared_libs: List[str], + dependencies: dict, + inference_spec: InferenceSpec = None, +) -> str: + """Prepares artifacts for SageMaker model deployment. 
+ + Args:to + model_path (str) : Argument + shared_libs (List[]) : Argument + dependencies (dict) : Argument + inference_spec (InferenceSpec, optional) : Argument + (default is None) + + Returns: + ( str ) : + + """ + model_path = Path(model_path) + if not model_path.exists(): + model_path.mkdir() + elif not model_path.is_dir(): + raise Exception("model_dir is not a valid directory") + + if inference_spec and isinstance(inference_spec, InferenceSpec): + inference_spec.prepare(str(model_path)) + + code_dir = model_path.joinpath("code") + code_dir.mkdir(exist_ok=True) + + if inference_spec and isinstance(inference_spec, (CustomOrchestrator, AsyncCustomOrchestrator)): + shutil.copy2(Path(__file__).parent.joinpath("custom_execution_inference.py"), code_dir) + os.rename( + str(code_dir.joinpath("custom_execution_inference.py")), + str(code_dir.joinpath("inference.py")), + ) + + shared_libs_dir = model_path.joinpath("shared_libs") + shared_libs_dir.mkdir(exist_ok=True) + for shared_lib in shared_libs: + shutil.copy2(Path(shared_lib), shared_libs_dir) + + capture_dependencies(dependencies=dependencies, work_dir=code_dir) + + secret_key = generate_secret_key() + with open(str(code_dir.joinpath("serve.pkl")), "rb") as f: + buffer = f.read() + hash_value = compute_hash(buffer=buffer, secret_key=secret_key) + with open(str(code_dir.joinpath("metadata.json")), "wb") as metadata: + metadata.write(_MetaData(hash_value).to_json()) + + return secret_key diff --git a/src/sagemaker/serve/model_server/smd/server.py b/src/sagemaker/serve/model_server/smd/server.py new file mode 100644 index 0000000000..c700c39727 --- /dev/null +++ b/src/sagemaker/serve/model_server/smd/server.py @@ -0,0 +1,59 @@ +"""Module for SMD Server""" + +from __future__ import absolute_import + +import logging +import platform +from sagemaker.serve.utils.optimize_utils import _is_s3_uri +from sagemaker.session import Session +from sagemaker.s3_utils import determine_bucket_and_prefix, parse_s3_url +from 
sagemaker import fw_utils +from sagemaker.serve.utils.uploader import upload + +logger = logging.getLogger(__name__) + + +class SageMakerSmdServer: + """Placeholder docstring""" + + def _upload_smd_artifacts( + self, + model_path: str, + sagemaker_session: Session, + secret_key: str, + s3_model_data_url: str = None, + image: str = None, + should_upload_artifacts: bool = False, + ): + """Tar the model artifact and upload to S3 bucket, then prepare for the environment variables""" + s3_upload_path = None + if _is_s3_uri(model_path): + s3_upload_path = model_path + elif should_upload_artifacts: + if s3_model_data_url: + bucket, key_prefix = parse_s3_url(url=s3_model_data_url) + else: + bucket, key_prefix = None, None + + code_key_prefix = fw_utils.model_code_key_prefix(key_prefix, None, image) + + bucket, code_key_prefix = determine_bucket_and_prefix( + bucket=bucket, key_prefix=code_key_prefix, sagemaker_session=sagemaker_session + ) + + logger.debug( + "Uploading the model resources to bucket=%s, key_prefix=%s.", + bucket, + code_key_prefix, + ) + s3_upload_path = upload(sagemaker_session, model_path, bucket, code_key_prefix) + logger.debug("Model resources uploaded to: %s", s3_upload_path) + + env_vars = { + "SAGEMAKER_INFERENCE_CODE_DIRECTORY": "/opt/ml/model/code", + "SAGEMAKER_INFERENCE_CODE": "inference.handler", + "SAGEMAKER_REGION": sagemaker_session.boto_region_name, + "SAGEMAKER_SERVE_SECRET_KEY": secret_key, + "LOCAL_PYTHON": platform.python_version(), + } + return s3_upload_path, env_vars diff --git a/src/sagemaker/serve/spec/inference_base.py b/src/sagemaker/serve/spec/inference_base.py new file mode 100644 index 0000000000..23ea6cb01d --- /dev/null +++ b/src/sagemaker/serve/spec/inference_base.py @@ -0,0 +1,45 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. 
A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Holds templated classes to enable users to provide custom inference scripting capabilities""" +from __future__ import absolute_import +from abc import ABC, abstractmethod + + +class CustomOrchestrator(ABC): + """Templated class to standardize sync entrypoint-based inference scripts""" + + def __init__(self): + self._client = None + + @property + def client(self): + """Boto3 SageMaker runtime client to use with custom orchestrator""" + if not hasattr(self, "_client") or not self._client: + from boto3 import Session + + self._client = Session().client("sagemaker-runtime") + return self._client + + @abstractmethod + def handle(self, data, context=None): + """Abstract method defining the entrypoint for the model server""" + return NotImplemented + + +class AsyncCustomOrchestrator(ABC): + """Templated class to standardize async entrypoint-based inference scripts""" + + @abstractmethod + async def handle(self, data, context=None): + """Abstract method defining an asynchronous entrypoint for the model server""" + return NotImplemented diff --git a/src/sagemaker/serve/utils/telemetry_logger.py b/src/sagemaker/serve/utils/telemetry_logger.py index c02fe9bf78..6e7db9043b 100644 --- a/src/sagemaker/serve/utils/telemetry_logger.py +++ b/src/sagemaker/serve/utils/telemetry_logger.py @@ -64,6 +64,7 @@ str(ModelServer.TRITON): 5, str(ModelServer.TGI): 6, str(ModelServer.TEI): 7, + str(ModelServer.SMD): 8, } MLFLOW_MODEL_PATH_CODE = { diff --git a/src/sagemaker/serve/utils/types.py b/src/sagemaker/serve/utils/types.py index e50be62440..b405d85b21 100644 --- a/src/sagemaker/serve/utils/types.py +++ 
b/src/sagemaker/serve/utils/types.py @@ -19,6 +19,7 @@ def __str__(self): TRITON = 5 TGI = 6 TEI = 7 + SMD = 8 class HardwareType(Enum): diff --git a/tests/integ/sagemaker/serve/constants.py b/tests/integ/sagemaker/serve/constants.py index d5e7a56f83..3f25f6a575 100644 --- a/tests/integ/sagemaker/serve/constants.py +++ b/tests/integ/sagemaker/serve/constants.py @@ -25,6 +25,7 @@ PYTHON_VERSION_IS_NOT_38 = platform.python_version_tuple()[1] != "8" PYTHON_VERSION_IS_NOT_310 = platform.python_version_tuple()[1] != "10" +PYTHON_VERSION_IS_NOT_312 = platform.python_version_tuple()[1] != "12" XGB_RESOURCE_DIR = os.path.join(DATA_DIR, "serve_resources", "xgboost") PYTORCH_SQUEEZENET_RESOURCE_DIR = os.path.join(DATA_DIR, "serve_resources", "pytorch") diff --git a/tests/integ/sagemaker/serve/test_serve_model_builder_inference_component_happy.py b/tests/integ/sagemaker/serve/test_serve_model_builder_inference_component_happy.py new file mode 100644 index 0000000000..b72b84aeac --- /dev/null +++ b/tests/integ/sagemaker/serve/test_serve_model_builder_inference_component_happy.py @@ -0,0 +1,149 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+from __future__ import absolute_import + +import pytest +import tests.integ + +from botocore.exceptions import ClientError +from sagemaker.predictor import Predictor +from sagemaker.serve.builder.model_builder import ModelBuilder +from sagemaker.serve.builder.schema_builder import SchemaBuilder +from sagemaker.compute_resource_requirements.resource_requirements import ResourceRequirements +from sagemaker.utils import unique_name_from_base + +from tests.integ.sagemaker.serve.constants import ( + SERVE_SAGEMAKER_ENDPOINT_TIMEOUT, +) +from tests.integ.timeout import timeout +import logging + +logger = logging.getLogger(__name__) + +sample_input = {"inputs": "What are falcons?", "parameters": {"max_new_tokens": 32}} + +sample_output = [ + { + "generated_text": "Falcons are small to medium-sized birds of prey related to hawks and eagles." + } +] + +LLAMA_2_7B_JS_ID = "meta-textgeneration-llama-2-7b" +LLAMA_IC_NAME = "llama2-mb-ic" +INSTANCE_TYPE = "ml.g5.24xlarge" + + +@pytest.fixture +def model_builder_llama_inference_component(): + return ModelBuilder( + model=LLAMA_2_7B_JS_ID, + schema_builder=SchemaBuilder(sample_input, sample_output), + resource_requirements=ResourceRequirements( + requests={"memory": 98304, "num_accelerators": 4, "copies": 1, "num_cpus": 40} + ), + ) + + +@pytest.mark.skipif( + tests.integ.test_region() not in "us-west-2", + reason="G5 capacity available in PDX.", +) +def test_model_builder_ic_sagemaker_endpoint( + sagemaker_session, + model_builder_llama_inference_component, +): + logger.info("Running in SAGEMAKER_ENDPOINT mode...") + caught_ex = None + + model_builder_llama_inference_component.sagemaker_session = sagemaker_session + model_builder_llama_inference_component.instance_type = INSTANCE_TYPE + + model_builder_llama_inference_component.inference_component_name = unique_name_from_base( + LLAMA_IC_NAME + ) + + iam_client = sagemaker_session.boto_session.client("iam") + role_arn = 
iam_client.get_role(RoleName="SageMakerRole")["Role"]["Arn"] + + chain = ModelBuilder( + modelbuilder_list=[ + model_builder_llama_inference_component, + ], + role_arn=role_arn, + sagemaker_session=sagemaker_session, + ) + + chain.build() + + with timeout(minutes=SERVE_SAGEMAKER_ENDPOINT_TIMEOUT): + try: + logger.info("Deploying and predicting in SAGEMAKER_ENDPOINT mode...") + endpoint_name = "llama-ic-endpoint-name" + predictors = chain.deploy( + instance_type=INSTANCE_TYPE, + initial_instance_count=1, + accept_eula=True, + endpoint_name=endpoint_name, + ) + logger.info("Inference components successfully deployed.") + predictors[0].predict(sample_input) + assert len(predictors) == 1 + except Exception as e: + caught_ex = e + finally: + if caught_ex: + logger.exception(caught_ex) + cleanup_resources(sagemaker_session, [LLAMA_IC_NAME]) + assert False, f"{caught_ex} thrown when running mb-IC deployment test." + + cleanup_resources(sagemaker_session, [LLAMA_IC_NAME]) + + +def cleanup_resources(sagemaker_session, ic_base_names): + sm_client = sagemaker_session.sagemaker_client + + endpoint_names = set() + for ic_base_name in ic_base_names: + response = sm_client.list_inference_components( + NameContains=ic_base_name, StatusEquals="InService" + ) + ics = response["InferenceComponents"] + + logger.info(f"Cleaning up {len(ics)} ICs with base name {ic_base_name}.") + for ic in ics: + ic_name = ic["InferenceComponentName"] + ep_name = ic["EndpointName"] + + try: + logger.info(f"Deleting IC with name {ic_name}") + Predictor( + endpoint_name=ep_name, + component_name=ic_name, + sagemaker_session=sagemaker_session, + ).delete_predictor() + sagemaker_session.wait_for_inference_component_deletion( + inference_component_name=ic_name, + poll=10, + ) + endpoint_names.add(ep_name) + except ClientError as e: + logger.warning(e) + + for endpoint_name in endpoint_names: + logger.info(f"Deleting endpoint with name {endpoint_name}") + try: + Predictor( + endpoint_name=endpoint_name, 
sagemaker_session=sagemaker_session + ).delete_endpoint() + except ClientError as e: + logger.warning(e) diff --git a/tests/unit/sagemaker/image_uris/expected_uris.py b/tests/unit/sagemaker/image_uris/expected_uris.py index 01e4d4991f..eb198454fc 100644 --- a/tests/unit/sagemaker/image_uris/expected_uris.py +++ b/tests/unit/sagemaker/image_uris/expected_uris.py @@ -107,3 +107,12 @@ def base_python_uri(repo, account, region=REGION): domain = ALTERNATE_DOMAINS.get(region, DOMAIN) tag = "1.0" return IMAGE_URI_FORMAT.format(account, region, domain, repo, tag) + + +def sagemaker_distribution_uri(repo, account, tag, processor, region=REGION): + domain = ALTERNATE_DOMAINS.get(region, DOMAIN) + if processor == "cpu": + tag = f"{tag}-cpu" + else: + tag = f"{tag}-gpu" + return IMAGE_URI_FORMAT.format(account, region, domain, repo, tag) diff --git a/tests/unit/sagemaker/image_uris/test_sagemaker_distribution.py b/tests/unit/sagemaker/image_uris/test_sagemaker_distribution.py new file mode 100644 index 0000000000..d339a50b2e --- /dev/null +++ b/tests/unit/sagemaker/image_uris/test_sagemaker_distribution.py @@ -0,0 +1,47 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+from __future__ import absolute_import + +import pytest +from sagemaker import image_uris +from tests.unit.sagemaker.image_uris import expected_uris + +INSTANCE_TYPES = {"cpu": "ml.c4.xlarge", "gpu": "ml.p2.xlarge"} + + +def _test_ecr_uri(account, region, version, tag, instance_type, processor): + actual_uri = image_uris.retrieve( + "sagemaker-distribution", region=region, instance_type=instance_type, version=version + ) + expected_uri = expected_uris.sagemaker_distribution_uri( + "sagemaker-distribution-prod", account, tag, processor, region + ) + return expected_uri == actual_uri + + +@pytest.mark.parametrize("load_config", ["sagemaker-distribution.json"], indirect=True) +def test_sagemaker_distribution_ecr_uri(load_config): + VERSIONS = load_config["versions"] + processors = load_config["processors"] + for version in VERSIONS: + SAGEMAKER_DISTRIBUTION_ACCOUNTS = load_config["versions"][version]["registries"] + for region in SAGEMAKER_DISTRIBUTION_ACCOUNTS.keys(): + for processor in processors: + assert _test_ecr_uri( + account=SAGEMAKER_DISTRIBUTION_ACCOUNTS[region], + region=region, + version=version, + tag="3.0.0", + instance_type=INSTANCE_TYPES[processor], + processor=processor, + ) diff --git a/tests/unit/sagemaker/serve/builder/test_model_builder.py b/tests/unit/sagemaker/serve/builder/test_model_builder.py index 6661c6e2bf..de4304d63d 100644 --- a/tests/unit/sagemaker/serve/builder/test_model_builder.py +++ b/tests/unit/sagemaker/serve/builder/test_model_builder.py @@ -74,6 +74,7 @@ ModelServer.MMS, ModelServer.TGI, ModelServer.TEI, + ModelServer.SMD, } mock_session = MagicMock() @@ -2890,6 +2891,86 @@ def test_optimize_for_hf_without_custom_s3_path( }, ) + @patch("sagemaker.serve.builder.model_builder._ServeSettings") + @patch("sagemaker.serve.builder.model_builder.ModelBuilder._build_for_jumpstart") + @patch( + "sagemaker.serve.builder.model_builder.ModelBuilder._is_jumpstart_model_id", + return_value=True, + ) + @patch( + 
"sagemaker.serve.builder.jumpstart_builder.JumpStart._create_pre_trained_js_model", + return_value=MagicMock(), + ) + def test_build_multiple_inference_component_modelbuilders( + self, + mock_pre_trained_model, + mock_is_jumpstart_model_id, + mock_build_for_js, + mock_serve_settings, + ): + mock_setting_object = mock_serve_settings.return_value + mock_setting_object.role_arn = mock_role_arn + mock_setting_object.s3_model_data_url = mock_s3_model_data_url + + builder1 = ModelBuilder( + model="gpt_llm_burt", inference_component_name="ic1", resource_requirements=Mock() + ) + builder2 = ModelBuilder( + model="gpt_llm_burt", inference_component_name="ic2", resource_requirements=Mock() + ) + + builder3 = ModelBuilder( + model="gpt_llm_burt", inference_component_name="ic3", resource_requirements=Mock() + ) + + chain_builder = ModelBuilder( + modelbuilder_list=[builder1, builder2, builder3], + ) + chain_builder.build(sagemaker_session=mock_session) + assert mock_build_for_js.call_count == 3 + + @patch("sagemaker.serve.builder.model_builder._ServeSettings") + @patch("sagemaker.serve.builder.model_builder.ModelBuilder._build_for_jumpstart") + @patch( + "sagemaker.serve.builder.model_builder.ModelBuilder._is_jumpstart_model_id", + return_value=True, + ) + @patch( + "sagemaker.serve.builder.jumpstart_builder.JumpStart._create_pre_trained_js_model", + return_value=MagicMock(), + ) + @patch( + "sagemaker.serve.builder.model_builder.ModelBuilder._does_ic_exist", + return_value=True, + ) + @patch( + "sagemaker.session.Session.update_inference_component", + return_value=MagicMock(), + ) + def test_deploy_existing_inference_component_calls_update_inference_component( + self, + mock_update_inference_component, + mock_ic_exists, + mock_pre_trained_model, + mock_is_jumpstart_model_id, + mock_build_for_js, + mock_serve_settings, + ): + mock_setting_object = mock_serve_settings.return_value + mock_setting_object.role_arn = mock_role_arn + mock_setting_object.s3_model_data_url = 
mock_s3_model_data_url + + builder1 = ModelBuilder( + model="gpt_llm_burt", inference_component_name="ic1", resource_requirements=Mock() + ) + + chain_builder = ModelBuilder( + modelbuilder_list=[builder1], + ).build() + inputs = {"endpoint_name": "endpoint-001"} + chain_builder.deploy(**inputs) + assert mock_update_inference_component.call_count == 1 + def test_deploy_invalid_inputs(self): model_builder = ModelBuilder( model="meta-llama/Meta-Llama-3-8B-Instruct", @@ -2902,7 +2983,7 @@ def test_deploy_invalid_inputs(self): try: model_builder.deploy(**inputs) except ValueError as e: - assert "Model Needs to be built before deploying" in str(e) + assert "Model needs to be built before deploying" in str(e) @patch("sagemaker.serve.builder.model_builder.ModelBuilder._is_jumpstart_model_id") def test_display_benchmark_metrics_non_string_model(self, mock_is_jumpstart):