Add support for Batch Transform and update README with TF Pipe Mode #298

Merged
merged 15 commits into from Jul 17, 2018
6 changes: 6 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,12 @@
CHANGELOG
=========

1.7.0
=====

* feature: Transformer: add support for batch transform jobs
* feature: Documentation: add instructions for using Pipe Mode with TensorFlow

1.6.1
=====

40 changes: 37 additions & 3 deletions README.rst
@@ -32,7 +32,8 @@ Table of Contents
7. `AWS SageMaker Estimators <#aws-sagemaker-estimators>`__
8. `BYO Docker Containers with SageMaker Estimators <#byo-docker-containers-with-sagemaker-estimators>`__
9. `SageMaker Automatic Model Tuning <#sagemaker-automatic-model-tuning>`__
10. `BYO Model <#byo-model>`__
10. `SageMaker Batch Transform <#sagemaker-batch-transform>`__
11. `BYO Model <#byo-model>`__


Getting SageMaker Python SDK
@@ -50,7 +51,7 @@ You can install from source by cloning this repository and issuing a pip install

git clone https://github.com/aws/sagemaker-python-sdk.git
python setup.py sdist
pip install dist/sagemaker-1.6.1.tar.gz
pip install dist/sagemaker-1.7.0.tar.gz

Supported Python versions
~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -375,6 +376,39 @@ For more detailed explanations of the classes that this library provides for aut
- `API docs for analytics classes <https://sagemaker.readthedocs.io/en/latest/analytics.html>`__


SageMaker Batch Transform
-------------------------

Once you have a trained model, you can use Amazon SageMaker Batch Transform to perform inferences with the model.
Batch Transform manages all necessary compute resources, including launching instances and deleting them after the job completes.
You can read more about SageMaker Batch Transform in the `AWS documentation <https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-batch.html>`__.

If you have trained the model using a SageMaker Python SDK Estimator, you can simply invoke ``transformer()`` to create a ``Transformer`` for the training job:

.. code:: python

transformer = estimator.transformer(instance_count=1, instance_type='ml.m4.xlarge')

Alternatively, if you already have a SageMaker Model, you can instantiate a ``Transformer`` directly with its constructor:

.. code:: python

transformer = Transformer(model_name='my-previously-trained-model',
instance_count=1,
instance_type='ml.m4.xlarge')

For a full list of the possible options to configure through either of these methods, please refer to the API docs for `Estimator <https://sagemaker.readthedocs.io/en/latest/estimators.html#sagemaker.estimator.Estimator.transformer>`__ or `Transformer <https://sagemaker.readthedocs.io/en/latest/transformer.html#sagemaker.transformer.Transformer>`__.

Once you've created a ``Transformer`` object, you can invoke ``transform()`` to begin a batch transform job with the S3 location of your data.
You can also specify other attributes about your data, such as the content type.

.. code:: python

transformer.transform('s3://my-bucket/batch-transform-input')

For more details about what can be specified here, please refer to the `API docs <https://sagemaker.readthedocs.io/en/latest/transformer.html#sagemaker.transformer.Transformer.transform>`__.
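As a small illustration of those data attributes, the same call can spell out the content type and record delimiter. This is a hedged sketch: ``content_type`` and ``split_type`` are the parameter names assumed here, and the actual call is left commented out since it requires AWS credentials — check the ``Transformer.transform`` API docs for the authoritative signature.

```python
# Hypothetical sketch: collecting transform() keyword arguments up front.
# Parameter names below are assumptions; verify them against the API docs.
transform_args = {
    'data': 's3://my-bucket/batch-transform-input',
    'content_type': 'text/csv',  # MIME type of the input objects
    'split_type': 'Line',        # treat each line as one record
}
# transformer.transform(**transform_args)
```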


FAQ
---

@@ -422,7 +456,7 @@ Example code using the TensorFlow predictor:


BYO Model
-----------------------------------------------
---------
You can also create an endpoint from an existing model rather than training one - i.e. bring your own model.

First, package the files for the trained model into a ``.tar.gz`` file, and upload the archive to S3.
1 change: 1 addition & 0 deletions doc/index.rst
@@ -16,6 +16,7 @@ The SageMaker Python SDK consists of a few primary interfaces:
estimators
tuner
predictors
transformer
session
model
analytics
7 changes: 7 additions & 0 deletions doc/transformer.rst
@@ -0,0 +1,7 @@
Transformer
-----------

.. autoclass:: sagemaker.transformer.Transformer
:members:
:undoc-members:
:show-inheritance:
2 changes: 1 addition & 1 deletion setup.py
@@ -23,7 +23,7 @@ def read(fname):


setup(name="sagemaker",
version="1.6.1",
version="1.7.0",
description="Open source library for training and deploying models on Amazon SageMaker.",
packages=find_packages('src'),
package_dir={'': 'src'},
91 changes: 86 additions & 5 deletions src/sagemaker/estimator.py
@@ -30,7 +30,8 @@
from sagemaker.predictor import RealTimePredictor
from sagemaker.session import Session
from sagemaker.session import s3_input
from sagemaker.utils import base_name_from_image, name_from_base, get_config_value
from sagemaker.transformer import Transformer
from sagemaker.utils import base_name_from_image, name_from_base, name_from_image, get_config_value


class EstimatorBase(with_metaclass(ABCMeta, object)):
@@ -253,8 +254,7 @@ def deploy(self, initial_instance_count, instance_type, endpoint_name=None, **kw
sagemaker.predictor.RealTimePredictor: A predictor that provides a ``predict()`` method,
which can be used to send requests to the Amazon SageMaker endpoint and obtain inferences.
"""
if not self.latest_training_job:
raise RuntimeError('Estimator has not been fit yet.')
self._ensure_latest_training_job()
endpoint_name = endpoint_name or self.latest_training_job.name
self.deploy_instance_type = instance_type
return self.create_model(**kwargs).deploy(
@@ -314,10 +314,43 @@ def delete_endpoint(self):
Raises:
ValueError: If the endpoint does not exist.
"""
if self.latest_training_job is None:
raise ValueError('Endpoint was not created yet')
self._ensure_latest_training_job(error_message='Endpoint was not created yet')
self.sagemaker_session.delete_endpoint(self.latest_training_job.name)

def transformer(self, instance_count, instance_type, strategy=None, assemble_with=None, output_path=None,
output_kms_key=None, accept=None, env=None, max_concurrent_transforms=None,
max_payload=None, tags=None):
"""Return a ``Transformer`` that uses a SageMaker Model based on the training job. It reuses the
SageMaker Session and base job name used by the Estimator.

Args:
instance_count (int): Number of EC2 instances to use.
instance_type (str): Type of EC2 instance to use, for example, 'ml.c4.xlarge'.
strategy (str): The strategy used to decide how to batch records in a single request (default: None).
Valid values: 'MULTI_RECORD' and 'SINGLE_RECORD'.
assemble_with (str): How the output is assembled (default: None). Valid values: 'Line' or 'None'.
output_path (str): S3 location for saving the transform result. If not specified, results are stored to
a default bucket.
output_kms_key (str): Optional. KMS key ID for encrypting the transform output (default: None).
accept (str): The content type accepted by the endpoint deployed during the transform job.
env (dict): Environment variables to be set for use during the transform job (default: None).
max_concurrent_transforms (int): The maximum number of HTTP requests to be made to
each individual transform container at one time.
max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
tags (list[dict]): List of tags for labeling a transform job. If none specified, then the tags used for
the training job are used for the transform job.
"""
self._ensure_latest_training_job()

model_name = self.sagemaker_session.create_model_from_job(self.latest_training_job.name)
tags = tags or self.tags

return Transformer(model_name, instance_count, instance_type, strategy=strategy, assemble_with=assemble_with,
output_path=output_path, output_kms_key=output_kms_key, accept=accept,
max_concurrent_transforms=max_concurrent_transforms, max_payload=max_payload,
env=env, tags=tags, base_transform_job_name=self.base_job_name,
sagemaker_session=self.sagemaker_session)

@property
def training_job_analytics(self):
"""Return a ``TrainingJobAnalytics`` object for the current training job.
@@ -326,6 +359,10 @@ def training_job_analytics(self):
raise ValueError('Estimator is not associated with a TrainingJob')
return TrainingJobAnalytics(self._current_job_name, sagemaker_session=self.sagemaker_session)

def _ensure_latest_training_job(self, error_message='Estimator is not associated with a training job'):
if self.latest_training_job is None:
raise ValueError(error_message)


class _TrainingJob(_Job):
def __init__(self, sagemaker_session, training_job_name):
@@ -698,6 +735,50 @@ def _update_init_params(cls, hp, tf_arguments):
updated_params[argument] = value
return updated_params

def transformer(self, instance_count, instance_type, strategy=None, assemble_with=None, output_path=None,
output_kms_key=None, accept=None, env=None, max_concurrent_transforms=None,
max_payload=None, tags=None, model_server_workers=None):
"""Return a ``Transformer`` that uses a SageMaker Model based on the training job. It reuses the
SageMaker Session and base job name used by the Estimator.

Args:
instance_count (int): Number of EC2 instances to use.
instance_type (str): Type of EC2 instance to use, for example, 'ml.c4.xlarge'.
strategy (str): The strategy used to decide how to batch records in a single request (default: None).
Valid values: 'MULTI_RECORD' and 'SINGLE_RECORD'.
assemble_with (str): How the output is assembled (default: None). Valid values: 'Line' or 'None'.
output_path (str): S3 location for saving the transform result. If not specified, results are stored to
a default bucket.
output_kms_key (str): Optional. KMS key ID for encrypting the transform output (default: None).
accept (str): The content type accepted by the endpoint deployed during the transform job.
env (dict): Environment variables to be set for use during the transform job (default: None).
max_concurrent_transforms (int): The maximum number of HTTP requests to be made to
each individual transform container at one time.
max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
tags (list[dict]): List of tags for labeling a transform job. If none specified, then the tags used for
the training job are used for the transform job.
model_server_workers (int): Optional. The number of worker processes used by the inference server.
If None, server will use one worker per vCPU.
"""
self._ensure_latest_training_job()

model = self.create_model(model_server_workers=model_server_workers)

container_def = model.prepare_container_def(instance_type)
model_name = model.name or name_from_image(container_def['Image'])
self.sagemaker_session.create_model(model_name, self.role, container_def)

transform_env = model.env.copy()
if env is not None:
transform_env.update(env)

tags = tags or self.tags
return Transformer(model_name, instance_count, instance_type, strategy=strategy, assemble_with=assemble_with,
output_path=output_path, output_kms_key=output_kms_key, accept=accept,
max_concurrent_transforms=max_concurrent_transforms, max_payload=max_payload,
env=transform_env, tags=tags, base_transform_job_name=self.base_job_name,
sagemaker_session=self.sagemaker_session)


def _s3_uri_prefix(channel_name, s3_data):
if isinstance(s3_data, s3_input):
9 changes: 3 additions & 6 deletions src/sagemaker/job.py
@@ -104,17 +104,14 @@ def _format_string_uri_input(input):
elif input.startswith('file://'):
return file_input(input)
else:
raise ValueError(
'Training input data must be a valid S3 or FILE URI: must start with "s3://" or '
'"file://"')
raise ValueError('Training input data must be a valid S3 or FILE URI: must start with "s3://" or '
'"file://"')
elif isinstance(input, s3_input):
return input
elif isinstance(input, file_input):
return input
else:
raise ValueError(
'Cannot format input {}. Expecting one of str, s3_input, or file_input'.format(
input))
raise ValueError('Cannot format input {}. Expecting one of str, s3_input, or file_input'.format(input))

@staticmethod
def _format_record_set_list_input(inputs):
88 changes: 87 additions & 1 deletion src/sagemaker/session.py
@@ -371,6 +371,52 @@ def stop_tuning_job(self, name):
LOGGER.error('Error occurred while attempting to stop tuning job: {}. Please try again.'.format(name))
raise

def transform(self, job_name, model_name, strategy, max_concurrent_transforms, max_payload, env,
input_config, output_config, resource_config, tags):
"""Create an Amazon SageMaker transform job.

Args:
job_name (str): Name of the transform job being created.
model_name (str): Name of the SageMaker model being used for the transform job.
strategy (str): The strategy used to decide how to batch records in a single request.
Possible values are 'MULTI_RECORD' and 'SINGLE_RECORD'.
max_concurrent_transforms (int): The maximum number of HTTP requests to be made to
each individual transform container at one time.
max_payload (int): Maximum size of the payload in a single HTTP request to the container in MB.
env (dict): Environment variables to be set for use during the transform job.
input_config (dict): A dictionary describing the input data (and its location) for the job.
output_config (dict): A dictionary describing the output location for the job.
resource_config (dict): A dictionary describing the resources to complete the job.
tags (list[dict]): List of tags for labeling a training job. For more, see
https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html.
"""
transform_request = {
'TransformJobName': job_name,
'ModelName': model_name,
'TransformInput': input_config,
'TransformOutput': output_config,
'TransformResources': resource_config,
}

if strategy is not None:
transform_request['BatchStrategy'] = strategy

if max_concurrent_transforms is not None:
transform_request['MaxConcurrentTransforms'] = max_concurrent_transforms

if max_payload is not None:
transform_request['MaxPayloadInMB'] = max_payload

if env is not None:
transform_request['Environment'] = env

if tags is not None:
transform_request['Tags'] = tags

LOGGER.info('Creating transform job with name: {}'.format(job_name))
LOGGER.debug('Transform request: {}'.format(json.dumps(transform_request, indent=4)))
self.sagemaker_client.create_transform_job(**transform_request)
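The optional-field handling in ``Session.transform`` above can be sketched as a standalone helper (``build_transform_request`` is a hypothetical name, not part of the SDK) to show the shape of the request ultimately passed to the ``CreateTransformJob`` API:

```python
# Editor's sketch mirroring the optional-field assembly in Session.transform.
def build_transform_request(job_name, model_name, input_config, output_config,
                            resource_config, strategy=None,
                            max_concurrent_transforms=None, max_payload=None,
                            env=None, tags=None):
    # Required fields are always present in the request.
    request = {
        'TransformJobName': job_name,
        'ModelName': model_name,
        'TransformInput': input_config,
        'TransformOutput': output_config,
        'TransformResources': resource_config,
    }
    # Optional fields are included only when explicitly set, so the
    # CreateTransformJob API applies its own defaults otherwise.
    optional = {
        'BatchStrategy': strategy,
        'MaxConcurrentTransforms': max_concurrent_transforms,
        'MaxPayloadInMB': max_payload,
        'Environment': env,
        'Tags': tags,
    }
    request.update({k: v for k, v in optional.items() if v is not None})
    return request
```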

def create_model(self, name, role, primary_container):
"""Create an Amazon SageMaker ``Model``.

@@ -522,6 +568,23 @@ def wait_for_tuning_job(self, job, poll=5):
self._check_job_status(job, desc, 'HyperParameterTuningJobStatus')
return desc

def wait_for_transform_job(self, job, poll=5):
"""Wait for an Amazon SageMaker transform job to complete.

Args:
job (str): Name of the transform job to wait for.
poll (int): Polling interval in seconds (default: 5).

Returns:
(dict): Return value from the ``DescribeTransformJob`` API.

Raises:
ValueError: If the transform job fails.
"""
desc = _wait_until(lambda: _transform_job_status(self.sagemaker_client, job), poll)
self._check_job_status(job, desc, 'TransformJobStatus')
return desc

def _check_job_status(self, job, desc, status_key_name):
"""Check to see if the job completed successfully and, if not, construct and
raise a ValueError.
@@ -898,7 +961,7 @@ def __init__(self, s3_data, distribution='FullyReplicated', compression=None,
compression (str): Valid values: 'Gzip', None (default: None). This is used only in Pipe input mode.
content_type (str): MIME type of the input data (default: None).
record_wrapping (str): Valid values: 'RecordIO' (default: None).
s3_data_type (str): Value values: 'S3Prefix', 'ManifestFile'. If 'S3Prefix', ``s3_data`` defines
s3_data_type (str): Valid values: 'S3Prefix', 'ManifestFile'. If 'S3Prefix', ``s3_data`` defines
a prefix of s3 objects to train on. All objects with s3 keys beginning with ``s3_data`` will
be used to train. If 'ManifestFile', then ``s3_data`` defines a single s3 manifest file, listing
each s3 object to train on. The Manifest file format is described in the SageMaker API documentation:
@@ -982,6 +1045,29 @@ def _tuning_job_status(sagemaker_client, job_name):
return desc


def _transform_job_status(sagemaker_client, job_name):
transform_job_status_codes = {
'Completed': '!',
'InProgress': '.',
'Failed': '*',
'Stopped': 's',
'Stopping': '_'
}
in_progress_statuses = ['InProgress', 'Stopping']

desc = sagemaker_client.describe_transform_job(TransformJobName=job_name)
status = desc['TransformJobStatus']

print(transform_job_status_codes.get(status, '?'), end='')
sys.stdout.flush()

if status in in_progress_statuses:
return None

print('')
return desc


def _deploy_done(sagemaker_client, endpoint_name):
hosting_status_codes = {
"OutOfService": "x",