diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index a80788661a..1393f1169a 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,13 +2,17 @@ CHANGELOG
 =========
 
+1.18.5dev
+=========
+
+* feature: ``PipelineModel``: Create a Transformer from a PipelineModel
 
 1.18.4
 ======
 
 * doc-fix: Remove incorrect parameter for EI TFS Python README
 * feature: ``Predictor``: delete SageMaker model
-* feature: ``Pipeline``: delete SageMaker model
+* feature: ``PipelineModel``: delete SageMaker model
 * bug-fix: Estimator.attach works with training jobs without hyperparameters
 * doc-fix: remove duplicate content from mxnet/README.rst
 * doc-fix: move overview content in main README into sphinx project
diff --git a/README.rst b/README.rst
index 91d8bda716..3cbd7b2354 100644
--- a/README.rst
+++ b/README.rst
@@ -965,8 +965,9 @@ the ML Pipeline.
     endpoint_name = 'inference-pipeline-endpoint'
     sm_model = PipelineModel(name=model_name, role=sagemaker_role, models=[sparkml_model, xgb_model])
 
-This will define a ``PipelineModel`` consisting of SparkML model and an XGBoost model stacked sequentially. For more
-information about how to train an XGBoost model, please refer to the XGBoost notebook here_.
+This will define a ``PipelineModel`` consisting of a SparkML model and an XGBoost model stacked sequentially.
+
+For more information about how to train an XGBoost model, please refer to the XGBoost notebook here_.
 
 .. _here: https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html#xgboost-sample-notebooks
 
@@ -978,6 +979,37 @@ This returns a predictor the same way an ``Estimator`` does when ``deploy()`` is
 request using this predictor, you should pass the data that the first container expects and the predictor will return the
 output from the last container.
 
+You can also use a ``PipelineModel`` to create transform jobs for batch transformations. Using the same ``PipelineModel`` ``sm_model`` as above:
+
+.. code:: python
+
+    # Only instance_type and instance_count are required.
+    transformer = sm_model.transformer(instance_type='ml.c5.xlarge',
+                                       instance_count=1,
+                                       strategy='MultiRecord',
+                                       max_payload=6,
+                                       max_concurrent_transforms=8,
+                                       accept='text/csv',
+                                       assemble_with='Line',
+                                       output_path='s3://my-output-bucket/path/to/my/output/data/')
+    # Only data is required.
+    transformer.transform(data='s3://my-input-bucket/path/to/my/csv/data',
+                          content_type='text/csv',
+                          split_type='Line')
+    # Waits for the Pipeline Transform Job to finish.
+    transformer.wait()
+
+This runs a transform job against all the files under ``s3://my-input-bucket/path/to/my/csv/data``, transforming the input
+data in order with each model container in the pipeline. For each input file that was successfully transformed, one output file
+in ``s3://my-output-bucket/path/to/my/output/data/`` will be created with the same name, appended with '.out'.
+
+This transform job will split CSV files by newline separators, which is especially useful if the input files are large. The
+transform job will assemble the outputs with line separators when writing each input file's corresponding output file.
+
+Each payload entering the first model container will be up to six megabytes, and up to eight inference requests will be sent
+at the same time to the first model container. Since each payload will consist of a mini-batch of multiple CSV records, the
+model containers will transform each mini-batch of records.
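+
+As a minimal sketch of what happens next (the bucket, prefix, and file name below are hypothetical placeholders that
+follow the example paths above), you could download and inspect one of the transformed output files with boto3:
+
+.. code:: python
+
+    import boto3
+
+    s3 = boto3.client('s3')
+    # Each output object is named after its input file, with '.out' appended.
+    obj = s3.get_object(Bucket='my-output-bucket',
+                        Key='path/to/my/output/data/my_input_file.csv.out')
+    print(obj['Body'].read().decode('utf-8'))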
+
 For comprehensive examples on how to use Inference Pipelines, please refer to the following notebooks:
 
 - `inference_pipeline_sparkml_xgboost_abalone.ipynb `__
diff --git a/src/sagemaker/model.py b/src/sagemaker/model.py
index a28a1b3fbf..64076377ba 100644
--- a/src/sagemaker/model.py
+++ b/src/sagemaker/model.py
@@ -102,7 +102,9 @@ def _create_sagemaker_model(self, instance_type, accelerator_type=None):
         Args:
             instance_type (str): The EC2 instance type that this Model will be used for, this is only
                 used to determine if the image needs GPU support or not.
-            accelerator_type (str):
+            accelerator_type (str): Type of Elastic Inference accelerator to attach to an endpoint for model loading
+                and inference, for example, 'ml.eia1.medium'. If not specified, no Elastic Inference accelerator
+                will be attached to the endpoint.
         """
         container_def = self.prepare_container_def(instance_type, accelerator_type=accelerator_type)
         self.name = self.name or utils.name_from_image(container_def['Image'])
@@ -292,10 +294,6 @@ def transformer(self, instance_count, instance_type, strategy=None, assemble_wit
             max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
             tags (list[dict]): List of tags for labeling a transform job. If none specified, then the tags used for
                 the training job are used for the transform job.
-            role (str): The ``ExecutionRoleArn`` IAM Role ARN for the ``Model``, which is also used during
-                transform jobs. If not specified, the role from the Model will be used.
-            model_server_workers (int): Optional. The number of worker processes used by the inference server.
-                If None, server will use one worker per vCPU.
             volume_kms_key (str): Optional. KMS key ID for encrypting the volume attached to the ML compute
                 instance (default: None).
         """
diff --git a/src/sagemaker/pipeline.py b/src/sagemaker/pipeline.py
index 4d9d3cd19e..58f3b4b1a0 100644
--- a/src/sagemaker/pipeline.py
+++ b/src/sagemaker/pipeline.py
@@ -15,6 +15,7 @@
 import sagemaker
 from sagemaker.session import Session
 from sagemaker.utils import name_from_image
+from sagemaker.transformer import Transformer
 
 
 class PipelineModel(object):
@@ -104,6 +105,56 @@ def deploy(self, initial_instance_count, instance_type, endpoint_name=None, tags
         if self.predictor_cls:
             return self.predictor_cls(self.endpoint_name, self.sagemaker_session)
 
+    def _create_sagemaker_pipeline_model(self, instance_type):
+        """Create a SageMaker Model Entity.
+
+        Args:
+            instance_type (str): The EC2 instance type that this Model will be used for, this is only
+                used to determine if the image needs GPU support or not.
+        """
+        if not self.sagemaker_session:
+            self.sagemaker_session = Session()
+
+        containers = self.pipeline_container_def(instance_type)
+
+        self.name = self.name or name_from_image(containers[0]['Image'])
+        self.sagemaker_session.create_model(self.name, self.role, containers, vpc_config=self.vpc_config)
+
+    def transformer(self, instance_count, instance_type, strategy=None, assemble_with=None, output_path=None,
+                    output_kms_key=None, accept=None, env=None, max_concurrent_transforms=None,
+                    max_payload=None, tags=None, volume_kms_key=None):
+        """Return a ``Transformer`` that uses this Model.
+
+        Args:
+            instance_count (int): Number of EC2 instances to use.
+            instance_type (str): Type of EC2 instance to use, for example, 'ml.c4.xlarge'.
+            strategy (str): The strategy used to decide how to batch records in a single request (default: None).
+                Valid values: 'MultiRecord' and 'SingleRecord'.
+            assemble_with (str): How the output is assembled (default: None). Valid values: 'Line' or 'None'.
+            output_path (str): S3 location for saving the transform result. If not specified, results are stored to
+                a default bucket.
+            output_kms_key (str): Optional. KMS key ID for encrypting the transform output (default: None).
+            accept (str): The content type accepted by the endpoint deployed during the transform job.
+            env (dict): Environment variables to be set for use during the transform job (default: None).
+            max_concurrent_transforms (int): The maximum number of HTTP requests to be made to
+                each individual transform container at one time.
+            max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
+            tags (list[dict]): List of tags for labeling a transform job. If none specified, then the tags used for
+                the training job are used for the transform job.
+            volume_kms_key (str): Optional. KMS key ID for encrypting the volume attached to the ML
+                compute instance (default: None).
+        """
+        self._create_sagemaker_pipeline_model(instance_type)
+
+        return Transformer(self.name, instance_count, instance_type, strategy=strategy, assemble_with=assemble_with,
+                           output_path=output_path, output_kms_key=output_kms_key, accept=accept,
+                           max_concurrent_transforms=max_concurrent_transforms, max_payload=max_payload,
+                           env=env, tags=tags, base_transform_job_name=self.name,
+                           volume_kms_key=volume_kms_key, sagemaker_session=self.sagemaker_session)
+
     def delete_model(self):
         """Delete the SageMaker model backing this pipeline model. This does not delete the list of
         SageMaker models used in multiple containers to build the inference pipeline.
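For context on the new ``transformer()`` method above: ``_create_sagemaker_pipeline_model`` registers a single SageMaker
model whose ordered container definitions come from ``pipeline_container_def``. As a rough illustrative sketch (the image
URIs, S3 locations, and schema value below are hypothetical placeholders, not values from this patch), that list has the
following shape for a two-model pipeline:

.. code:: python

    # Shape of the `containers` list passed to sagemaker_session.create_model
    # for a SparkML -> XGBoost pipeline; all URIs are hypothetical placeholders.
    containers = [
        {'Image': '123456789012.dkr.ecr.us-east-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
         'ModelDataUrl': 's3://my-bucket/sparkml/mleap_model.tar.gz',
         'Environment': {'SAGEMAKER_SPARKML_SCHEMA': '...'}},
        {'Image': '123456789012.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest',
         'ModelDataUrl': 's3://my-bucket/xgboost/xgb_model.tar.gz',
         'Environment': {}},
    ]

The containers are applied in this order during both real-time inference and batch transform.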
diff --git a/tests/data/sparkml_xgboost_pipeline/invalid_input.csv b/tests/data/sparkml_xgboost_pipeline/invalid_input.csv
new file mode 100644
index 0000000000..01e289fc69
--- /dev/null
+++ b/tests/data/sparkml_xgboost_pipeline/invalid_input.csv
@@ -0,0 +1 @@
+1.0,28.0,C,38.0,71.5,1.0
diff --git a/tests/data/sparkml_xgboost_pipeline/valid_input.csv b/tests/data/sparkml_xgboost_pipeline/valid_input.csv
new file mode 100644
index 0000000000..d7867994cb
--- /dev/null
+++ b/tests/data/sparkml_xgboost_pipeline/valid_input.csv
@@ -0,0 +1 @@
+1.0,C,38.0,71.5,1.0,female
diff --git a/tests/integ/test_inference_pipeline.py b/tests/integ/test_inference_pipeline.py
index ec170798a5..bfd2f3bae2 100644
--- a/tests/integ/test_inference_pipeline.py
+++ b/tests/integ/test_inference_pipeline.py
@@ -16,8 +16,8 @@
 import os
 
 import pytest
 
-from tests.integ import DATA_DIR
-from tests.integ.timeout import timeout_and_delete_endpoint_by_name
+from tests.integ import DATA_DIR, TRANSFORM_DEFAULT_TIMEOUT_MINUTES
+from tests.integ.timeout import timeout_and_delete_endpoint_by_name, timeout_and_delete_model_with_transformer
 
 from sagemaker.amazon.amazon_estimator import get_image_uri
 from sagemaker.content_types import CONTENT_TYPE_CSV
@@ -27,6 +27,74 @@
 from sagemaker.sparkml.model import SparkMLModel
 from sagemaker.utils import sagemaker_timestamp
 
+SPARKML_DATA_PATH = os.path.join(DATA_DIR, 'sparkml_model')
+XGBOOST_DATA_PATH = os.path.join(DATA_DIR, 'xgboost_model')
+SPARKML_XGBOOST_DATA_DIR = 'sparkml_xgboost_pipeline'
+VALID_DATA_PATH = os.path.join(DATA_DIR, SPARKML_XGBOOST_DATA_DIR, 'valid_input.csv')
+INVALID_DATA_PATH = os.path.join(DATA_DIR, SPARKML_XGBOOST_DATA_DIR, 'invalid_input.csv')
+SCHEMA = json.dumps({
+    "input": [
+        {
+            "name": "Pclass",
+            "type": "float"
+        },
+        {
+            "name": "Embarked",
+            "type": "string"
+        },
+        {
+            "name": "Age",
+            "type": "float"
+        },
+        {
+            "name": "Fare",
+            "type": "float"
+        },
+        {
+            "name": "SibSp",
+            "type": "float"
+        },
+        {
+            "name": "Sex",
+            "type": "string"
+        }
+    ],
+    "output": {
+        "name": "features",
+        "struct": "vector",
+        "type": "double"
+    }
+})
+
+
+@pytest.mark.continuous_testing
+@pytest.mark.regional_testing
+def test_inference_pipeline_batch_transform(sagemaker_session):
+    sparkml_model_data = sagemaker_session.upload_data(
+        path=os.path.join(SPARKML_DATA_PATH, 'mleap_model.tar.gz'),
+        key_prefix='integ-test-data/sparkml/model')
+    xgb_model_data = sagemaker_session.upload_data(
+        path=os.path.join(XGBOOST_DATA_PATH, 'xgb_model.tar.gz'),
+        key_prefix='integ-test-data/xgboost/model')
+    batch_job_name = 'test-inference-pipeline-batch-{}'.format(sagemaker_timestamp())
+    sparkml_model = SparkMLModel(model_data=sparkml_model_data,
+                                 env={'SAGEMAKER_SPARKML_SCHEMA': SCHEMA},
+                                 sagemaker_session=sagemaker_session)
+    xgb_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost')
+    xgb_model = Model(model_data=xgb_model_data, image=xgb_image,
+                      sagemaker_session=sagemaker_session)
+    model = PipelineModel(models=[sparkml_model, xgb_model], role='SageMakerRole',
+                          sagemaker_session=sagemaker_session, name=batch_job_name)
+    transformer = model.transformer(1, 'ml.m4.xlarge')
+    transform_input_key_prefix = 'integ-test-data/sparkml_xgboost/transform'
+    transform_input = transformer.sagemaker_session.upload_data(path=VALID_DATA_PATH,
+                                                                key_prefix=transform_input_key_prefix)
+
+    with timeout_and_delete_model_with_transformer(transformer, sagemaker_session,
+                                                   minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES):
+        transformer.transform(transform_input, content_type=CONTENT_TYPE_CSV,
+                              job_name=batch_job_name)
+        transformer.wait()
+
 
 @pytest.mark.canary_quick
 @pytest.mark.regional_testing
@@ -40,42 +108,10 @@ def test_inference_pipeline_model_deploy(sagemaker_session):
     xgb_model_data = sagemaker_session.upload_data(
         path=os.path.join(xgboost_data_path, 'xgb_model.tar.gz'),
         key_prefix='integ-test-data/xgboost/model')
-    schema = json.dumps({
-        "input": [
-            {
-                "name": "Pclass",
-                "type": "float"
-            },
-            {
-                "name": "Embarked",
-                "type": "string"
-            },
-            {
-                "name": "Age",
-                "type": "float"
-            },
-            {
-                "name": "Fare",
-                "type": "float"
-            },
-            {
-                "name": "SibSp",
-                "type": "float"
-            },
-            {
-                "name": "Sex",
-                "type": "string"
-            }
-        ],
-        "output": {
-            "name": "features",
-            "struct": "vector",
-            "type": "double"
-        }
-    })
+
     with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
         sparkml_model = SparkMLModel(model_data=sparkml_model_data,
-                                     env={'SAGEMAKER_SPARKML_SCHEMA': schema},
+                                     env={'SAGEMAKER_SPARKML_SCHEMA': SCHEMA},
                                      sagemaker_session=sagemaker_session)
         xgb_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost')
         xgb_model = Model(model_data=xgb_model_data, image=xgb_image,
@@ -87,11 +123,13 @@
                                        serializer=json_serializer, content_type=CONTENT_TYPE_CSV,
                                        accept=CONTENT_TYPE_CSV)
-        valid_data = '1.0,C,38.0,71.5,1.0,female'
-        assert predictor.predict(valid_data) == "0.714013934135"
+        with open(VALID_DATA_PATH, 'r') as f:
+            valid_data = f.read()
+            assert predictor.predict(valid_data) == '0.714013934135'
 
-        invalid_data = "1.0,28.0,C,38.0,71.5,1.0"
-        assert (predictor.predict(invalid_data) is None)
+        with open(INVALID_DATA_PATH, 'r') as f:
+            invalid_data = f.read()
+            assert predictor.predict(invalid_data) is None
 
         model.delete_model()
         with pytest.raises(Exception) as exception:
diff --git a/tests/unit/test_pipeline_model.py b/tests/unit/test_pipeline_model.py
index d640aa3235..3fa48e8010 100644
--- a/tests/unit/test_pipeline_model.py
+++ b/tests/unit/test_pipeline_model.py
@@ -111,17 +111,58 @@ def test_deploy_endpoint_name(tfo, time, sagemaker_session):
     framework_model = DummyFrameworkModel(sagemaker_session)
     sparkml_model = SparkMLModel(model_data=MODEL_DATA_2, role=ROLE, sagemaker_session=sagemaker_session)
     model = PipelineModel(models=[framework_model, sparkml_model], role=ROLE, sagemaker_session=sagemaker_session)
-    model.deploy(endpoint_name='blah', instance_type=INSTANCE_TYPE, initial_instance_count=55)
+    model.deploy(instance_type=INSTANCE_TYPE, initial_instance_count=1)
     sagemaker_session.endpoint_from_production_variants.assert_called_with(
-        'blah',
+        'mi-1-2017-10-10-14-14-15',
         [{'InitialVariantWeight': 1,
           'ModelName': 'mi-1-2017-10-10-14-14-15',
           'InstanceType': INSTANCE_TYPE,
-          'InitialInstanceCount': 55,
+          'InitialInstanceCount': 1,
           'VariantName': 'AllTraffic'}],
         None)
 
 
+@patch('tarfile.open')
+@patch('time.strftime', return_value=TIMESTAMP)
+def test_transformer(tfo, time, sagemaker_session):
+    framework_model = DummyFrameworkModel(sagemaker_session)
+    sparkml_model = SparkMLModel(model_data=MODEL_DATA_2, role=ROLE, sagemaker_session=sagemaker_session)
+    model_name = 'ModelName'
+    model = PipelineModel(models=[framework_model, sparkml_model], role=ROLE, sagemaker_session=sagemaker_session,
+                          name=model_name)
+
+    instance_count = 55
+    strategy = 'MultiRecord'
+    assemble_with = 'Line'
+    output_path = "s3://output/path"
+    output_kms_key = "output:kms:key"
+    accept = "application/jsonlines"
+    env = {"my_key": "my_value"}
+    max_concurrent_transforms = 20
+    max_payload = 5
+    tags = [{"my_tag": "my_value"}]
[{"my_tag": "my_value"}] + volume_kms_key = "volume:kms:key" + transformer = model.transformer(instance_type=INSTANCE_TYPE, instance_count=instance_count, + strategy=strategy, assemble_with=assemble_with, output_path=output_path, + output_kms_key=output_kms_key, accept=accept, env=env, + max_concurrent_transforms=max_concurrent_transforms, + max_payload=max_payload, tags=tags, volume_kms_key=volume_kms_key + ) + assert transformer.instance_type == INSTANCE_TYPE + assert transformer.instance_count == instance_count + assert transformer.strategy == strategy + assert transformer.assemble_with == assemble_with + assert transformer.output_path == output_path + assert transformer.output_kms_key == output_kms_key + assert transformer.accept == accept + assert transformer.env == env + assert transformer.max_concurrent_transforms == max_concurrent_transforms + assert transformer.max_payload == max_payload + assert transformer.tags == tags + assert transformer.volume_kms_key == volume_kms_key + assert transformer.model_name == model_name + + @patch('tarfile.open') @patch('time.strftime', return_value=TIMESTAMP) def test_deploy_tags(tfo, time, sagemaker_session):