diff --git a/tests/data/cifar_10/source/keras_cnn_cifar_10.py b/tests/data/cifar_10/source/keras_cnn_cifar_10.py
new file mode 100644
index 0000000000..c73a1a5f90
--- /dev/null
+++ b/tests/data/cifar_10/source/keras_cnn_cifar_10.py
@@ -0,0 +1,198 @@
+# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+
+import tensorflow as tf
+from tensorflow.python.keras.layers import InputLayer, Conv2D, Activation, MaxPooling2D, Dropout, Flatten, Dense
+from tensorflow.python.keras.models import Sequential
+from tensorflow.python.keras.optimizers import RMSprop
+from tensorflow.python.saved_model.signature_constants import PREDICT_INPUTS
+
+HEIGHT = 32
+WIDTH = 32
+DEPTH = 3
+NUM_CLASSES = 10
+NUM_DATA_BATCHES = 5
+NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 10000 * NUM_DATA_BATCHES
+BATCH_SIZE = 128
+
+
+def keras_model_fn(hyperparameters):
+    """Receives hyperparameters from the training job and returns a compiled Keras model.
+
+    The model is transformed into a TensorFlow Estimator before training and is saved as a
+    TensorFlow Serving SavedModel at the end of training.
+
+    Args:
+        hyperparameters: The hyperparameters passed to the SageMaker TrainingJob that runs
+            your TensorFlow training script.
+
+    Returns:
+        A compiled Keras model.
+    """
+    model = Sequential()
+
+    # TensorFlow Serving's default prediction input tensor name is PREDICT_INPUTS.
+    # We must conform to this naming scheme.
+    model.add(InputLayer(input_shape=(HEIGHT, WIDTH, DEPTH), name=PREDICT_INPUTS))
+    model.add(Conv2D(32, (3, 3), padding='same'))
+    model.add(Activation('relu'))
+    model.add(Conv2D(32, (3, 3)))
+    model.add(Activation('relu'))
+    model.add(MaxPooling2D(pool_size=(2, 2)))
+    model.add(Dropout(0.25))
+
+    model.add(Conv2D(64, (3, 3), padding='same'))
+    model.add(Activation('relu'))
+    model.add(Conv2D(64, (3, 3)))
+    model.add(Activation('relu'))
+    model.add(MaxPooling2D(pool_size=(2, 2)))
+    model.add(Dropout(0.25))
+
+    model.add(Flatten())
+    model.add(Dense(512))
+    model.add(Activation('relu'))
+    model.add(Dropout(0.5))
+    model.add(Dense(NUM_CLASSES))
+    model.add(Activation('softmax'))
+
+    # Re-create the Sequential model as a tf.keras.Model so it can be converted
+    # into a TensorFlow Estimator.
+    _model = tf.keras.Model(inputs=model.input, outputs=model.output)
+
+    opt = RMSprop(lr=hyperparameters['learning_rate'], decay=hyperparameters['decay'])
+
+    _model.compile(loss='categorical_crossentropy',
+                   optimizer=opt,
+                   metrics=['accuracy'])
+
+    return _model
+
+
+def serving_input_fn(params):
+    # Notice that the input placeholder has the same input shape as the Keras model input.
+    tensor = tf.placeholder(tf.float32, shape=[None, HEIGHT, WIDTH, DEPTH])
+
+    # The inputs key PREDICT_INPUTS matches the Keras InputLayer name.
+    inputs = {PREDICT_INPUTS: tensor}
+    return tf.estimator.export.ServingInputReceiver(inputs, inputs)
+
+
+def train_input_fn(training_dir, params):
+    return _input(tf.estimator.ModeKeys.TRAIN,
+                  batch_size=BATCH_SIZE, data_dir=training_dir)
+
+
+def eval_input_fn(training_dir, params):
+    return _input(tf.estimator.ModeKeys.EVAL,
+                  batch_size=BATCH_SIZE, data_dir=training_dir)
+
+
+def _input(mode, batch_size, data_dir):
+    """Uses the tf.data input pipeline for the CIFAR-10 dataset.
+
+    Args:
+        mode: Standard names for model modes (tf.estimator.ModeKeys).
+        batch_size: The number of samples per batch of input requested.
+        data_dir: Directory containing the CIFAR-10 binary data batches.
+    """
+    dataset = _record_dataset(_filenames(mode, data_dir))
+
+    # For training, repeat forever.
+    if mode == tf.estimator.ModeKeys.TRAIN:
+        dataset = dataset.repeat()
+
+    dataset = dataset.map(_dataset_parser)
+    dataset = dataset.prefetch(2 * batch_size)
+
+    # For training, preprocess the image and shuffle.
+    if mode == tf.estimator.ModeKeys.TRAIN:
+        dataset = dataset.map(_train_preprocess_fn)
+        dataset = dataset.prefetch(2 * batch_size)
+
+        # Ensure that the capacity is sufficiently large to provide good random
+        # shuffling.
+        buffer_size = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN * 0.4) + 3 * batch_size
+        dataset = dataset.shuffle(buffer_size=buffer_size)
+
+    # Subtract off the mean and divide by the variance of the pixels.
+    dataset = dataset.map(
+        lambda image, label: (tf.image.per_image_standardization(image), label))
+    dataset = dataset.prefetch(2 * batch_size)
+
+    # Batch results by up to batch_size, and then fetch the tuple from the
+    # iterator.
+    iterator = dataset.batch(batch_size).make_one_shot_iterator()
+    images, labels = iterator.get_next()
+
+    return {PREDICT_INPUTS: images}, labels
+
+
+def _train_preprocess_fn(image, label):
+    """Preprocess a single training image of layout [height, width, depth]."""
+    # Pad the image with four extra pixels on each side.
+    image = tf.image.resize_image_with_crop_or_pad(image, HEIGHT + 8, WIDTH + 8)
+
+    # Randomly crop a [HEIGHT, WIDTH] section of the image.
+    image = tf.random_crop(image, [HEIGHT, WIDTH, DEPTH])
+
+    # Randomly flip the image horizontally.
+    image = tf.image.random_flip_left_right(image)
+
+    return image, label
+
+
+def _dataset_parser(value):
+    """Parse a CIFAR-10 record from value."""
+    # Every record consists of a label followed by the image, with a fixed number
+    # of bytes for each.
+    label_bytes = 1
+    image_bytes = HEIGHT * WIDTH * DEPTH
+    record_bytes = label_bytes + image_bytes
+
+    # Convert from a string to a vector of uint8 that is record_bytes long.
+    raw_record = tf.decode_raw(value, tf.uint8)
+
+    # The first byte represents the label, which we convert from uint8 to int32.
+    label = tf.cast(raw_record[0], tf.int32)
+
+    # The remaining bytes after the label represent the image, which we reshape
+    # from [depth * height * width] to [depth, height, width].
+    depth_major = tf.reshape(raw_record[label_bytes:record_bytes],
+                             [DEPTH, HEIGHT, WIDTH])
+
+    # Convert from [depth, height, width] to [height, width, depth], and cast as
+    # float32.
+    image = tf.cast(tf.transpose(depth_major, [1, 2, 0]), tf.float32)
+
+    return image, tf.one_hot(label, NUM_CLASSES)
+
+
+def _record_dataset(filenames):
+    """Returns an input pipeline Dataset from `filenames`."""
+    record_bytes = HEIGHT * WIDTH * DEPTH + 1
+    return tf.data.FixedLengthRecordDataset(filenames, record_bytes)
+
+
+def _filenames(mode, data_dir):
+    """Returns a list of filenames based on `mode`."""
+    data_dir = os.path.join(data_dir, 'cifar-10-batches-bin')
+
+    assert os.path.exists(data_dir), ('Run cifar10_download_and_extract.py first '
+                                      'to download and extract the CIFAR-10 data.')
+
+    if mode == tf.estimator.ModeKeys.TRAIN:
+        return [
+            os.path.join(data_dir, 'data_batch_%d.bin' % i)
+            for i in range(1, NUM_DATA_BATCHES + 1)
+        ]
+    elif mode == tf.estimator.ModeKeys.EVAL:
+        return [os.path.join(data_dir, 'test_batch.bin')]
+    else:
+        raise ValueError('Invalid mode: %s' % mode)
\ No newline at end of file
diff --git a/tests/integ/test_tf_keras.py b/tests/integ/test_tf_keras.py
new file mode 100644
index 0000000000..37880dbe72
--- /dev/null
+++ b/tests/integ/test_tf_keras.py
@@ -0,0 +1,49 @@
+# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import os
+
+import numpy as np
+import pytest
+
+from sagemaker.tensorflow import TensorFlow
+from tests.integ import DATA_DIR
+from tests.integ.timeout import timeout_and_delete_endpoint_by_name, timeout
+
+
+@pytest.mark.continuous_testing
+def test_keras(sagemaker_session, tf_full_version):
+    script_path = os.path.join(DATA_DIR, 'cifar_10', 'source')
+    dataset_path = os.path.join(DATA_DIR, 'cifar_10', 'data')
+
+    with timeout(minutes=45):
+        estimator = TensorFlow(entry_point='keras_cnn_cifar_10.py',
+                               source_dir=script_path,
+                               role='SageMakerRole', sagemaker_session=sagemaker_session,
+                               hyperparameters={'learning_rate': 1e-4, 'decay': 1e-6},
+                               training_steps=500, evaluation_steps=5,
+                               train_instance_count=1, train_instance_type='ml.c4.xlarge',
+                               train_max_run=45 * 60)
+
+        inputs = estimator.sagemaker_session.upload_data(path=dataset_path, key_prefix='data/cifar10')
+
+        estimator.fit(inputs)
+
+    endpoint_name = estimator.latest_training_job.name
+    with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
+        predictor = estimator.deploy(initial_instance_count=1, instance_type='ml.p2.xlarge')
+
+        data = np.random.randn(32, 32, 3)
+        predict_response = predictor.predict(data)
+        assert len(predict_response['outputs']['probabilities']['floatVal']) == 10
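
As a quick local sanity check for the new entry point, the compiled model from keras_model_fn can be exercised directly, without a training job. The sketch below is illustrative only and not part of the patch; it assumes TensorFlow 1.x and that tests/data/cifar_10/source is on sys.path. It mirrors the endpoint assertion above by checking that a CIFAR-sized input yields a 10-way softmax:

    import numpy as np

    from keras_cnn_cifar_10 import keras_model_fn, HEIGHT, WIDTH, DEPTH, NUM_CLASSES

    # Build and compile the model with the same hyperparameters the integ test passes.
    model = keras_model_fn({'learning_rate': 1e-4, 'decay': 1e-6})

    # One random CIFAR-shaped input; the softmax head yields a 10-class distribution.
    probs = model.predict(np.random.randn(1, HEIGHT, WIDTH, DEPTH))
    assert probs.shape == (1, NUM_CLASSES)
    assert abs(probs.sum() - 1.0) < 1e-3  # softmax outputs sum to ~1

This verifies only the model wiring; the PREDICT_INPUTS serving signature and the endpoint response shape are covered by the integration test itself.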