From 7759f1b481369da0bed21c0b2569633ca36947df Mon Sep 17 00:00:00 2001 From: sindhuso Date: Wed, 26 Apr 2023 22:39:47 -0700 Subject: [PATCH] feature: Partition support for DJLModel using SM Training job --- doc/frameworks/djl/using_djl.rst | 25 +++ src/sagemaker/djl_inference/model.py | 318 +++++++++++++++++++++++++-- tests/unit/test_djl_inference.py | 70 ++++++ 3 files changed, 390 insertions(+), 23 deletions(-) diff --git a/doc/frameworks/djl/using_djl.rst b/doc/frameworks/djl/using_djl.rst index e1f58183a9..217f5ed7dd 100644 --- a/doc/frameworks/djl/using_djl.rst +++ b/doc/frameworks/djl/using_djl.rst @@ -221,6 +221,31 @@ see the `DJL Serving Documentation on Python Mode. `_ +************************** +Ahead of time partitioning +************************** + +To optimize the deployment of large models that do not fit in a single GPU, the model’s tensor weights are partitioned at +runtime and each partition is loaded on an individual GPU. But runtime partitioning takes a significant amount of time and +memory on model loading. So, DJLModel offers an ahead of time partitioning capability for DeepSpeed and FasterTransformer +engines, which lets you partition your model weights and save them before deployment. HuggingFace does not support +tensor parallelism, so ahead of time partitioning cannot be done for it. In our experiment with the GPT-J model, loading +this model with partitioned checkpoints reduced the model loading time by 40%. + +The `partition` method invokes an Amazon SageMaker Training job to partition the model and upload those partitioned +checkpoints to an S3 bucket. You can either provide your desired S3 bucket to upload the partitioned checkpoints or they will be +uploaded to the default SageMaker S3 bucket. Please note that this S3 bucket will be remembered for deployment. When you +call the `deploy` method after partition, DJLServing downloads the partitioned model checkpoints directly from the uploaded +S3 URL, if available. + +.. 
code:: + + # partitions the model using Amazon SageMaker Training Job. + djl_model.partition("ml.g5.12xlarge") + + predictor = djl_model.deploy("ml.g5.12xlarge", + initial_instance_count=1) + *********************** SageMaker DJL Classes *********************** diff --git a/src/sagemaker/djl_inference/model.py b/src/sagemaker/djl_inference/model.py index b91851576e..632681ffbb 100644 --- a/src/sagemaker/djl_inference/model.py +++ b/src/sagemaker/djl_inference/model.py @@ -20,7 +20,7 @@ from json import JSONDecodeError from urllib.error import HTTPError, URLError from enum import Enum -from typing import Optional, Union, Dict, Any +from typing import Optional, Union, Dict, Any, List import sagemaker from sagemaker import s3, Predictor, image_uris, fw_utils @@ -31,6 +31,8 @@ from sagemaker.session import Session from sagemaker.utils import _tmpdir, _create_or_update_code_dir from sagemaker.workflow.entities import PipelineVariable +from sagemaker.estimator import Estimator +from sagemaker.s3 import S3Uploader logger = logging.getLogger("sagemaker") @@ -187,6 +189,53 @@ def _get_model_config_properties_from_hf(model_id: str): return model_config +def _create_estimator( + instance_type: str, + s3_output_uri: str, + image_uri: str, + role: str, + sagemaker_session: Optional[Session], + volume_size: int = 30, + vpc_config: Optional[ + Dict[ + str, + List[ + str, + ], + ] + ] = None, + volume_kms_key=None, + output_kms_key=None, + use_spot_instances: bool = False, + max_wait: int = None, + enable_network_isolation: bool = False, +): + """Placeholder docstring""" + + subnets = None + security_group_ids = None + if vpc_config: + subnets = vpc_config.get("Subnets") + security_group_ids = vpc_config.get("SecurityGroupIds") + + return Estimator( + image_uri=image_uri, + role=role, + instance_count=1, + instance_type=instance_type, + volume_size=volume_size, + volume_kms_key=volume_kms_key, + output_path=s3_output_uri, + output_kms_key=output_kms_key, + 
sagemaker_session=sagemaker_session, + subnets=subnets, + security_group_ids=security_group_ids, + use_spot_instances=use_spot_instances, + max_wait=max_wait, + enable_network_isolation=enable_network_isolation, + ) + + class DJLModel(FrameworkModel): """A DJL SageMaker ``Model`` that can be deployed to a SageMaker ``Endpoint``.""" @@ -351,6 +400,7 @@ def __init__( self.model_loading_timeout = model_loading_timeout self.prediction_timeout = prediction_timeout self.sagemaker_session = self.sagemaker_session or Session() + self.save_mp_checkpoint_path = None def package_for_edge(self, **_): """Not implemented. @@ -398,6 +448,95 @@ def right_size(self, **_): "DJLModels do not currently support Inference Recommendation Jobs" ) + def partition( + self, + instance_type: str, + s3_output_uri: str = None, + job_name: Optional[str] = None, + volume_kms_key: Optional[str] = None, + output_kms_key: Optional[str] = None, + use_spot_instances: bool = False, + max_wait: int = None, + enable_network_isolation: bool = False, + ): + """Partitions the model using SageMaker Training Job. This is a synchronous API call. + + Args: + instance_type (str): The EC2 instance type to partition this Model. + For example, 'ml.p4d.24xlarge'. + s3_output_uri (str): S3 location for saving the training result (model + artifacts and output files). If not specified, results are + stored to a default bucket. If the bucket with the specific name + does not exist, it will be created. + job_name (str): Training job name. If not specified, a unique training job + name will be created. + volume_kms_key (str): Optional. KMS key ID for encrypting EBS + volume attached to the training instance (default: None). + output_kms_key (str): Optional. KMS key ID for encrypting the + training output (default: None). + use_spot_instances (bool): Specifies whether to use SageMaker + Managed Spot instances for training. If enabled then the + ``max_wait`` arg should also be set. 
+ + More information: + https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html + (default: ``False``). + max_wait (int): Timeout in seconds waiting for spot training + job (default: None). After this amount of time Amazon + SageMaker will stop waiting for managed spot training job to + complete (default: None). + enable_network_isolation (bool): Specifies whether container will + run in network isolation mode (default: ``False``). Network + isolation mode restricts the container access to outside networks + (such as the Internet). The container does not make any inbound or + outbound network calls. Also known as Internet-free mode. + Returns: + None + """ + + if not self.image_uri: + region_name = self.sagemaker_session.boto_session.region_name + self.image_uri = self.serving_image_uri(region_name) + + deploy_key_prefix = fw_utils.model_code_key_prefix( + self.key_prefix, self.name, self.image_uri + ) + if s3_output_uri is None: + bucket = self.bucket or self.sagemaker_session.default_bucket() + s3_output_uri = f"s3://{bucket}/{deploy_key_prefix}" + else: + s3_output_uri = f"{s3_output_uri}/{deploy_key_prefix}" + + self.save_mp_checkpoint_path = f"{s3_output_uri}/aot-partitioned-checkpoints" + + container_def = self._upload_model_to_s3(upload_as_tar=False) + estimator = _create_estimator( + instance_type=instance_type, + s3_output_uri=s3_output_uri, + image_uri=self.image_uri, + role=self.role, + sagemaker_session=self.sagemaker_session, + vpc_config=self.vpc_config, + volume_kms_key=volume_kms_key, + output_kms_key=output_kms_key, + use_spot_instances=use_spot_instances, + max_wait=max_wait, + enable_network_isolation=enable_network_isolation, + ) + + # creates a training job to do partitions + estimator.fit( + inputs=container_def["ModelDataUrl"], + wait=True, + logs="All", + job_name=job_name, + experiment_config=None, + ) + + self.model_id = self.save_mp_checkpoint_path + # reset save_mp_checkpoint_path since partition is completed. 
+ self.save_mp_checkpoint_path = None + def deploy( self, instance_type, @@ -494,18 +633,8 @@ def deploy( container_startup_health_check_timeout=container_startup_health_check_timeout, ) - def prepare_container_def( - self, - instance_type=None, - accelerator_type=None, - serverless_inference_config=None, - ): # pylint: disable=unused-argument - """A container definition with framework configuration set in model environment variables. - - Returns: - dict[str, str]: A container definition object usable with the - CreateModel API. - """ + def _upload_model_to_s3(self, upload_as_tar: bool = True): + """Placeholder docstring""" if not self.image_uri: region_name = self.sagemaker_session.boto_session.region_name @@ -545,19 +674,44 @@ def prepare_container_def( self.key_prefix, self.name, self.image_uri ) bucket = self.bucket or self.sagemaker_session.default_bucket() - uploaded_code = fw_utils.tar_and_upload_dir( - self.sagemaker_session.boto_session, - bucket, - deploy_key_prefix, - self.entry_point, - directory=tmp_code_dir, - dependencies=self.dependencies, - kms_key=self.model_kms_key, - ) + if upload_as_tar: + uploaded_code = fw_utils.tar_and_upload_dir( + self.sagemaker_session.boto_session, + bucket, + deploy_key_prefix, + self.entry_point, + directory=tmp_code_dir, + dependencies=self.dependencies, + kms_key=self.model_kms_key, + ) + model_data_url = uploaded_code.s3_prefix + else: + key_prefix = f"{deploy_key_prefix}/aot-model" + model_data_url = S3Uploader.upload( + tmp_code_dir, + "s3://%s/%s" % (bucket, key_prefix), + self.model_kms_key, + self.sagemaker_session, + ) return sagemaker.container_def( - self.image_uri, model_data_url=uploaded_code.s3_prefix, env=environment + self.image_uri, model_data_url=model_data_url, env=environment ) + def prepare_container_def( + self, + instance_type=None, + accelerator_type=None, + serverless_inference_config=None, + ): # pylint: disable=unused-argument + """A container definition with framework configuration set in 
model environment variables. + + Returns: + dict[str, str]: A container definition object usable with the + CreateModel API. + """ + + return self._upload_model_to_s3(upload_as_tar=True) + def generate_serving_properties(self, serving_properties=None) -> Dict[str, str]: """Generates the DJL Serving configuration to use for the model. @@ -601,6 +755,8 @@ def generate_serving_properties(self, serving_properties=None) -> Dict[str, str] serving_properties["option.model_loading_timeout"] = self.model_loading_timeout if self.prediction_timeout: serving_properties["option.prediction_timeout"] = self.prediction_timeout + if self.save_mp_checkpoint_path: + serving_properties["option.save_mp_checkpoint_path"] = self.save_mp_checkpoint_path return serving_properties def serving_image_uri(self, region_name): @@ -716,6 +872,8 @@ def __init__( self.enable_cuda_graph = enable_cuda_graph self.triangular_masking = triangular_masking self.return_tuple = return_tuple + self.save_mp_checkpoint_path = None + self.checkpoint = None def generate_serving_properties(self, serving_properties=None) -> Dict[str, Any]: """Generates the DJL Serving configuration to use for the model. 
@@ -750,9 +908,72 @@ def generate_serving_properties(self, serving_properties=None) -> Dict[str, Any] serving_properties["option.triangular_masking"] = self.triangular_masking if self.return_tuple: serving_properties["option.return_tuple"] = self.return_tuple + if self.save_mp_checkpoint_path: + serving_properties["option.save_mp_checkpoint_path"] = self.save_mp_checkpoint_path + if self.checkpoint: + serving_properties["option.checkpoint"] = self.checkpoint return serving_properties + def partition( + self, + instance_type: str, + s3_output_uri: str = None, + job_name: Optional[str] = None, + volume_kms_key: Optional[str] = None, + output_kms_key: Optional[str] = None, + use_spot_instances: bool = False, + max_wait: int = None, + enable_network_isolation: bool = False, + ): + """Partitions the model using SageMaker Training Job. This is a synchronous API call. + + Args: + instance_type (str): The EC2 instance type to partition this Model. + For example, 'ml.p4d.24xlarge'. + s3_output_uri (str): S3 location for saving the training result (model + artifacts and output files). If not specified, results are + stored to a default bucket. If the bucket with the specific name + does not exist, it will be created. + job_name (str): Training job name. If not specified, a unique training job + name will be created. + volume_kms_key (str): Optional. KMS key ID for encrypting EBS + volume attached to the training instance (default: None). + output_kms_key (str): Optional. KMS key ID for encrypting the + training output (default: None). + use_spot_instances (bool): Specifies whether to use SageMaker + Managed Spot instances for training. If enabled then the + ``max_wait`` arg should also be set. + + More information: + https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html + (default: ``False``). + max_wait (int): Timeout in seconds waiting for spot training + job (default: None). 
After this amount of time Amazon + SageMaker will stop waiting for managed spot training job to + complete (default: None). + enable_network_isolation (bool): Specifies whether container will + run in network isolation mode (default: ``False``). Network + isolation mode restricts the container access to outside networks + (such as the Internet). The container does not make any inbound or + outbound network calls. Also known as Internet-free mode. + Returns: + None + """ + + super(DeepSpeedModel, self).partition( + instance_type, + s3_output_uri, + job_name, + volume_kms_key=volume_kms_key, + output_kms_key=output_kms_key, + use_spot_instances=use_spot_instances, + max_wait=max_wait, + enable_network_isolation=enable_network_isolation, + ) + + self.checkpoint = "ds_inference_config.json" + class HuggingFaceAccelerateModel(DJLModel): """A DJL Hugging Face SageMaker ``Model`` that can be deployed to a SageMaker ``Endpoint``.""" @@ -866,6 +1087,57 @@ def generate_serving_properties(self, serving_properties=None) -> Dict[str, str] serving_properties.pop("option.load_in_8bit", None) return serving_properties + def partition( + self, + instance_type: str, + s3_output_uri: str = None, + job_name: Optional[str] = None, + volume_kms_key: Optional[str] = None, + output_kms_key: Optional[str] = None, + use_spot_instances: bool = False, + max_wait: int = None, + enable_network_isolation: bool = False, + ): + """Partitions the model using SageMaker Training Job. This is a synchronous API call. + + Args: + instance_type (str): The EC2 instance type to partition this Model. + For example, 'ml.p4d.24xlarge'. + s3_output_uri (str): S3 location for saving the training result (model + artifacts and output files). If not specified, results are + stored to a default bucket. If the bucket with the specific name + does not exist, it will be created. + job_name (str): Training job name. If not specified, a unique training job + name will be created. + volume_kms_key (str): Optional. 
KMS key ID for encrypting EBS + volume attached to the training instance (default: None). + output_kms_key (str): Optional. KMS key ID for encrypting the + training output (default: None). + use_spot_instances (bool): Specifies whether to use SageMaker + Managed Spot instances for training. If enabled then the + ``max_wait`` arg should also be set. + + More information: + https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html + (default: ``False``). + max_wait (int): Timeout in seconds waiting for spot training + job (default: None). After this amount of time Amazon + SageMaker will stop waiting for managed spot training job to + complete (default: None). + enable_network_isolation (bool): Specifies whether container will + run in network isolation mode (default: ``False``). Network + isolation mode restricts the container access to outside networks + (such as the Internet). The container does not make any inbound or + outbound network calls. Also known as Internet-free mode. + Returns: + None + """ + + logger.warning( + "HuggingFace engine does not currently support tensor parallelism. 
" + "Hence ahead of time partitioning is skipped" + ) + class FasterTransformerModel(DJLModel): """A DJL FasterTransformer SageMaker ``Model`` diff --git a/tests/unit/test_djl_inference.py b/tests/unit/test_djl_inference.py index 06adea8e76..d2f12cc7a1 100644 --- a/tests/unit/test_djl_inference.py +++ b/tests/unit/test_djl_inference.py @@ -523,3 +523,73 @@ def test_deploy_model_no_local_code( mock_container_def.assert_called_once_with( IMAGE_URI, model_data_url="s3prefix", env=expected_env ) + + +@patch("sagemaker.image_uris.retrieve", return_value=IMAGE_URI) +@patch("shutil.rmtree") +@patch("sagemaker.utils.base_name_from_image") +@patch("tempfile.mkdtemp") +@patch("sagemaker.container_def") +@patch("sagemaker.utils._tmpdir") +@patch("sagemaker.utils._create_or_update_code_dir") +@patch("os.mkdir") +@patch("os.path.exists") +@patch("sagemaker.s3.S3Downloader.read_file") +@patch("sagemaker.s3.S3Downloader.list") +@patch("sagemaker.s3.S3Uploader.upload") +@patch("sagemaker.estimator.Estimator.fit") +@patch("sagemaker.fw_utils.model_code_key_prefix") +def test_partition( + mock_model_key_prefix, + mock_estimator_fit, + mock_upload, + mock_s3_list, + mock_read_file, + mock_path_exists, + mock_mkdir, + mock_create_code_dir, + mock_tmpdir, + mock_container_def, + mock_mktmp, + mock_name_from_base, + mock_shutil_rmtree, + mock_imguri_retrieve, + sagemaker_session, +): + mock_s3_list.return_value = [VALID_UNCOMPRESSED_MODEL_DATA + "/config.json"] + model_config = { + "model_type": "bloom", + "n_heads": 120, + } + mock_read_file.return_value = json.dumps(model_config) + model = DJLModel( + VALID_UNCOMPRESSED_MODEL_DATA, + ROLE, + sagemaker_session=sagemaker_session, + number_of_partitions=4, + data_type="fp16", + container_log_level=logging.DEBUG, + env=ENV, + ) + + assert model.image_uri is None + mock_path_exists.side_effect = [True, False, True] + mock_mktmp.return_value = "/tmp/dir" + expected_env = {"ENV_VAR": "env_value", "SERVING_OPTS": 
'"-Dai.djl.logging.level=debug"'} + mock_upload.return_value = "s3prefix" + + s3_output_uri = f"s3://{BUCKET}/partitions/" + mock_model_key_prefix.return_value = "s3prefix" + with patch("builtins.open", mock_open()) as fake_serving_properties: + model.partition(GPU_INSTANCE, s3_output_uri) + + mock_mktmp.assert_called_once_with(prefix="tmp", suffix="", dir=None) + mock_mkdir.assert_called() + assert fake_serving_properties.call_count == 2 + fake_serving_properties.assert_any_call("/tmp/dir/code/serving.properties", "w+") + fake_serving_properties.assert_any_call("/tmp/dir/code/serving.properties", "r") + mock_container_def.assert_called_once_with( + IMAGE_URI, model_data_url="s3prefix", env=expected_env + ) + + assert model.model_id == f"{s3_output_uri}/s3prefix/aot-partitioned-checkpoints"