Skip to content

feature: Support for TFS preprocessing #797

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/sagemaker/local/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,14 @@ def retrieve_artifacts(self, compose_data, output_data_config, job_name):
sagemaker.local.utils.recursive_copy(host_dir, output_artifacts)

# Tar Artifacts -> model.tar.gz and output.tar.gz
model_files = [os.path.join(model_artifacts, name) for name in os.listdir(model_artifacts)]
output_files = [os.path.join(output_artifacts, name) for name in os.listdir(output_artifacts)]
sagemaker.utils.create_tar_file(model_files, os.path.join(compressed_artifacts, 'model.tar.gz'))
sagemaker.utils.create_tar_file(output_files, os.path.join(compressed_artifacts, 'output.tar.gz'))
model_files = [os.path.join(model_artifacts, name) for name in
os.listdir(model_artifacts)]
output_files = [os.path.join(output_artifacts, name) for name in
os.listdir(output_artifacts)]
sagemaker.utils.create_tar_file(model_files,
os.path.join(compressed_artifacts, 'model.tar.gz'))
sagemaker.utils.create_tar_file(output_files,
os.path.join(compressed_artifacts, 'output.tar.gz'))

if output_data_config['S3OutputPath'] == '':
output_data = 'file://%s' % compressed_artifacts
Expand Down
8 changes: 6 additions & 2 deletions src/sagemaker/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Model(object):
"""A SageMaker ``Model`` that can be deployed to an ``Endpoint``."""

