Adding transformer method to PipelineModel #676

Merged (7 commits) on Mar 11, 2019
Changes from 2 commits
3 changes: 2 additions & 1 deletion CHANGELOG.rst
@@ -8,11 +8,12 @@ CHANGELOG

* doc-fix: Remove incorrect parameter for EI TFS Python README
* feature: ``Predictor``: delete SageMaker model
* feature: ``Pipeline``: delete SageMaker model
* feature: ``PipelineModel``: delete SageMaker model
* bug-fix: Estimator.attach works with training jobs without hyperparameters
* doc-fix: remove duplicate content from mxnet/README.rst
* doc-fix: move overview content in main README into sphinx project
* bug-fix: pass accelerator_type in ``deploy`` for REST API TFS ``Model``
* feature: ``PipelineModel``: Create a Transformer from a PipelineModel
* doc-fix: move content from tf/README.rst into sphinx project
* doc-fix: Improve new developer experience in README

36 changes: 34 additions & 2 deletions README.rst
@@ -965,8 +965,9 @@ the ML Pipeline.
endpoint_name = 'inference-pipeline-endpoint'
sm_model = PipelineModel(name=model_name, role=sagemaker_role, models=[sparkml_model, xgb_model])

This will define a ``PipelineModel`` consisting of SparkML model and an XGBoost model stacked sequentially. For more
information about how to train an XGBoost model, please refer to the XGBoost notebook here_.
This will define a ``PipelineModel`` consisting of a SparkML model and an XGBoost model stacked sequentially.

For more information about how to train an XGBoost model, please refer to the XGBoost notebook here_.

.. _here: https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html#xgboost-sample-notebooks

@@ -978,6 +979,37 @@

This returns a predictor the same way an ``Estimator`` does when ``deploy()`` is called. Whenever you make an inference
request using this predictor, you should pass the data that the first container expects and the predictor will return the
output from the last container.
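
For illustration, here is a minimal sketch of deploying ``sm_model`` and sending one CSV record; the endpoint name is the one defined above, and the sample record is hypothetical:

.. code:: python

    from sagemaker.predictor import RealTimePredictor

    # Deploy the whole pipeline behind a single endpoint.
    sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge',
                    endpoint_name=endpoint_name)

    # Send data in the format the first container (SparkML) expects;
    # the response comes from the last container (XGBoost).
    predictor = RealTimePredictor(endpoint=endpoint_name, content_type='text/csv',
                                  sagemaker_session=sm_model.sagemaker_session)
    print(predictor.predict('1.0,C,38.0,71.5,1.0,female'))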

You can also use a ``PipelineModel`` to create transform jobs for batch transformations. Using the same ``sm_model`` as above:

.. code:: python

# Only instance_type and instance_count are required.
transformer = sm_model.transformer(instance_type='ml.c5.xlarge',
instance_count=1,
strategy='MultiRecord',
max_payload=6,
max_concurrent_transforms=8,
accept='text/csv',
assemble_with='Line',
output_path='s3://my-output-bucket/path/to/my/output/data/')
# Only data is required.
transformer.transform(data='s3://my-input-bucket/path/to/my/csv/data',
content_type='text/csv',
split_type='Line')
# Waits for the Pipeline Transform Job to finish.
transformer.wait()

This runs a transform job against all the files under ``s3://my-input-bucket/path/to/my/csv/data``, transforming the input
data in order with each model container in the pipeline. For each input file that was successfully transformed, one output file in ``s3://my-output-bucket/path/to/my/output/data/``
will be created with the same name, with '.out' appended.

This transform job will split CSV files by newline separators, which is especially useful if the input files are large. The transform job will
assemble the outputs with line separators when writing each input file's corresponding output file.
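
To make the split/assemble semantics concrete, here is a toy local sketch of what the service does conceptually; ``fake_model`` is a hypothetical stand-in, and none of this is SDK code:

.. code:: python

    # Toy illustration of split_type='Line' and assemble_with='Line' (not SDK code).
    def fake_model(record):
        """Hypothetical stand-in for the pipeline's per-record prediction."""
        return '0.5'

    input_file = '1.0,C,38.0,71.5,1.0,female\n2.0,S,26.0,7.925,0.0,male'
    records = input_file.split('\n')            # split_type='Line'
    outputs = [fake_model(r) for r in records]  # records sent for inference (in mini-batches)
    output_file = '\n'.join(outputs)            # assemble_with='Line'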

Each payload entering the first model container will be up to six megabytes, and up to eight inference requests will be sent at the
same time to the first model container. Since each payload will consist of a mini-batch of multiple CSV records, the model
containers will transform each mini-batch of records.
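
Once the job finishes, the results can be read straight from S3. A minimal boto3 sketch, reusing the illustrative bucket and prefix from the example above:

.. code:: python

    import boto3

    s3 = boto3.client('s3')

    # Each successfully transformed input file appears under the output
    # prefix with the same name plus a '.out' suffix (e.g. 'data.csv.out').
    response = s3.list_objects_v2(Bucket='my-output-bucket',
                                  Prefix='path/to/my/output/data/')
    for obj in response.get('Contents', []):
        print(obj['Key'])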

For comprehensive examples of how to use inference pipelines, please refer to the following notebooks:

- `inference_pipeline_sparkml_xgboost_abalone.ipynb <https://github.com/awslabs/amazon-sagemaker-examples/blob/master/advanced_functionality/inference_pipeline_sparkml_xgboost_abalone/inference_pipeline_sparkml_xgboost_abalone.ipynb>`__
8 changes: 3 additions & 5 deletions src/sagemaker/model.py
@@ -99,7 +99,9 @@ def _create_sagemaker_model(self, instance_type, accelerator_type=None):
Args:
instance_type (str): The EC2 instance type that this Model will be used for, this is only
used to determine if the image needs GPU support or not.
accelerator_type (str): <put docs here>
accelerator_type (str): Type of Elastic Inference accelerator to attach to an endpoint for model loading
and inference, for example, 'ml.eia1.medium'. If not specified, no Elastic Inference accelerator
will be attached to the endpoint.
"""
container_def = self.prepare_container_def(instance_type, accelerator_type=accelerator_type)
self.name = self.name or utils.name_from_image(container_def['Image'])
@@ -285,10 +287,6 @@ def transformer(self, instance_count, instance_type, strategy=None, assemble_with=None,
max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
tags (list[dict]): List of tags for labeling a transform job. If none specified, then the tags used for
the training job are used for the transform job.
role (str): The ``ExecutionRoleArn`` IAM Role ARN for the ``Model``, which is also used during
transform jobs. If not specified, the role from the Model will be used.
model_server_workers (int): Optional. The number of worker processes used by the inference server.
If None, server will use one worker per vCPU.
volume_kms_key (str): Optional. KMS key ID for encrypting the volume attached to the ML
compute instance (default: None).
"""
51 changes: 51 additions & 0 deletions src/sagemaker/pipeline.py
@@ -15,6 +15,7 @@
import sagemaker
from sagemaker.session import Session
from sagemaker.utils import name_from_image
from sagemaker.transformer import Transformer


class PipelineModel(object):
@@ -104,6 +105,56 @@ def deploy(self, initial_instance_count, instance_type, endpoint_name=None, tags=None):
if self.predictor_cls:
return self.predictor_cls(self.endpoint_name, self.sagemaker_session)

def _create_sagemaker_pipeline_model(self, instance_type):
"""Create a SageMaker Model Entity

Args:
instance_type (str): The EC2 instance type that this Model will be used for, this is only
used to determine if the image needs GPU support or not.
"""
if not self.sagemaker_session:
self.sagemaker_session = Session()

containers = self.pipeline_container_def(instance_type)

self.name = self.name or name_from_image(containers[0]['Image'])
self.sagemaker_session.create_model(self.name, self.role, containers, vpc_config=self.vpc_config)

def transformer(self, instance_count, instance_type, strategy=None, assemble_with=None, output_path=None,
output_kms_key=None, accept=None, env=None, max_concurrent_transforms=None,
max_payload=None, tags=None, volume_kms_key=None):
"""Return a ``Transformer`` that uses this Model.

Args:
instance_count (int): Number of EC2 instances to use.
instance_type (str): Type of EC2 instance to use, for example, 'ml.c4.xlarge'.
strategy (str): The strategy used to decide how to batch records in a single request (default: None).
Valid values: 'MULTI_RECORD' and 'SINGLE_RECORD'.
assemble_with (str): How the output is assembled (default: None). Valid values: 'Line' or 'None'.
output_path (str): S3 location for saving the transform result. If not specified, results are stored to
a default bucket.
output_kms_key (str): Optional. KMS key ID for encrypting the transform output (default: None).
accept (str): The content type accepted by the endpoint deployed during the transform job.
env (dict): Environment variables to be set for use during the transform job (default: None).
max_concurrent_transforms (int): The maximum number of HTTP requests to be made to
each individual transform container at one time.
max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
tags (list[dict]): List of tags for labeling a transform job. If none specified, then the tags used for
the training job are used for the transform job.
volume_kms_key (str): Optional. KMS key ID for encrypting the volume attached to the ML
compute instance (default: None).
"""
self._create_sagemaker_pipeline_model(instance_type)

return Transformer(self.name, instance_count, instance_type, strategy=strategy, assemble_with=assemble_with,
output_path=output_path, output_kms_key=output_kms_key, accept=accept,
max_concurrent_transforms=max_concurrent_transforms, max_payload=max_payload,
env=env, tags=tags, base_transform_job_name=self.name,
volume_kms_key=volume_kms_key, sagemaker_session=self.sagemaker_session)

def delete_model(self):
"""Delete the SageMaker model backing this pipeline model. This does not delete the list of SageMaker models used
in multiple containers to build the inference pipeline.
1 change: 1 addition & 0 deletions tests/data/sparkml_xgboost_pipeline/invalid_input.csv
@@ -0,0 +1 @@
1.0,28.0,C,38.0,71.5,1.0
1 change: 1 addition & 0 deletions tests/data/sparkml_xgboost_pipeline/valid_input.csv
@@ -0,0 +1 @@
1.0,C,38.0,71.5,1.0,female
118 changes: 78 additions & 40 deletions tests/integ/test_inference_pipeline.py
@@ -16,8 +16,8 @@
import os

import pytest
from tests.integ import DATA_DIR
from tests.integ.timeout import timeout_and_delete_endpoint_by_name
from tests.integ import DATA_DIR, TRANSFORM_DEFAULT_TIMEOUT_MINUTES
from tests.integ.timeout import timeout_and_delete_endpoint_by_name, timeout_and_delete_model_with_transformer

from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.content_types import CONTENT_TYPE_CSV
@@ -27,6 +27,74 @@
from sagemaker.sparkml.model import SparkMLModel
from sagemaker.utils import sagemaker_timestamp

SPARKML_DATA_PATH = os.path.join(DATA_DIR, 'sparkml_model')
XGBOOST_DATA_PATH = os.path.join(DATA_DIR, 'xgboost_model')
SPARKML_XGBOOST_DATA_DIR = 'sparkml_xgboost_pipeline'
VALID_DATA_PATH = os.path.join(DATA_DIR, SPARKML_XGBOOST_DATA_DIR, 'valid_input.csv')
INVALID_DATA_PATH = os.path.join(DATA_DIR, SPARKML_XGBOOST_DATA_DIR, 'invalid_input.csv')
SCHEMA = json.dumps({
"input": [
{
"name": "Pclass",
"type": "float"
},
{
"name": "Embarked",
"type": "string"
},
{
"name": "Age",
"type": "float"
},
{
"name": "Fare",
"type": "float"
},
{
"name": "SibSp",
"type": "float"
},
{
"name": "Sex",
"type": "string"
}
],
"output": {
"name": "features",
"struct": "vector",
"type": "double"
}
})


@pytest.mark.continuous_testing
@pytest.mark.regional_testing
def test_inference_pipeline_batch_transform(sagemaker_session):
sparkml_model_data = sagemaker_session.upload_data(
path=os.path.join(SPARKML_DATA_PATH, 'mleap_model.tar.gz'),
key_prefix='integ-test-data/sparkml/model')
xgb_model_data = sagemaker_session.upload_data(
path=os.path.join(XGBOOST_DATA_PATH, 'xgb_model.tar.gz'),
key_prefix='integ-test-data/xgboost/model')
batch_job_name = 'test-inference-pipeline-batch-{}'.format(sagemaker_timestamp())
sparkml_model = SparkMLModel(model_data=sparkml_model_data,
env={'SAGEMAKER_SPARKML_SCHEMA': SCHEMA},
sagemaker_session=sagemaker_session)
xgb_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost')
xgb_model = Model(model_data=xgb_model_data, image=xgb_image,
sagemaker_session=sagemaker_session)
model = PipelineModel(models=[sparkml_model, xgb_model], role='SageMakerRole',
sagemaker_session=sagemaker_session, name=batch_job_name)
transformer = model.transformer(1, 'ml.m4.xlarge')
transform_input_key_prefix = 'integ-test-data/sparkml_xgboost/transform'
transform_input = transformer.sagemaker_session.upload_data(path=VALID_DATA_PATH,
key_prefix=transform_input_key_prefix)

with timeout_and_delete_model_with_transformer(transformer, sagemaker_session,
minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES):
transformer.transform(transform_input, content_type=CONTENT_TYPE_CSV, job_name=batch_job_name)
transformer.wait()


@pytest.mark.canary_quick
@pytest.mark.regional_testing
@@ -40,42 +108,10 @@ def test_inference_pipeline_model_deploy(sagemaker_session):
xgb_model_data = sagemaker_session.upload_data(
path=os.path.join(xgboost_data_path, 'xgb_model.tar.gz'),
key_prefix='integ-test-data/xgboost/model')
schema = json.dumps({
"input": [
{
"name": "Pclass",
"type": "float"
},
{
"name": "Embarked",
"type": "string"
},
{
"name": "Age",
"type": "float"
},
{
"name": "Fare",
"type": "float"
},
{
"name": "SibSp",
"type": "float"
},
{
"name": "Sex",
"type": "string"
}
],
"output": {
"name": "features",
"struct": "vector",
"type": "double"
}
})

with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
sparkml_model = SparkMLModel(model_data=sparkml_model_data,
env={'SAGEMAKER_SPARKML_SCHEMA': schema},
env={'SAGEMAKER_SPARKML_SCHEMA': SCHEMA},
sagemaker_session=sagemaker_session)
xgb_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost')
xgb_model = Model(model_data=xgb_model_data, image=xgb_image,
Expand All @@ -87,11 +123,13 @@ def test_inference_pipeline_model_deploy(sagemaker_session):
serializer=json_serializer, content_type=CONTENT_TYPE_CSV,
accept=CONTENT_TYPE_CSV)

valid_data = '1.0,C,38.0,71.5,1.0,female'
assert predictor.predict(valid_data) == "0.714013934135"
with open(VALID_DATA_PATH, 'r') as f:
valid_data = f.read()
assert predictor.predict(valid_data) == "0.714013934135"

invalid_data = "1.0,28.0,C,38.0,71.5,1.0"
assert (predictor.predict(invalid_data) is None)
with open(INVALID_DATA_PATH, 'r') as f:
invalid_data = f.read()
assert (predictor.predict(invalid_data) is None)

model.delete_model()
with pytest.raises(Exception) as exception:
47 changes: 44 additions & 3 deletions tests/unit/test_pipeline_model.py
@@ -111,17 +111,58 @@ def test_deploy_endpoint_name(tfo, time, sagemaker_session):
framework_model = DummyFrameworkModel(sagemaker_session)
sparkml_model = SparkMLModel(model_data=MODEL_DATA_2, role=ROLE, sagemaker_session=sagemaker_session)
model = PipelineModel(models=[framework_model, sparkml_model], role=ROLE, sagemaker_session=sagemaker_session)
model.deploy(endpoint_name='blah', instance_type=INSTANCE_TYPE, initial_instance_count=55)
model.deploy(instance_type=INSTANCE_TYPE, initial_instance_count=1)
sagemaker_session.endpoint_from_production_variants.assert_called_with(
'blah',
'mi-1-2017-10-10-14-14-15',
[{'InitialVariantWeight': 1,
'ModelName': 'mi-1-2017-10-10-14-14-15',
'InstanceType': INSTANCE_TYPE,
'InitialInstanceCount': 55,
'InitialInstanceCount': 1,
'VariantName': 'AllTraffic'}],
None)


@patch('tarfile.open')
@patch('time.strftime', return_value=TIMESTAMP)
def test_transformer(tfo, time, sagemaker_session):
framework_model = DummyFrameworkModel(sagemaker_session)
sparkml_model = SparkMLModel(model_data=MODEL_DATA_2, role=ROLE, sagemaker_session=sagemaker_session)
model_name = 'ModelName'
model = PipelineModel(models=[framework_model, sparkml_model], role=ROLE, sagemaker_session=sagemaker_session,
name=model_name)

instance_count = 55
strategy = 'MultiRecord'
assemble_with = 'Line'
output_path = "s3://output/path"
output_kms_key = "output:kms:key"
accept = "application/jsonlines"
env = {"my_key": "my_value"}
max_concurrent_transforms = 20
max_payload = 5
tags = [{"my_tag": "my_value"}]
volume_kms_key = "volume:kms:key"
transformer = model.transformer(instance_type=INSTANCE_TYPE, instance_count=instance_count,
strategy=strategy, assemble_with=assemble_with, output_path=output_path,
output_kms_key=output_kms_key, accept=accept, env=env,
max_concurrent_transforms=max_concurrent_transforms,
max_payload=max_payload, tags=tags, volume_kms_key=volume_kms_key
)
assert transformer.instance_type == INSTANCE_TYPE
assert transformer.instance_count == instance_count
assert transformer.strategy == strategy
assert transformer.assemble_with == assemble_with
assert transformer.output_path == output_path
assert transformer.output_kms_key == output_kms_key
assert transformer.accept == accept
assert transformer.env == env
assert transformer.max_concurrent_transforms == max_concurrent_transforms
assert transformer.max_payload == max_payload
assert transformer.tags == tags
assert transformer.volume_kms_key == volume_kms_key
assert transformer.model_name == model_name


@patch('tarfile.open')
@patch('time.strftime', return_value=TIMESTAMP)
def test_deploy_tags(tfo, time, sagemaker_session):
Expand Down