Skip to content

Add integ tests for tuning jobs #220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 8, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CHANGELOG
========

* bug-fix: Unit Tests: Improve unit test runtime
* bug-fix: Estimators: Fix attach for LDA

1.4.1
=====
Expand Down
4 changes: 3 additions & 1 deletion src/sagemaker/amazon/lda.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ def __init__(self, role, train_instance_type, num_topics,
tol (float): Optional. Target error tolerance for the ALS phase of the algorithm.
**kwargs: base class keyword argument values.
"""

# this algorithm only supports single instance training
if kwargs.pop('train_instance_count', 1) != 1:
print('LDA only supports single instance training. Defaulting to 1 {}.'.format(train_instance_type))

super(LDA, self).__init__(role, 1, train_instance_type, **kwargs)
self.num_topics = num_topics
self.alpha0 = alpha0
Expand Down
3 changes: 2 additions & 1 deletion tests/data/chainer_mnist/mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
parser.add_argument('--epochs', type=int, default=20)
parser.add_argument('--frequency', type=int, default=20)
parser.add_argument('--batch-size', type=int, default=100)
parser.add_argument('--alpha', type=float, default=0.001)
parser.add_argument('--model-dir', type=str, default=env.model_dir)

parser.add_argument('--train', type=str, default=env.channel_input_dirs['train'])
Expand Down Expand Up @@ -103,7 +104,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
chainer.cuda.get_device_from_id(0).use()

# Setup an optimizer
optimizer = chainer.optimizers.Adam()
optimizer = chainer.optimizers.Adam(alpha=args.alpha)
optimizer.setup(model)

# Load the MNIST dataset
Expand Down
4 changes: 3 additions & 1 deletion tests/data/iris/iris-dnn-classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@


def estimator_fn(run_config, hyperparameters):
input_tensor_name = hyperparameters['input_tensor_name']
input_tensor_name = hyperparameters.get('input_tensor_name', 'inputs')
learning_rate = hyperparameters.get('learning_rate', 0.05)
feature_columns = [tf.feature_column.numeric_column(input_tensor_name, shape=[4])]
return tf.estimator.DNNClassifier(feature_columns=feature_columns,
hidden_units=[10, 20, 10],
optimizer=tf.train.AdagradOptimizer(learning_rate=learning_rate),
n_classes=3,
config=run_config)

Expand Down
300 changes: 256 additions & 44 deletions tests/integ/test_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,178 @@
import os
import pickle
import sys
import time

import numpy as np
import pytest

from sagemaker import LDA, RandomCutForest
from sagemaker.amazon.common import read_records
from sagemaker.amazon.kmeans import KMeans
from sagemaker.chainer import Chainer
from sagemaker.mxnet.estimator import MXNet
from sagemaker.tensorflow import TensorFlow
from sagemaker.tuner import IntegerParameter, ContinuousParameter, CategoricalParameter, HyperparameterTuner
from tests.integ import DATA_DIR
from tests.integ.timeout import timeout


@pytest.mark.skip(reason='functionality is not ready yet')
def test_fit_1p(sagemaker_session):
data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz')
pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'}

# Load the data into memory as numpy arrays
with gzip.open(data_path, 'rb') as f:
train_set, _, _ = pickle.load(f, **pickle_args)

kmeans = KMeans(role='SageMakerRole', train_instance_count=1,
train_instance_type='ml.c4.xlarge',
k=10, sagemaker_session=sagemaker_session, base_job_name='tk',
output_path='s3://{}/'.format(sagemaker_session.default_bucket()))

# set kmeans specific hp
kmeans.init_method = 'random'
kmeans.max_iterators = 1
kmeans.tol = 1
kmeans.num_trials = 1
kmeans.local_init_method = 'kmeans++'
kmeans.half_life_time_size = 1
kmeans.epochs = 1

records = kmeans.record_set(train_set[0][:100])
test_records = kmeans.record_set(train_set[0][:100], channel='test')

# specify which hp you want to optimize over
hyperparameter_ranges = {'extra_center_factor': IntegerParameter(1, 10),
'mini_batch_size': IntegerParameter(10, 100),
'epochs': IntegerParameter(1, 2),
'init_method': CategoricalParameter(['kmeans++', 'random'])}
objective_metric_name = 'test:msd'

tuner = HyperparameterTuner(estimator=kmeans, objective_metric_name=objective_metric_name,
hyperparameter_ranges=hyperparameter_ranges, objective_type='Minimize', max_jobs=2,
from tests.integ.record_set import prepare_record_set_from_local_files
from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name

DATA_PATH = os.path.join(DATA_DIR, 'iris', 'data')


@pytest.mark.continuous_testing
def test_tuning_kmeans(sagemaker_session):
with timeout(minutes=20):
data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz')
pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'}

# Load the data into memory as numpy arrays
with gzip.open(data_path, 'rb') as f:
train_set, _, _ = pickle.load(f, **pickle_args)

kmeans = KMeans(role='SageMakerRole', train_instance_count=1,
train_instance_type='ml.c4.xlarge',
k=10, sagemaker_session=sagemaker_session, base_job_name='tk',
output_path='s3://{}/'.format(sagemaker_session.default_bucket()))

# set kmeans specific hp
kmeans.init_method = 'random'
kmeans.max_iterators = 1
kmeans.tol = 1
kmeans.num_trials = 1
kmeans.local_init_method = 'kmeans++'
kmeans.half_life_time_size = 1
kmeans.epochs = 1

records = kmeans.record_set(train_set[0][:100])
test_records = kmeans.record_set(train_set[0][:100], channel='test')

# specify which hp you want to optimize over
hyperparameter_ranges = {'extra_center_factor': IntegerParameter(1, 10),
'mini_batch_size': IntegerParameter(10, 100),
'epochs': IntegerParameter(1, 2),
'init_method': CategoricalParameter(['kmeans++', 'random'])}
objective_metric_name = 'test:msd'

tuner = HyperparameterTuner(estimator=kmeans, objective_metric_name=objective_metric_name,
hyperparameter_ranges=hyperparameter_ranges, objective_type='Minimize', max_jobs=2,
max_parallel_jobs=2)

tuner.fit([records, test_records])

print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name)

time.sleep(15)
tuner.wait()

best_training_job = tuner.best_training_job()
with timeout_and_delete_endpoint_by_name(best_training_job, sagemaker_session):
predictor = tuner.deploy(1, 'ml.c4.xlarge')
result = predictor.predict(train_set[0][:10])

assert len(result) == 10
for record in result:
assert record.label['closest_cluster'] is not None
assert record.label['distance_to_cluster'] is not None


def test_tuning_lda(sagemaker_session):
with timeout(minutes=20):
data_path = os.path.join(DATA_DIR, 'lda')
data_filename = 'nips-train_1.pbr'

with open(os.path.join(data_path, data_filename), 'rb') as f:
all_records = read_records(f)

# all records must be same
feature_num = int(all_records[0].features['values'].float32_tensor.shape[0])

lda = LDA(role='SageMakerRole', train_instance_type='ml.c4.xlarge', num_topics=10,
sagemaker_session=sagemaker_session, base_job_name='test-lda')

record_set = prepare_record_set_from_local_files(data_path, lda.data_location,
len(all_records), feature_num, sagemaker_session)
test_record_set = prepare_record_set_from_local_files(data_path, lda.data_location,
len(all_records), feature_num, sagemaker_session)
test_record_set.channel = 'test'

# specify which hp you want to optimize over
hyperparameter_ranges = {'alpha0': ContinuousParameter(1, 10),
'num_topics': IntegerParameter(1, 2)}
objective_metric_name = 'test:pwll'

tuner = HyperparameterTuner(estimator=lda, objective_metric_name=objective_metric_name,
hyperparameter_ranges=hyperparameter_ranges, objective_type='Maximize', max_jobs=2,
max_parallel_jobs=2)

tuner.fit([record_set, test_record_set], mini_batch_size=1)

print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name)

time.sleep(15)
tuner.wait()

best_training_job = tuner.best_training_job()
with timeout_and_delete_endpoint_by_name(best_training_job, sagemaker_session):
predictor = tuner.deploy(1, 'ml.c4.xlarge')
predict_input = np.random.rand(1, feature_num)
result = predictor.predict(predict_input)

assert len(result) == 1
for record in result:
assert record.label['topic_mixture'] is not None


@pytest.mark.continuous_testing
def test_stop_tuning_job(sagemaker_session):
feature_num = 14
train_input = np.random.rand(1000, feature_num)

rcf = RandomCutForest(role='SageMakerRole', train_instance_count=1, train_instance_type='ml.c4.xlarge',
num_trees=50, num_samples_per_tree=20, sagemaker_session=sagemaker_session,
base_job_name='test-randomcutforest')

records = rcf.record_set(train_input)
records.distribution = 'FullyReplicated'

test_records = rcf.record_set(train_input, channel='test')
test_records.distribution = 'FullyReplicated'

hyperparameter_ranges = {'num_trees': IntegerParameter(50, 100),
'num_samples_per_tree': IntegerParameter(1, 2)}

objective_metric_name = 'test:f1'
tuner = HyperparameterTuner(estimator=rcf, objective_metric_name=objective_metric_name,
hyperparameter_ranges=hyperparameter_ranges, objective_type='Maximize', max_jobs=2,
max_parallel_jobs=2)

tuner.fit([records, test_records])

print('Started HPO job with name:' + tuner.latest_tuning_job.name)
time.sleep(15)

latest_tuning_job_name = tuner.latest_tuning_job.name

print('Attempting to stop {}'.format(latest_tuning_job_name))

tuner.stop_tuning_job()

desc = tuner.latest_tuning_job.sagemaker_session.sagemaker_client\
.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=latest_tuning_job_name)
assert desc['HyperParameterTuningJobStatus'] == 'Stopping'

@pytest.mark.skip(reason='functionality is not ready yet')
def test_mxnet_tuning(sagemaker_session, mxnet_full_version):

@pytest.mark.continuous_testing
def test_tuning_mxnet(sagemaker_session):
with timeout(minutes=15):
script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'tuning.py')
data_path = os.path.join(DATA_DIR, 'mxnet_mnist')

estimator = MXNet(entry_point=script_path,
role='SageMakerRole',
framework_version=mxnet_full_version,
train_instance_count=1,
train_instance_type='ml.m4.xlarge',
sagemaker_session=sagemaker_session,
base_job_name='hpo')
base_job_name='tune-mxnet')

hyperparameter_ranges = {'learning_rate': ContinuousParameter(0.01, 0.2)}
objective_metric_name = 'Validation-accuracy'
Expand All @@ -94,4 +201,109 @@ def test_mxnet_tuning(sagemaker_session, mxnet_full_version):
key_prefix='integ-test-data/mxnet_mnist/test')
tuner.fit({'train': train_input, 'test': test_input})

print('tuning job successfully created: {}'.format(tuner.latest_tuning_job.name))
print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name)

time.sleep(15)
tuner.wait()

best_training_job = tuner.best_training_job()
with timeout_and_delete_endpoint_by_name(best_training_job, sagemaker_session):
predictor = tuner.deploy(1, 'ml.c4.xlarge')
data = np.zeros(shape=(1, 1, 28, 28))
predictor.predict(data)


@pytest.mark.continuous_testing
def test_tuning_tf(sagemaker_session):
with timeout(minutes=15):
script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py')

estimator = TensorFlow(entry_point=script_path,
role='SageMakerRole',
training_steps=1,
evaluation_steps=1,
hyperparameters={'input_tensor_name': 'inputs'},
train_instance_count=1,
train_instance_type='ml.c4.xlarge',
sagemaker_session=sagemaker_session,
base_job_name='tune-tf')

inputs = sagemaker_session.upload_data(path=DATA_PATH, key_prefix='integ-test-data/tf_iris')
hyperparameter_ranges = {'learning_rate': ContinuousParameter(0.05, 0.2)}

objective_metric_name = 'loss'
metric_definitions = [{'Name': 'loss', 'Regex': 'loss=([0-9\\.]+)'}]

tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, metric_definitions,
objective_type='Minimize', max_jobs=2, max_parallel_jobs=2)

tuner.fit(inputs)

print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name)

time.sleep(15)
tuner.wait()

best_training_job = tuner.best_training_job()
with timeout_and_delete_endpoint_by_name(best_training_job, sagemaker_session):
predictor = tuner.deploy(1, 'ml.c4.xlarge')

features = [6.4, 3.2, 4.5, 1.5]
dict_result = predictor.predict({'inputs': features})
print('predict result: {}'.format(dict_result))
list_result = predictor.predict(features)
print('predict result: {}'.format(list_result))

assert dict_result == list_result


@pytest.mark.continuous_testing
def test_tuning_chainer(sagemaker_session):
with timeout(minutes=15):
script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py')
data_path = os.path.join(DATA_DIR, 'chainer_mnist')

estimator = Chainer(entry_point=script_path,
role='SageMakerRole',
train_instance_count=1,
train_instance_type='ml.c4.xlarge',
sagemaker_session=sagemaker_session,
hyperparameters={'epochs': 1})

train_input = estimator.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'),
key_prefix='integ-test-data/chainer_mnist/train')
test_input = estimator.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'),
key_prefix='integ-test-data/chainer_mnist/test')

hyperparameter_ranges = {'alpha': ContinuousParameter(0.001, 0.005)}

objective_metric_name = 'Validation-accuracy'
metric_definitions = [
{'Name': 'Validation-accuracy', 'Regex': '\[J1\s+\d\.\d+\s+\d\.\d+\s+\d\.\d+\s+(\d\.\d+)'}]

tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, metric_definitions,
max_jobs=2, max_parallel_jobs=2)

tuner.fit({'train': train_input, 'test': test_input})

print('Started hyperparameter tuning job with name:' + tuner.latest_tuning_job.name)

time.sleep(15)
tuner.wait()

best_training_job = tuner.best_training_job()
with timeout_and_delete_endpoint_by_name(best_training_job, sagemaker_session):
predictor = tuner.deploy(1, 'ml.c4.xlarge')

batch_size = 100
data = np.zeros((batch_size, 784), dtype='float32')
output = predictor.predict(data)
assert len(output) == batch_size

data = np.zeros((batch_size, 1, 28, 28), dtype='float32')
output = predictor.predict(data)
assert len(output) == batch_size

data = np.zeros((batch_size, 28, 28), dtype='float32')
output = predictor.predict(data)
assert len(output) == batch_size