
Commit 1787f78

andremoeller authored and laurenyu committed
Add transformer method to PipelineModel (aws#676)
This enables creating multi-container Transform jobs with PipelineModel without having already created a SageMaker multi-container Model.
1 parent 129bdb3 commit 1787f78
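
A rough sketch of the workflow this change enables (the model objects, role, and instance settings below are illustrative placeholders, not taken from the commit):

    from sagemaker.pipeline import PipelineModel

    # sparkml_model and xgb_model are assumed to be existing Model objects.
    pipeline_model = PipelineModel(name='my-pipeline', role='SageMakerRole',
                                   models=[sparkml_model, xgb_model])

    # transformer() creates the multi-container SageMaker Model on demand, so a prior
    # deploy() or manual model creation is no longer needed before a batch transform.
    transformer = pipeline_model.transformer(instance_count=1, instance_type='ml.m4.xlarge')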

File tree

8 files changed: +217 −51 lines changed

CHANGELOG.rst

Lines changed: 5 additions & 1 deletion

@@ -2,13 +2,17 @@
 CHANGELOG
 =========
 
+1.18.5dev
+=========
+
+* feature: ``PipelineModel``: Create a Transformer from a PipelineModel
 
 1.18.4
 ======
 
 * doc-fix: Remove incorrect parameter for EI TFS Python README
 * feature: ``Predictor``: delete SageMaker model
-* feature: ``Pipeline``: delete SageMaker model
+* feature: ``PipelineModel``: delete SageMaker model
 * bug-fix: Estimator.attach works with training jobs without hyperparameters
 * doc-fix: remove duplicate content from mxnet/README.rst
 * doc-fix: move overview content in main README into sphynx project

README.rst

Lines changed: 34 additions & 2 deletions

@@ -965,8 +965,9 @@ the ML Pipeline.
     endpoint_name = 'inference-pipeline-endpoint'
     sm_model = PipelineModel(name=model_name, role=sagemaker_role, models=[sparkml_model, xgb_model])
 
-This will define a ``PipelineModel`` consisting of SparkML model and an XGBoost model stacked sequentially. For more
-information about how to train an XGBoost model, please refer to the XGBoost notebook here_.
+This will define a ``PipelineModel`` consisting of a SparkML model and an XGBoost model stacked sequentially.
+
+For more information about how to train an XGBoost model, please refer to the XGBoost notebook here_.
 
 .. _here: https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html#xgboost-sample-notebooks
 
@@ -978,6 +979,37 @@ This returns a predictor the same way an ``Estimator`` does when ``deploy()`` is
 request using this predictor, you should pass the data that the first container expects and the predictor will return the
 output from the last container.
 
+You can also use a ``PipelineModel`` to create Transform Jobs for batch transformations. Using the same ``PipelineModel`` ``sm_model`` as above:
+
+.. code:: python
+
+    # Only instance_type and instance_count are required.
+    transformer = sm_model.transformer(instance_type='ml.c5.xlarge',
+                                       instance_count=1,
+                                       strategy='MultiRecord',
+                                       max_payload=6,
+                                       max_concurrent_transforms=8,
+                                       accept='text/csv',
+                                       assemble_with='Line',
+                                       output_path='s3://my-output-bucket/path/to/my/output/data/')
+    # Only data is required.
+    transformer.transform(data='s3://my-input-bucket/path/to/my/csv/data',
+                          content_type='text/csv',
+                          split_type='Line')
+    # Waits for the Pipeline Transform Job to finish.
+    transformer.wait()
+
+This runs a transform job against all the files under ``s3://my-input-bucket/path/to/my/csv/data``, transforming the input
+data in order with each model container in the pipeline. For each input file that was successfully transformed, one output file in ``s3://my-output-bucket/path/to/my/output/data/``
+will be created with the same name, appended with '.out'.
+
+This transform job will split CSV files by newline separators, which is especially useful if the input files are large. The transform job will
+assemble the outputs with line separators when writing each input file's corresponding output file.
+
+Each payload entering the first model container will be up to six megabytes, and up to eight inference requests will be sent at the
+same time to the first model container. Since each payload will consist of a mini-batch of multiple CSV records, the model
+containers will transform each mini-batch of records.
 
 For comprehensive examples on how to use Inference Pipelines please refer to the following notebooks:
 
 - `inference_pipeline_sparkml_xgboost_abalone.ipynb <https://github.com/awslabs/amazon-sagemaker-examples/blob/master/advanced_functionality/inference_pipeline_sparkml_xgboost_abalone/inference_pipeline_sparkml_xgboost_abalone.ipynb>`__

src/sagemaker/model.py

Lines changed: 3 additions & 5 deletions

@@ -102,7 +102,9 @@ def _create_sagemaker_model(self, instance_type, accelerator_type=None):
         Args:
             instance_type (str): The EC2 instance type that this Model will be used for, this is only
                 used to determine if the image needs GPU support or not.
-            accelerator_type (str): <put docs here>
+            accelerator_type (str): Type of Elastic Inference accelerator to attach to an endpoint for model loading
+                and inference, for example, 'ml.eia1.medium'. If not specified, no Elastic Inference accelerator
+                will be attached to the endpoint.
         """
         container_def = self.prepare_container_def(instance_type, accelerator_type=accelerator_type)
         self.name = self.name or utils.name_from_image(container_def['Image'])
@@ -292,10 +294,6 @@ def transformer(self, instance_count, instance_type, strategy=None, assemble_wit
             max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
             tags (list[dict]): List of tags for labeling a transform job. If none specified, then the tags used for
                 the training job are used for the transform job.
-            role (str): The ``ExecutionRoleArn`` IAM Role ARN for the ``Model``, which is also used during
-                transform jobs. If not specified, the role from the Model will be used.
-            model_server_workers (int): Optional. The number of worker processes used by the inference server.
-                If None, server will use one worker per vCPU.
             volume_kms_key (str): Optional. KMS key ID for encrypting the volume attached to the ML
                 compute instance (default: None).
         """

src/sagemaker/pipeline.py

Lines changed: 51 additions & 0 deletions

@@ -15,6 +15,7 @@
 import sagemaker
 from sagemaker.session import Session
 from sagemaker.utils import name_from_image
+from sagemaker.transformer import Transformer
 
 
 class PipelineModel(object):
@@ -104,6 +105,56 @@ def deploy(self, initial_instance_count, instance_type, endpoint_name=None, tags
         if self.predictor_cls:
             return self.predictor_cls(self.endpoint_name, self.sagemaker_session)
 
+    def _create_sagemaker_pipeline_model(self, instance_type):
+        """Create a SageMaker Model Entity
+
+        Args:
+            instance_type (str): The EC2 instance type that this Model will be used for, this is only
+                used to determine if the image needs GPU support or not.
+            accelerator_type (str): Type of Elastic Inference accelerator to attach to an endpoint for model loading
+                and inference, for example, 'ml.eia1.medium'. If not specified, no Elastic Inference accelerator
+                will be attached to the endpoint.
+        """
+        if not self.sagemaker_session:
+            self.sagemaker_session = Session()
+
+        containers = self.pipeline_container_def(instance_type)
+
+        self.name = self.name or name_from_image(containers[0]['Image'])
+        self.sagemaker_session.create_model(self.name, self.role, containers, vpc_config=self.vpc_config)
+
+    def transformer(self, instance_count, instance_type, strategy=None, assemble_with=None, output_path=None,
+                    output_kms_key=None, accept=None, env=None, max_concurrent_transforms=None,
+                    max_payload=None, tags=None, volume_kms_key=None):
+        """Return a ``Transformer`` that uses this Model.
+
+        Args:
+            instance_count (int): Number of EC2 instances to use.
+            instance_type (str): Type of EC2 instance to use, for example, 'ml.c4.xlarge'.
+            strategy (str): The strategy used to decide how to batch records in a single request (default: None).
+                Valid values: 'MULTI_RECORD' and 'SINGLE_RECORD'.
+            assemble_with (str): How the output is assembled (default: None). Valid values: 'Line' or 'None'.
+            output_path (str): S3 location for saving the transform result. If not specified, results are stored to
+                a default bucket.
+            output_kms_key (str): Optional. KMS key ID for encrypting the transform output (default: None).
+            accept (str): The content type accepted by the endpoint deployed during the transform job.
+            env (dict): Environment variables to be set for use during the transform job (default: None).
+            max_concurrent_transforms (int): The maximum number of HTTP requests to be made to
+                each individual transform container at one time.
+            max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
+            tags (list[dict]): List of tags for labeling a transform job. If none specified, then the tags used for
+                the training job are used for the transform job.
+            volume_kms_key (str): Optional. KMS key ID for encrypting the volume attached to the ML
+                compute instance (default: None).
+        """
+        self._create_sagemaker_pipeline_model(instance_type)
+
+        return Transformer(self.name, instance_count, instance_type, strategy=strategy, assemble_with=assemble_with,
+                           output_path=output_path, output_kms_key=output_kms_key, accept=accept,
+                           max_concurrent_transforms=max_concurrent_transforms, max_payload=max_payload,
+                           env=env, tags=tags, base_transform_job_name=self.name,
+                           volume_kms_key=volume_kms_key, sagemaker_session=self.sagemaker_session)
+
     def delete_model(self):
         """Delete the SageMaker model backing this pipeline model. This does not delete the list of SageMaker models used
         in multiple containers to build the inference pipeline.
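
A short sketch of how the new ``transformer()`` method pairs with the ``delete_model()`` shown above (the bucket path and content type are illustrative; ``pipeline_model`` is assumed to be an existing ``PipelineModel``):

    # transformer() implicitly creates the multi-container SageMaker Model.
    transformer = pipeline_model.transformer(instance_count=1, instance_type='ml.m4.xlarge')
    transformer.transform(data='s3://my-bucket/batch-input/', content_type='text/csv')
    transformer.wait()

    # delete_model() removes only the multi-container Model backing the pipeline;
    # the SageMaker models used for the individual containers are left untouched.
    pipeline_model.delete_model()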
tests/data/sparkml_xgboost_pipeline/invalid_input.csv

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+1.0,28.0,C,38.0,71.5,1.0
tests/data/sparkml_xgboost_pipeline/valid_input.csv

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+1.0,C,38.0,71.5,1.0,female

tests/integ/test_inference_pipeline.py

Lines changed: 78 additions & 40 deletions

@@ -16,8 +16,8 @@
 import os
 
 import pytest
-from tests.integ import DATA_DIR
-from tests.integ.timeout import timeout_and_delete_endpoint_by_name
+from tests.integ import DATA_DIR, TRANSFORM_DEFAULT_TIMEOUT_MINUTES
+from tests.integ.timeout import timeout_and_delete_endpoint_by_name, timeout_and_delete_model_with_transformer
 
 from sagemaker.amazon.amazon_estimator import get_image_uri
 from sagemaker.content_types import CONTENT_TYPE_CSV
@@ -27,6 +27,74 @@
 from sagemaker.sparkml.model import SparkMLModel
 from sagemaker.utils import sagemaker_timestamp
 
+SPARKML_DATA_PATH = os.path.join(DATA_DIR, 'sparkml_model')
+XGBOOST_DATA_PATH = os.path.join(DATA_DIR, 'xgboost_model')
+SPARKML_XGBOOST_DATA_DIR = 'sparkml_xgboost_pipeline'
+VALID_DATA_PATH = os.path.join(DATA_DIR, SPARKML_XGBOOST_DATA_DIR, 'valid_input.csv')
+INVALID_DATA_PATH = os.path.join(DATA_DIR, SPARKML_XGBOOST_DATA_DIR, 'invalid_input.csv')
+SCHEMA = json.dumps({
+    "input": [
+        {
+            "name": "Pclass",
+            "type": "float"
+        },
+        {
+            "name": "Embarked",
+            "type": "string"
+        },
+        {
+            "name": "Age",
+            "type": "float"
+        },
+        {
+            "name": "Fare",
+            "type": "float"
+        },
+        {
+            "name": "SibSp",
+            "type": "float"
+        },
+        {
+            "name": "Sex",
+            "type": "string"
+        }
+    ],
+    "output": {
+        "name": "features",
+        "struct": "vector",
+        "type": "double"
+    }
+})
+
+
+@pytest.mark.continuous_testing
+@pytest.mark.regional_testing
+def test_inference_pipeline_batch_transform(sagemaker_session):
+    sparkml_model_data = sagemaker_session.upload_data(
+        path=os.path.join(SPARKML_DATA_PATH, 'mleap_model.tar.gz'),
+        key_prefix='integ-test-data/sparkml/model')
+    xgb_model_data = sagemaker_session.upload_data(
+        path=os.path.join(XGBOOST_DATA_PATH, 'xgb_model.tar.gz'),
+        key_prefix='integ-test-data/xgboost/model')
+    batch_job_name = 'test-inference-pipeline-batch-{}'.format(sagemaker_timestamp())
+    sparkml_model = SparkMLModel(model_data=sparkml_model_data,
+                                 env={'SAGEMAKER_SPARKML_SCHEMA': SCHEMA},
+                                 sagemaker_session=sagemaker_session)
+    xgb_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost')
+    xgb_model = Model(model_data=xgb_model_data, image=xgb_image,
+                      sagemaker_session=sagemaker_session)
+    model = PipelineModel(models=[sparkml_model, xgb_model], role='SageMakerRole',
+                          sagemaker_session=sagemaker_session, name=batch_job_name)
+    transformer = model.transformer(1, 'ml.m4.xlarge')
+    transform_input_key_prefix = 'integ-test-data/sparkml_xgboost/transform'
+    transform_input = transformer.sagemaker_session.upload_data(path=VALID_DATA_PATH,
+                                                                key_prefix=transform_input_key_prefix)
+
+    with timeout_and_delete_model_with_transformer(transformer, sagemaker_session,
+                                                   minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES):
+        transformer.transform(transform_input, content_type=CONTENT_TYPE_CSV, job_name=batch_job_name)
+        transformer.wait()
 
 
 @pytest.mark.canary_quick
 @pytest.mark.regional_testing
@@ -40,42 +108,10 @@ def test_inference_pipeline_model_deploy(sagemaker_session):
     xgb_model_data = sagemaker_session.upload_data(
         path=os.path.join(xgboost_data_path, 'xgb_model.tar.gz'),
         key_prefix='integ-test-data/xgboost/model')
-    schema = json.dumps({
-        "input": [
-            {
-                "name": "Pclass",
-                "type": "float"
-            },
-            {
-                "name": "Embarked",
-                "type": "string"
-            },
-            {
-                "name": "Age",
-                "type": "float"
-            },
-            {
-                "name": "Fare",
-                "type": "float"
-            },
-            {
-                "name": "SibSp",
-                "type": "float"
-            },
-            {
-                "name": "Sex",
-                "type": "string"
-            }
-        ],
-        "output": {
-            "name": "features",
-            "struct": "vector",
-            "type": "double"
-        }
-    })
+
     with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
         sparkml_model = SparkMLModel(model_data=sparkml_model_data,
-                                     env={'SAGEMAKER_SPARKML_SCHEMA': schema},
+                                     env={'SAGEMAKER_SPARKML_SCHEMA': SCHEMA},
                                      sagemaker_session=sagemaker_session)
         xgb_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost')
         xgb_model = Model(model_data=xgb_model_data, image=xgb_image,
@@ -87,11 +123,13 @@ def test_inference_pipeline_model_deploy(sagemaker_session):
                                      serializer=json_serializer, content_type=CONTENT_TYPE_CSV,
                                      accept=CONTENT_TYPE_CSV)
 
-        valid_data = '1.0,C,38.0,71.5,1.0,female'
-        assert predictor.predict(valid_data) == "0.714013934135"
+        with open(VALID_DATA_PATH, 'r') as f:
+            valid_data = f.read()
+            assert predictor.predict(valid_data) == '0.714013934135'
 
-        invalid_data = "1.0,28.0,C,38.0,71.5,1.0"
-        assert (predictor.predict(invalid_data) is None)
+        with open(INVALID_DATA_PATH, 'r') as f:
+            invalid_data = f.read()
+            assert (predictor.predict(invalid_data) is None)
 
         model.delete_model()
         with pytest.raises(Exception) as exception:

tests/unit/test_pipeline_model.py

Lines changed: 44 additions & 3 deletions

@@ -111,17 +111,58 @@ def test_deploy_endpoint_name(tfo, time, sagemaker_session):
     framework_model = DummyFrameworkModel(sagemaker_session)
     sparkml_model = SparkMLModel(model_data=MODEL_DATA_2, role=ROLE, sagemaker_session=sagemaker_session)
     model = PipelineModel(models=[framework_model, sparkml_model], role=ROLE, sagemaker_session=sagemaker_session)
-    model.deploy(endpoint_name='blah', instance_type=INSTANCE_TYPE, initial_instance_count=55)
+    model.deploy(instance_type=INSTANCE_TYPE, initial_instance_count=1)
     sagemaker_session.endpoint_from_production_variants.assert_called_with(
-        'blah',
+        'mi-1-2017-10-10-14-14-15',
         [{'InitialVariantWeight': 1,
           'ModelName': 'mi-1-2017-10-10-14-14-15',
           'InstanceType': INSTANCE_TYPE,
-          'InitialInstanceCount': 55,
+          'InitialInstanceCount': 1,
           'VariantName': 'AllTraffic'}],
         None)
 
 
+@patch('tarfile.open')
+@patch('time.strftime', return_value=TIMESTAMP)
+def test_transformer(tfo, time, sagemaker_session):
+    framework_model = DummyFrameworkModel(sagemaker_session)
+    sparkml_model = SparkMLModel(model_data=MODEL_DATA_2, role=ROLE, sagemaker_session=sagemaker_session)
+    model_name = 'ModelName'
+    model = PipelineModel(models=[framework_model, sparkml_model], role=ROLE, sagemaker_session=sagemaker_session,
+                          name=model_name)
+
+    instance_count = 55
+    strategy = 'MultiRecord'
+    assemble_with = 'Line'
+    output_path = "s3://output/path"
+    output_kms_key = "output:kms:key"
+    accept = "application/jsonlines"
+    env = {"my_key": "my_value"}
+    max_concurrent_transforms = 20
+    max_payload = 5
+    tags = [{"my_tag": "my_value"}]
+    volume_kms_key = "volume:kms:key"
+    transformer = model.transformer(instance_type=INSTANCE_TYPE, instance_count=instance_count,
+                                    strategy=strategy, assemble_with=assemble_with, output_path=output_path,
+                                    output_kms_key=output_kms_key, accept=accept, env=env,
+                                    max_concurrent_transforms=max_concurrent_transforms,
+                                    max_payload=max_payload, tags=tags, volume_kms_key=volume_kms_key)
+    assert transformer.instance_type == INSTANCE_TYPE
+    assert transformer.instance_count == instance_count
+    assert transformer.strategy == strategy
+    assert transformer.assemble_with == assemble_with
+    assert transformer.output_path == output_path
+    assert transformer.output_kms_key == output_kms_key
+    assert transformer.accept == accept
+    assert transformer.env == env
+    assert transformer.max_concurrent_transforms == max_concurrent_transforms
+    assert transformer.max_payload == max_payload
+    assert transformer.tags == tags
+    assert transformer.volume_kms_key == volume_kms_key
+    assert transformer.model_name == model_name
+
+
 @patch('tarfile.open')
 @patch('time.strftime', return_value=TIMESTAMP)
 def test_deploy_tags(tfo, time, sagemaker_session):

0 commit comments