def __init__(self, model_data, image, role=None, predictor_cls=None, env=None, name=None, vpc_config=None,
sagemaker_session=None):
sagemaker_session=None, enable_network_isolation=False):
"""Initialize an SageMaker ``Model``.

Args:
Expand All @@ -58,6 +58,9 @@ def __init__(self, model_data, image, role=None, predictor_cls=None, env=None, n
* 'SecurityGroupIds' (list[str]): List of security group ids.
sagemaker_session (sagemaker.session.Session): A SageMaker Session object, used for SageMaker
interactions (default: None). If not specified, one is created using the default AWS configuration chain.
enable_network_isolation (Boolean): Default False. if True, enables network isolation in the endpoint,
isolating the model container. No inbound or outbound network calls can be made to or from the
model container.
"""
self.model_data = model_data
self.image = image
Expand All @@ -69,6 +72,7 @@ def __init__(self, model_data, image, role=None, predictor_cls=None, env=None, n
self.sagemaker_session = sagemaker_session
self._model_name = None
self._is_compiled_model = False
self._enable_network_isolation = enable_network_isolation

def prepare_container_def(self, instance_type, accelerator_type=None): # pylint: disable=unused-argument
"""Return a dict created by ``sagemaker.container_def()`` for deploying this model to a specified instance type.
Expand All @@ -92,7 +96,7 @@ def enable_network_isolation(self):
Returns:
bool: If network isolation should be enabled or not.
"""
return False
return self._enable_network_isolation

def _create_sagemaker_model(self, instance_type, accelerator_type=None, tags=None):
"""Create a SageMaker Model Entity
Expand Down
18 changes: 14 additions & 4 deletions src/sagemaker/tensorflow/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from __future__ import absolute_import

import logging

import sagemaker
from sagemaker.content_types import CONTENT_TYPE_JSON
from sagemaker.fw_utils import create_image_uri
Expand Down Expand Up @@ -88,7 +89,7 @@ def predict(self, data, initial_args=None):
return super(Predictor, self).predict(data, args)


class Model(sagemaker.Model):
class Model(sagemaker.model.FrameworkModel):
FRAMEWORK_NAME = 'tensorflow-serving'
LOG_LEVEL_PARAM_NAME = 'SAGEMAKER_TFS_NGINX_LOGLEVEL'
LOG_LEVEL_MAP = {
Expand All @@ -99,7 +100,7 @@ class Model(sagemaker.Model):
logging.CRITICAL: 'crit',
}

def __init__(self, model_data, role, image=None, framework_version=TF_VERSION,
def __init__(self, model_data, role, entry_point=None, image=None, framework_version=TF_VERSION,
container_log_level=None, predictor_cls=Predictor, **kwargs):
"""Initialize a Model.

Expand All @@ -118,14 +119,23 @@ def __init__(self, model_data, role, image=None, framework_version=TF_VERSION,
**kwargs: Keyword arguments passed to the ``Model`` initializer.
"""
super(Model, self).__init__(model_data=model_data, role=role, image=image,
predictor_cls=predictor_cls, **kwargs)
predictor_cls=predictor_cls, entry_point=entry_point, **kwargs)
self._framework_version = framework_version
self._container_log_level = container_log_level

def prepare_container_def(self, instance_type, accelerator_type=None):
image = self._get_image_uri(instance_type, accelerator_type)
env = self._get_container_env()
return sagemaker.container_def(image, self.model_data, env)

if self.entry_point:
model_data = sagemaker.utils.repack_model(self.entry_point,
self.source_dir,
self.model_data,
self.sagemaker_session)
else:
model_data = self.model_data

return sagemaker.container_def(image, model_data, env)

def _get_container_env(self):
if not self._container_log_level:
Expand Down
93 changes: 90 additions & 3 deletions src/sagemaker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import contextlib
import errno
import os
import random
import re
import shutil
import sys
import tarfile
import tempfile
import time

from datetime import datetime
from functools import wraps
from six.moves.urllib import parse

import six

import sagemaker

ECR_URI_PATTERN = r'^(\d+)(\.)dkr(\.)ecr(\.)(.+)(\.)(amazonaws.com|c2s.ic.gov)(/)(.*:.*)$'

Expand Down Expand Up @@ -258,13 +262,10 @@ def download_folder(bucket_name, prefix, target, sagemaker_session):

def create_tar_file(source_files, target=None):
"""Create a tar file containing all the source_files

Args:
source_files (List[str]): List of file paths that will be contained in the tar file

Returns:
(str): path to created tar file

"""
if target:
filename = target
Expand All @@ -278,6 +279,92 @@ def create_tar_file(source_files, target=None):
return filename


@contextlib.contextmanager
def _tmpdir(suffix='', prefix='tmp'):
"""Create a temporary directory with a context manager. The file is deleted when the context exits.

The prefix, suffix, and dir arguments are the same as for mkstemp().

Args:
suffix (str): If suffix is specified, the file name will end with that suffix, otherwise there will be no
suffix.
prefix (str): If prefix is specified, the file name will begin with that prefix; otherwise,
a default prefix is used.
dir (str): If dir is specified, the file will be created in that directory; otherwise, a default directory is
used.
Returns:
str: path to the directory
"""
tmp = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=None)
yield tmp
shutil.rmtree(tmp)


def repack_model(inference_script, source_directory, model_uri, sagemaker_session):
    """Unpack a model tarball and create a new model tarball with the provided code script.

    This function does the following:
    - uncompresses the model tarball from S3 or the local file system into a temp folder
    - replaces the inference code in the model with the new code provided
    - compresses the new model tarball and saves it in S3 or the local file system

    Args:
        inference_script (str): path or basename of the inference script that will be packed
            into the model
        source_directory (str): path including all the files that will be packed into the
            model. If ``None``, the directory containing ``inference_script`` is packed, or
            just the script itself when it has no directory component.
        model_uri (str): S3 or file system location of the original model tar
        sagemaker_session (:class:`sagemaker.session.Session`): a sagemaker session to
            interact with S3.

    Returns:
        str: S3 or file URI (``s3://...`` or ``file://...``) of the new packed model
    """
    new_model_name = 'model-%s.tar.gz' % sagemaker.utils.sagemaker_short_timestamp()

    with _tmpdir() as tmp:
        tmp_model_dir = os.path.join(tmp, 'model')
        os.mkdir(tmp_model_dir)

        model_from_s3 = model_uri.startswith('s3://')
        if model_from_s3:
            local_model_path = os.path.join(tmp, 'tar_file')
            download_file_from_url(model_uri, local_model_path, sagemaker_session)

            new_model_path = os.path.join(tmp, new_model_name)
        else:
            local_model_path = model_uri.replace('file://', '')
            new_model_path = os.path.join(os.path.dirname(local_model_path), new_model_name)

        # NOTE(review): extractall() on an untrusted archive is unsafe; the tarball here is
        # the user's own model artifact - confirm the source before reusing this elsewhere.
        with tarfile.open(name=local_model_path, mode='r:gz') as t:
            t.extractall(path=tmp_model_dir)

        # Any pre-existing inference code in the model is replaced by the new code.
        code_dir = os.path.join(tmp_model_dir, 'code')
        if os.path.exists(code_dir):
            shutil.rmtree(code_dir, ignore_errors=True)

        if source_directory:
            shutil.copytree(source_directory, code_dir)
        else:
            dirname = os.path.dirname(inference_script)
            if dirname:
                shutil.copytree(dirname, code_dir)
            else:
                # A bare filename has no directory component: copytree('') would raise
                # FileNotFoundError, so pack just the script itself.
                os.mkdir(code_dir)
                shutil.copy2(inference_script, code_dir)

        with tarfile.open(new_model_path, mode='w:gz') as t:
            t.add(tmp_model_dir, arcname=os.path.sep)

        if model_from_s3:
            url = parse.urlparse(model_uri)
            bucket, key = url.netloc, url.path.lstrip('/')
            new_key = key.replace(os.path.basename(key), new_model_name)

            sagemaker_session.boto_session.resource('s3').Object(bucket, new_key).upload_file(
                new_model_path)
            return 's3://%s/%s' % (bucket, new_key)
        else:
            return 'file://%s' % new_model_path


def download_file_from_url(url, dst, sagemaker_session):
    """Download the object referenced by an S3 URL to a local path.

    Args:
        url (str): S3 URL of the object (``s3://bucket/key``)
        dst (str): local destination path
        sagemaker_session (sagemaker.session.Session): session used for the S3 interaction
    """
    parsed = parse.urlparse(url)
    bucket = parsed.netloc
    key = parsed.path.lstrip('/')

    download_file(bucket, key, dst, sagemaker_session)


def download_file(bucket_name, path, target, sagemaker_session):
"""Download a Single File from S3 into a local path

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
asset-file-contents
Binary file not shown.
Binary file not shown.
Binary file not shown.
26 changes: 26 additions & 0 deletions tests/data/tfs/tfs-test-model-with-inference/code/inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import json


def input_handler(data, context):
    """Pre-process the request: add 1 to every value in the 'instances' list.

    Args:
        data: file-like request body whose read() returns UTF-8 encoded JSON
        context: serving context (unused here)

    Returns:
        str: JSON string with the incremented instances
    """
    payload = json.loads(data.read().decode('utf-8'))
    incremented = [value + 1 for value in payload['instances']]
    return json.dumps({'instances': incremented})


def output_handler(data, context):
    """Post-process the response: pass the raw prediction through unchanged.

    Args:
        data: response object exposing the prediction via ``content``
        context: serving context exposing the client's ``accept_header``

    Returns:
        tuple: (prediction content, response content type)
    """
    return data.content, context.accept_header
92 changes: 86 additions & 6 deletions tests/integ/test_tfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import tarfile

import botocore.exceptions
import os

import pytest
import sagemaker
import sagemaker.predictor
Expand All @@ -36,28 +40,87 @@ def instance_type(request):
def tfs_predictor(instance_type, sagemaker_session, tf_full_version):
endpoint_name = sagemaker.utils.unique_name_from_base('sagemaker-tensorflow-serving')
model_data = sagemaker_session.upload_data(
path='tests/data/tensorflow-serving-test-model.tar.gz',
path=os.path.join(tests.integ.DATA_DIR, 'tensorflow-serving-test-model.tar.gz'),
key_prefix='tensorflow-serving/models')
with tests.integ.timeout.timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
with tests.integ.timeout.timeout_and_delete_endpoint_by_name(endpoint_name,
sagemaker_session):
model = Model(model_data=model_data, role='SageMakerRole',
framework_version=tf_full_version,
sagemaker_session=sagemaker_session)
predictor = model.deploy(1, instance_type, endpoint_name=endpoint_name)
yield predictor


def tar_dir(directory, tmpdir):
    """Compress ``directory`` into a ``model.tar.gz`` inside ``tmpdir``.

    Args:
        directory (str): directory whose contents are archived
        tmpdir: directory (or py.path object) in which the archive is written

    Returns:
        str: path to the created tar.gz archive
    """
    archive_path = os.path.join(str(tmpdir), 'model.tar.gz')

    with tarfile.open(archive_path, mode='w:gz') as archive:
        archive.add(directory, arcname=os.path.sep)
    return archive_path


@pytest.fixture
def tfs_predictor_with_model_and_entry_point_same_tar(instance_type,
                                                      sagemaker_session,
                                                      tf_full_version,
                                                      tmpdir):
    """Deploy a TFS model whose tarball already bundles its inference code under code/."""
    endpoint_name = sagemaker.utils.unique_name_from_base('sagemaker-tensorflow-serving')

    model_dir = os.path.join(tests.integ.DATA_DIR, 'tfs/tfs-test-model-with-inference')
    model_tar = tar_dir(model_dir, tmpdir)

    model_data = sagemaker_session.upload_data(path=model_tar,
                                               key_prefix='tensorflow-serving/models')

    with tests.integ.timeout.timeout_and_delete_endpoint_by_name(endpoint_name,
                                                                 sagemaker_session):
        tfs_model = Model(model_data=model_data,
                          role='SageMakerRole',
                          framework_version=tf_full_version,
                          sagemaker_session=sagemaker_session)
        yield tfs_model.deploy(1, instance_type, endpoint_name=endpoint_name)


@pytest.fixture(scope='module')
def tfs_predictor_with_model_and_entry_point_separated(instance_type,
                                                       sagemaker_session, tf_full_version):
    """Deploy a TFS model whose inference script is supplied separately from the model tar."""
    endpoint_name = sagemaker.utils.unique_name_from_base('sagemaker-tensorflow-serving')

    model_data = sagemaker_session.upload_data(
        path=os.path.join(tests.integ.DATA_DIR, 'tensorflow-serving-test-model.tar.gz'),
        key_prefix='tensorflow-serving/models')

    script = os.path.join(tests.integ.DATA_DIR,
                          'tfs/tfs-test-model-with-inference/code/inference.py')

    with tests.integ.timeout.timeout_and_delete_endpoint_by_name(endpoint_name,
                                                                 sagemaker_session):
        tfs_model = Model(entry_point=script,
                          model_data=model_data,
                          role='SageMakerRole',
                          framework_version=tf_full_version,
                          sagemaker_session=sagemaker_session)
        yield tfs_model.deploy(1, instance_type, endpoint_name=endpoint_name)


@pytest.fixture(scope='module')
def tfs_predictor_with_accelerator(sagemaker_session, tf_full_version):
endpoint_name = sagemaker.utils.unique_name_from_base("sagemaker-tensorflow-serving")
instance_type = 'ml.c4.large'
accelerator_type = 'ml.eia1.medium'
model_data = sagemaker_session.upload_data(path='tests/data/tensorflow-serving-test-model.tar.gz',
key_prefix='tensorflow-serving/models')
with tests.integ.timeout.timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
model_data = sagemaker_session.upload_data(
path=os.path.join(tests.integ.DATA_DIR, 'tensorflow-serving-test-model.tar.gz'),
key_prefix='tensorflow-serving/models')
with tests.integ.timeout.timeout_and_delete_endpoint_by_name(endpoint_name,
sagemaker_session):
model = Model(model_data=model_data, role='SageMakerRole',
framework_version=tf_full_version,
sagemaker_session=sagemaker_session)
predictor = model.deploy(1, instance_type, endpoint_name=endpoint_name, accelerator_type=accelerator_type)
predictor = model.deploy(1, instance_type, endpoint_name=endpoint_name,
accelerator_type=accelerator_type)
yield predictor


Expand All @@ -81,6 +144,23 @@ def test_predict_with_accelerator(tfs_predictor_with_accelerator):
assert expected_result == result


def test_predict_with_entry_point(tfs_predictor_with_model_and_entry_point_same_tar):
    """Prediction succeeds when the inference code ships inside the model tarball."""
    predictor = tfs_predictor_with_model_and_entry_point_same_tar

    result = predictor.predict({'instances': [1.0, 2.0, 5.0]})

    assert result == {'predictions': [4.0, 4.5, 6.0]}


def test_predict_with_model_and_entry_point_separated(
        tfs_predictor_with_model_and_entry_point_separated):
    """Prediction succeeds when the inference script is supplied outside the model tarball."""
    predictor = tfs_predictor_with_model_and_entry_point_separated

    result = predictor.predict({'instances': [1.0, 2.0, 5.0]})

    assert result == {'predictions': [4.0, 4.5, 6.0]}


def test_predict_generic_json(tfs_predictor):
input_data = [[1.0, 2.0, 5.0], [1.0, 2.0, 5.0]]
expected_result = {'predictions': [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]}
Expand Down
Loading